Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git

Compare commits: deploy-011 (11 commits)
SHA1
----
b6265cee11
c91af247e9
7a31227de1
4f477604c5
2970f4395b
d1ec909b36
c67c5bbf42
ecb0e57a1a
8c61f61b46
662a18c933
1c2426a052

RateLimiter:

@@ -35,21 +35,8 @@ public class RateLimiter {
     }
 
-    public static RateLimiter forExpensiveRequest() {
-        return new RateLimiter(5, 10);
-    }
-
     public static RateLimiter custom(int perMinute) {
-        return new RateLimiter(perMinute, 60);
+        return new RateLimiter(4 * perMinute, perMinute);
     }
 
-    public static RateLimiter forSpamBots() {
-        return new RateLimiter(120, 3600);
-    }
-
-
-    public static RateLimiter forLogin() {
-        return new RateLimiter(3, 15);
-    }
-
     private void cleanIdleBuckets() {
@@ -62,7 +49,7 @@ public class RateLimiter {
     }
 
     private Bucket createBucket() {
-        var refill = Refill.greedy(1, Duration.ofSeconds(refillRate));
+        var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
        var bw = Bandwidth.classic(capacity, refill);
        return Bucket.builder().addLimit(bw).build();
     }
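
The net effect of the two RateLimiter hunks: custom(perMinute) now builds a bucket with a burst capacity of 4 * perMinute tokens, refilled at perMinute tokens per minute, instead of one token per refillRate seconds. A minimal standalone sketch of the resulting Bucket4j behavior (same Refill/Bandwidth/Bucket calls as the diff; the perMinute value and the tryConsume probe loop are illustrative):

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill;

import java.time.Duration;

class RateLimiterSketch {
    public static void main(String[] args) {
        int perMinute = 30;

        // Mirrors the new custom(): burst capacity of 4 * perMinute tokens,
        // refilled at perMinute tokens per 60 seconds (greedy refill spreads
        // the tokens evenly across the minute).
        Refill refill = Refill.greedy(perMinute, Duration.ofSeconds(60));
        Bandwidth bandwidth = Bandwidth.classic(4 * perMinute, refill);
        Bucket bucket = Bucket.builder().addLimit(bandwidth).build();

        int granted = 0;
        while (bucket.tryConsume(1)) {
            granted++;
        }
        // A fresh bucket allows the full burst, then throttles to the refill rate.
        System.out.println("burst granted: " + granted); // ~120 on a fresh bucket
    }
}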

FeedFetcherService:

@@ -33,6 +33,7 @@ import java.sql.SQLException;
 import java.time.*;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +72,7 @@ public class FeedFetcherService {
     public enum UpdateMode {
         CLEAN,
         REFRESH
-    };
+    }
 
     public void updateFeeds(UpdateMode updateMode) throws IOException {
         if (updating) // Prevent concurrent updates
@@ -87,6 +88,7 @@ public class FeedFetcherService {
                      .followRedirects(HttpClient.Redirect.NORMAL)
                      .version(HttpClient.Version.HTTP_2)
                      .build();
+             ExecutorService fetchExecutor = Executors.newCachedThreadPool();
              FeedJournal feedJournal = FeedJournal.create();
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
@@ -131,7 +133,7 @@ public class FeedFetcherService {
 
                     FetchResult feedData;
                     try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
-                        feedData = fetchFeedData(feed, client, ifModifiedSinceDate, ifNoneMatchTag);
+                        feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
                     } catch (Exception ex) {
                         feedData = new FetchResult.TransientError();
                     }
@@ -211,6 +213,7 @@ public class FeedFetcherService {
 
     private FetchResult fetchFeedData(FeedDefinition feed,
                                       HttpClient client,
+                                      ExecutorService executorService,
                                       @Nullable String ifModifiedSinceDate,
                                       @Nullable String ifNoneMatchTag)
     {
@@ -237,7 +240,14 @@ public class FeedFetcherService {
             HttpRequest getRequest = requestBuilder.build();
 
             for (int i = 0; i < 3; i++) {
-                HttpResponse<byte[]> rs = client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray());
+
+                /* Note we need to use an executor to time-limit the send() method in HttpClient, as
+                 * its support for timeouts only applies to the time until response starts to be received,
+                 * and does not catch the case when the server starts to send data but then hangs.
+                 */
+                HttpResponse<byte[]> rs = executorService.submit(
+                        () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
+                        .get(15, TimeUnit.SECONDS);
 
                 if (rs.statusCode() == 429) { // Too Many Requests
                     int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
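
The executor wrapper exists because, as the inline comment notes, HttpClient's built-in timeout only covers the window until the response starts to arrive; a server that sends headers and then stalls mid-body can block send() indefinitely. The same pattern in isolation (the URL is illustrative; the 15-second cap matches the diff's hard deadline):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeLimitedFetch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.com/feed.xml"))
                .GET()
                .build();

        try {
            // Future.get() bounds the whole exchange, including a stalled body
            HttpResponse<byte[]> rs = executor.submit(
                            () -> client.send(request, HttpResponse.BodyHandlers.ofByteArray()))
                    .get(15, TimeUnit.SECONDS);
            System.out.println("status: " + rs.statusCode());
        } catch (TimeoutException e) {
            System.out.println("gave up after 15s");
        } finally {
            executor.shutdownNow(); // interrupt a worker still blocked in send()
        }
    }
}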

CrawlerMain:

@@ -103,10 +103,18 @@ public class CrawlerMain extends ProcessMainClass {
         this.blacklist = blacklist;
         this.node = processConfiguration.node();
 
+        SimpleBlockingThreadPool.ThreadType threadType;
+        if (Boolean.getBoolean("crawler.useVirtualThreads")) {
+            threadType = SimpleBlockingThreadPool.ThreadType.VIRTUAL;
+        }
+        else {
+            threadType = SimpleBlockingThreadPool.ThreadType.PLATFORM;
+        }
+
         pool = new SimpleBlockingThreadPool("CrawlerPool",
                 Integer.getInteger("crawler.poolSize", 256),
                 1,
-                SimpleBlockingThreadPool.ThreadType.VIRTUAL);
+                threadType);
 
 
         // Wait for the blacklist to be loaded before starting the crawl
@@ -493,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
                     return new CrawlDataReference(slopPath);
                 }
 
-            } catch (IOException e) {
+            } catch (Exception e) {
                 logger.debug("Failed to read previous crawl data for {}", specification.domain());
             }
 
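
A note on the flag: Boolean.getBoolean reads a JVM system property (not an environment variable) and is true only when the property is set to exactly "true", so virtual threads are strictly opt-in and platform threads remain the default. A minimal sketch of how the toggle is driven (flag names from the diff; the rest of the command line is hypothetical):

// java -Dcrawler.useVirtualThreads=true -Dcrawler.poolSize=512 ... CrawlerMain
boolean useVirtual = Boolean.getBoolean("crawler.useVirtualThreads"); // false unless the property is "true"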

DomainStateDb:

@@ -11,6 +11,7 @@ import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Objects;
 import java.util.Optional;
@@ -24,6 +25,17 @@ public class DomainStateDb implements AutoCloseable {
 
     private final Connection connection;
 
+
+    public record CrawlMeta(
+            String domainName,
+            Instant lastFullCrawl,
+            Duration recrawlTime,
+            Duration crawlTime,
+            int recrawlErrors,
+            int crawlChanges,
+            int totalCrawlSize
+    ) {}
+
     public record SummaryRecord(
             String domainName,
             Instant lastUpdated,
@@ -102,6 +114,17 @@ public class DomainStateDb implements AutoCloseable {
                         feedUrl TEXT
                     )
                     """);
+            stmt.executeUpdate("""
+                    CREATE TABLE IF NOT EXISTS crawl_meta (
+                        domain TEXT PRIMARY KEY,
+                        lastFullCrawlEpochMs LONG NOT NULL,
+                        recrawlTimeMs LONG NOT NULL,
+                        recrawlErrors INTEGER NOT NULL,
+                        crawlTimeMs LONG NOT NULL,
+                        crawlChanges INTEGER NOT NULL,
+                        totalCrawlSize INTEGER NOT NULL
+                    )
+                    """);
             stmt.executeUpdate("""
                     CREATE TABLE IF NOT EXISTS favicon (
                         domain TEXT PRIMARY KEY,
@@ -164,6 +187,26 @@ public class DomainStateDb implements AutoCloseable {
         return Optional.empty();
     }
 
+    public void save(CrawlMeta crawlMeta) {
+        if (connection == null) throw new IllegalStateException("No connection to domainstate db");
+
+        try (var stmt = connection.prepareStatement("""
+                INSERT OR REPLACE INTO crawl_meta (domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
+                """)) {
+            stmt.setString(1, crawlMeta.domainName());
+            stmt.setLong(2, crawlMeta.lastFullCrawl.toEpochMilli());
+            stmt.setLong(3, crawlMeta.recrawlTime.toMillis());
+            stmt.setInt(4, crawlMeta.recrawlErrors);
+            stmt.setLong(5, crawlMeta.crawlTime.toMillis());
+            stmt.setInt(6, crawlMeta.crawlChanges);
+            stmt.setInt(7, crawlMeta.totalCrawlSize);
+            stmt.executeUpdate();
+        } catch (SQLException e) {
+            logger.error("Failed to insert crawl meta record", e);
+        }
+    }
+
     public void save(SummaryRecord record) {
         if (connection == null) throw new IllegalStateException("No connection to domainstate db");
 
@@ -182,7 +225,35 @@ public class DomainStateDb implements AutoCloseable {
         }
     }
 
-    public Optional<SummaryRecord> get(String domainName) {
+    public Optional<CrawlMeta> getMeta(String domainName) {
+        if (connection == null)
+            return Optional.empty();
+
+        try (var stmt = connection.prepareStatement("""
+                SELECT domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize
+                FROM crawl_meta
+                WHERE domain = ?
+                """)) {
+            stmt.setString(1, domainName);
+            var rs = stmt.executeQuery();
+            if (rs.next()) {
+                return Optional.of(new CrawlMeta(
+                        rs.getString("domain"),
+                        Instant.ofEpochMilli(rs.getLong("lastFullCrawlEpochMs")),
+                        Duration.ofMillis(rs.getLong("recrawlTimeMs")),
+                        Duration.ofMillis(rs.getLong("crawlTimeMs")),
+                        rs.getInt("recrawlErrors"),
+                        rs.getInt("crawlChanges"),
+                        rs.getInt("totalCrawlSize")
+                ));
+            }
+        } catch (SQLException ex) {
+            logger.error("Failed to get crawl meta record", ex);
+        }
+        return Optional.empty();
+    }
+
+    public Optional<SummaryRecord> getSummary(String domainName) {
         if (connection == null)
             return Optional.empty();
 
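
A save/load round trip for the new crawl_meta table, mirroring the testMetadata case added to DomainStateDbTest further down in this diff (values are illustrative; DomainStateDb comes from the project classpath):

import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;

class CrawlMetaRoundTrip {
    static void demo(Path dbFile) throws SQLException {
        try (var db = new DomainStateDb(dbFile)) {
            var meta = new DomainStateDb.CrawlMeta(
                    "example.com",
                    Instant.now(),          // lastFullCrawl
                    Duration.ofMinutes(2),  // recrawlTime
                    Duration.ofMinutes(10), // crawlTime
                    0,                      // recrawlErrors
                    42,                     // crawlChanges
                    500);                   // totalCrawlSize

            db.save(meta);            // INSERT OR REPLACE INTO crawl_meta
            db.getMeta("example.com") // SELECT ... WHERE domain = ?
              .ifPresent(System.out::println);
        }
    }
}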

HttpFetcherImpl:

@@ -29,6 +29,7 @@ import java.net.http.HttpResponse;
 import java.net.http.HttpTimeoutException;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.zip.GZIPInputStream;
@@ -56,12 +57,22 @@ public class HttpFetcherImpl implements HttpFetcher {
     private final HttpClient client;
 
     private HttpClient createClient() {
+        final ExecutorService executorService;
+
+        if (Boolean.getBoolean("crawler.httpclient.useVirtualThreads")) {
+            executorService = Executors.newVirtualThreadPerTaskExecutor();
+        }
+        else {
+            executorService = Executors.newCachedThreadPool();
+        }
+
         return HttpClient.newBuilder()
                 .sslContext(NoSecuritySSL.buildSslContext())
                 .cookieHandler(cookies)
                 .followRedirects(HttpClient.Redirect.NORMAL)
+                .version(HttpClient.Version.HTTP_1_1)
                 .connectTimeout(Duration.ofSeconds(8))
-                .executor(Executors.newVirtualThreadPerTaskExecutor())
+                .executor(executorService)
                 .build();
     }
 

WarcInputBuffer:

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
 import java.net.http.HttpResponse;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
+import java.util.concurrent.*;
 import java.util.zip.GZIPInputStream;
 
 /** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
      * and suppressed from the headers.
      * If an error occurs, a buffer will be created with no content and an error status.
      */
-    static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
+    static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
         if (rsp == null)
             return new ErrorBuffer();
 
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
 
             if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
                 // If the content is small and not compressed, we can just read it into memory
-                return new MemoryBuffer(headers, is, contentLength);
+                return new MemoryBuffer(headers, timeLimit, is, contentLength);
             }
             else {
                 // Otherwise, we unpack it into a file and read it from there
-                return new FileBuffer(headers, is);
+                return new FileBuffer(headers, timeLimit, is);
             }
         }
         catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
 
     }
 
+    private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
+
+    private Future<Integer> readAsync(InputStream is, byte[] out) {
+        return virtualExecutorService.submit(() -> is.read(out));
+    }
+
     /** Copy an input stream to an output stream, with a maximum size and time limit */
-    protected void copy(InputStream is, OutputStream os) {
-        long startTime = System.currentTimeMillis();
+    protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
+        Instant start = Instant.now();
+        Instant timeout = start.plus(timeLimit);
         long size = 0;
 
         byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
 
         while (true) {
             try {
-                int n = is.read(buffer);
+                Duration remaining = Duration.between(Instant.now(), timeout);
+                if (remaining.isNegative()) {
+                    truncationReason = WarcTruncationReason.TIME;
+                    break;
+                }
+
+                Future<Integer> readAsync = readAsync(is, buffer);
+                int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
+
                 if (n < 0) break;
                 size += n;
                 os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
                     truncationReason = WarcTruncationReason.LENGTH;
                     break;
                 }
-                if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
-                    truncationReason = WarcTruncationReason.TIME;
-                    break;
-                }
-            } catch (IOException e) {
+            } catch (IOException|ExecutionException e) {
+                truncationReason = WarcTruncationReason.UNSPECIFIED;
+            } catch (TimeoutException e) {
+                truncationReason = WarcTruncationReason.TIME;
+            } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         }
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
 /** Buffer for when we have the response in memory */
 class MemoryBuffer extends WarcInputBuffer {
     byte[] data;
-    public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
+    public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
         super(headers);
 
         var outputStream = new ByteArrayOutputStream(size);
 
-        copy(responseStream, outputStream);
+        copy(responseStream, outputStream, timeLimit);
 
         data = outputStream.toByteArray();
     }
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
 class FileBuffer extends WarcInputBuffer {
     private final Path tempFile;
 
-    public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
+    public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
         super(suppressContentEncoding(headers));
 
         this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
 
         if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
             try (var out = Files.newOutputStream(tempFile)) {
-                copy(new GZIPInputStream(responseStream), out);
+                copy(new GZIPInputStream(responseStream), out, timeLimit);
             }
             catch (Exception ex) {
                 truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
         }
         else {
             try (var out = Files.newOutputStream(tempFile)) {
-                copy(responseStream, out);
+                copy(responseStream, out, timeLimit);
             }
             catch (Exception ex) {
                 truncationReason = WarcTruncationReason.UNSPECIFIED;
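
The reworked copy loop replaces a single global MAX_TIME check with a per-read deadline: each blocking read() is submitted to a virtual-thread executor and raced against the remaining wall-clock budget, so a stream that stalls mid-transfer gets truncated with WarcTruncationReason.TIME instead of pinning the thread. The pattern in isolation (a standalone sketch; the stream source and budget are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;

class DeadlineCopy {
    private static final ExecutorService vexec = Executors.newVirtualThreadPerTaskExecutor();

    static byte[] copyWithDeadline(InputStream is, Duration timeLimit) throws Exception {
        Instant deadline = Instant.now().plus(timeLimit);
        var out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];

        while (true) {
            Duration remaining = Duration.between(Instant.now(), deadline);
            if (remaining.isNegative()) break; // budget exhausted: truncate

            Future<Integer> pending = vexec.submit(() -> is.read(buffer));
            int n;
            try {
                n = pending.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                pending.cancel(true); // the read may still be blocked
                break;
            }
            if (n < 0) break; // end of stream
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }
}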

WarcRecorder:

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
         }
 
 
-        try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
+        try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
             InputStream inputStream = inputBuffer.read())
        {
            if (cookies.hasCookies()) {
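
With this change a per-request HttpRequest.timeout() value, when present, also becomes the WARC buffer's copy budget; absent one, the recorder falls back to Duration.ofMillis(MAX_TIME). HttpRequest.timeout() returns Optional<Duration>, so callers opt in per request, as the new test at the bottom of this diff does:

import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class TimeoutOptIn {
    // The 10-second value is illustrative; the new test below uses 1 second.
    static final HttpRequest REQUEST = HttpRequest.newBuilder()
            .uri(URI.create("https://example.com/"))
            .timeout(Duration.ofSeconds(10)) // bounds both send() and the WARC body copy
            .GET()
            .build();
}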

CrawlerRetreiver:

@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -113,15 +115,19 @@ public class CrawlerRetreiver implements AutoCloseable {
                     throw new InterruptedException();
                 }
 
+                Instant recrawlStart = Instant.now();
+                CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
+                Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
+
                 // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-                if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                if (recrawlMetadata.size() > 0) {
                     // If we have reference data, we will always grow the crawl depth a bit
                     crawlFrontier.increaseDepth(1.5, 2500);
                 }
 
                 oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
 
-                yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
+                yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks, recrawlMetadata, recrawlTime);
             }
             case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                 domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
@@ -143,16 +149,29 @@ public class CrawlerRetreiver implements AutoCloseable {
     private int crawlDomain(EdgeUrl rootUrl,
                             SimpleRobotRules robotsRules,
                             CrawlDelayTimer delayTimer,
-                            DomainLinks domainLinks) {
+                            DomainLinks domainLinks,
+                            CrawlerRevisitor.RecrawlMetadata recrawlMetadata,
+                            Duration recrawlTime) {
+
+        Instant crawlStart = Instant.now();
+
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
         // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
-            crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
+            // Validate the sitemap URL and check if it belongs to the domain as the root URL
+            if (EdgeUrl.parse(sitemap)
+                    .map(url -> url.getDomain().equals(rootUrl.domain))
+                    .orElse(false)) {
+
+                crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
+            }
         }
 
+        int crawlerAdditions = 0;
+
         while (!crawlFrontier.isEmpty()
             && !crawlFrontier.isCrawlDepthReached()
             && errorCount < MAX_ERRORS
@@ -184,7 +203,11 @@ public class CrawlerRetreiver implements AutoCloseable {
                 continue;
 
             try {
-                fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
+                var result = fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
+
+                if (result.isOk()) {
+                    crawlerAdditions++;
+                }
             }
             catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
@@ -192,6 +215,17 @@ public class CrawlerRetreiver implements AutoCloseable {
             }
         }
 
+        Duration crawlTime = Duration.between(crawlStart, Instant.now());
+        domainStateDb.save(new DomainStateDb.CrawlMeta(
+                domain,
+                Instant.now(),
+                recrawlTime,
+                crawlTime,
+                recrawlMetadata.errors(),
+                crawlerAdditions,
+                recrawlMetadata.size() + crawlerAdditions
+        ));
+
         return crawlFrontier.visitedSize();
     }
 
@@ -324,7 +358,7 @@ public class CrawlerRetreiver implements AutoCloseable {
     );
 
     private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
-        var oldDomainStateRecord = domainStateDb.get(domain);
+        var oldDomainStateRecord = domainStateDb.getSummary(domain);
 
         // If we are already aware of an old feed URL, then we can just revalidate it
         if (oldDomainStateRecord.isPresent()) {
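
The sitemap validation closes a small hole: previously a robots.txt could direct the crawler to fetch sitemaps on unrelated hosts. EdgeUrl and EdgeDomain are project types; roughly the same check using only the standard library would look like this (hypothetical helper; plain host equality is a simplification of Marginalia's EdgeDomain comparison):

import java.net.URI;

class SitemapGuard {
    /** Accept a sitemap URL only if it parses and points at the crawl root's host. */
    static boolean sameHost(String sitemapUrl, URI rootUrl) {
        try {
            String host = URI.create(sitemapUrl).getHost();
            return host != null && host.equalsIgnoreCase(rootUrl.getHost());
        } catch (IllegalArgumentException e) {
            return false; // unparseable URLs are rejected, like EdgeUrl.parse(...).orElse(false)
        }
    }
}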

CrawlerRevisitor:

@@ -31,7 +31,7 @@ public class CrawlerRevisitor {
     }
 
     /** Performs a re-crawl of old documents, comparing etags and last-modified */
-    public int recrawl(CrawlDataReference oldCrawlData,
+    public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
                        SimpleRobotRules robotsRules,
                        CrawlDelayTimer delayTimer)
             throws InterruptedException {
@@ -39,6 +39,7 @@ public class CrawlerRevisitor {
         int retained = 0;
         int errors = 0;
         int skipped = 0;
+        int size = 0;
 
         for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
@@ -82,6 +83,7 @@ public class CrawlerRevisitor {
                 continue;
             }
 
+            size++;
 
             double skipProb;
 
@@ -154,6 +156,8 @@ public class CrawlerRevisitor {
             }
         }
 
-        return recrawled;
+        return new RecrawlMetadata(size, errors, skipped);
     }
+
+    public record RecrawlMetadata(int size, int errors, int skipped) {}
 }

DomainStateDbTest:

@@ -8,6 +8,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.time.Instant;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -47,8 +48,8 @@ class DomainStateDbTest {
             db.save(allFields);
             db.save(minFields);
 
-            assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
-            assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
+            assertEquals(allFields, db.getSummary("all.marginalia.nu").orElseThrow());
+            assertEquals(minFields, db.getSummary("min.marginalia.nu").orElseThrow());
 
             var updatedAllFields = new DomainStateDb.SummaryRecord(
                     "all.marginalia.nu",
@@ -59,7 +60,19 @@ class DomainStateDbTest {
             );
 
             db.save(updatedAllFields);
-            assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
+            assertEquals(updatedAllFields, db.getSummary("all.marginalia.nu").orElseThrow());
+        }
+    }
+
+    @Test
+    public void testMetadata() throws SQLException {
+        try (var db = new DomainStateDb(tempFile)) {
+            var original = new DomainStateDb.CrawlMeta("example.com", Instant.ofEpochMilli(12345), Duration.ofMillis(30), Duration.ofMillis(300), 1, 2, 3);
+            db.save(original);
+
+            var maybeMeta = db.getMeta("example.com");
+            assertTrue(maybeMeta.isPresent());
+            assertEquals(original, maybeMeta.get());
         }
     }
 }

WarcRecorderFakeServerTest (new file):

@@ -0,0 +1,152 @@
+package nu.marginalia.crawl.retreival.fetcher;
+
+import com.sun.net.httpserver.HttpServer;
+import nu.marginalia.crawl.fetcher.Cookies;
+import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
+import org.junit.jupiter.api.*;
+import org.netpreserve.jwarc.WarcReader;
+import org.netpreserve.jwarc.WarcRequest;
+import org.netpreserve.jwarc.WarcResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Tag("slow")
+class WarcRecorderFakeServerTest {
+    static HttpServer server;
+
+    @BeforeAll
+    public static void setUpAll() throws IOException {
+        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
+
+        // This endpoint will finish sending the response immediately
+        server.createContext("/fast", exchange -> {
+            exchange.getResponseHeaders().add("Content-Type", "text/html");
+            exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
+            try (var os = exchange.getResponseBody()) {
+                os.write("<html><body>hello</body></html>".getBytes());
+                os.flush();
+            }
+            exchange.close();
+        });
+
+        // This endpoint will take 10 seconds to finish sending the response,
+        // which should trigger a timeout in the client
+        server.createContext("/slow", exchange -> {
+            exchange.getResponseHeaders().add("Content-Type", "text/html");
+            exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
+            try (var os = exchange.getResponseBody()) {
+                os.write("<html><body>hello</body></html>".getBytes());
+                os.flush();
+                try {
+                    TimeUnit.SECONDS.sleep(10);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                os.write(":D".getBytes());
+                os.flush();
+            }
+            exchange.close();
+        });
+
+        server.start();
+    }
+
+    @AfterAll
+    public static void tearDownAll() {
+        server.stop(0);
+    }
+
+    Path fileNameWarc;
+    Path fileNameParquet;
+    WarcRecorder client;
+
+    HttpClient httpClient;
+    @BeforeEach
+    public void setUp() throws Exception {
+        httpClient = HttpClient.newBuilder().build();
+
+        fileNameWarc = Files.createTempFile("test", ".warc");
+        fileNameParquet = Files.createTempFile("test", ".parquet");
+
+        client = new WarcRecorder(fileNameWarc, new Cookies());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        client.close();
+        Files.delete(fileNameWarc);
+    }
+
+    @Test
+    public void fetchFast() throws Exception {
+        client.fetch(httpClient,
+                HttpRequest.newBuilder()
+                        .uri(new java.net.URI("http://localhost:14510/fast"))
+                        .timeout(Duration.ofSeconds(1))
+                        .header("User-agent", "test.marginalia.nu")
+                        .header("Accept-Encoding", "gzip")
+                        .GET().build()
+        );
+
+        Map<String, String> sampleData = new HashMap<>();
+        try (var warcReader = new WarcReader(fileNameWarc)) {
+            warcReader.forEach(record -> {
+                if (record instanceof WarcRequest req) {
+                    sampleData.put(record.type(), req.target());
+                }
+                if (record instanceof WarcResponse rsp) {
+                    sampleData.put(record.type(), rsp.target());
+                }
+            });
+        }
+
+        System.out.println(sampleData);
+    }
+
+    @Test
+    public void fetchSlow() throws Exception {
+        Instant start = Instant.now();
+        client.fetch(httpClient,
+                HttpRequest.newBuilder()
+                        .uri(new java.net.URI("http://localhost:14510/slow"))
+                        .timeout(Duration.ofSeconds(1))
+                        .header("User-agent", "test.marginalia.nu")
+                        .header("Accept-Encoding", "gzip")
+                        .GET().build()
+        );
+        Instant end = Instant.now();
+
+        Map<String, String> sampleData = new HashMap<>();
+        try (var warcReader = new WarcReader(fileNameWarc)) {
+            warcReader.forEach(record -> {
+                if (record instanceof WarcRequest req) {
+                    sampleData.put(record.type(), req.target());
+                }
+                if (record instanceof WarcResponse rsp) {
+                    sampleData.put(record.type(), rsp.target());
+                    System.out.println(rsp.target());
+                }
+            });
+        }
+
+        System.out.println(sampleData);
+
+        // Timeout is set to 1 second, but the server will take 10 seconds to respond,
+        // so we expect the request to take 1s and change before it times out.
+        Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000);
+    }
+
+}