mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits


13 Commits

Author SHA1 Message Date
Viktor Lofgren
b6265cee11 (feeds) Add timeout code to send()
Due to the unique way Java's HttpClient implements timeouts, we must always wrap send() in an executor task to catch the scenario where a server stops sending data mid-response, which would otherwise hang the send method forever.
2025-04-08 22:09:59 +02:00
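The pattern, as a minimal sketch (helper and names hypothetical, not the repository's exact code): the blocking send() runs as a submitted task, and Future.get() enforces a hard deadline even when the server stalls mid-body.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.util.concurrent.*;

    class TimeLimitedFetch {
        // HttpClient's request timeout only bounds time-to-first-byte, so the
        // whole send() call is wrapped in a task we can abandon on a deadline.
        static HttpResponse<byte[]> fetch(HttpClient client, ExecutorService executor, URI uri)
                throws InterruptedException, ExecutionException, TimeoutException {
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            Future<HttpResponse<byte[]>> pending = executor.submit(
                    () -> client.send(request, HttpResponse.BodyHandlers.ofByteArray()));
            try {
                return pending.get(15, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                pending.cancel(true); // interrupt the stalled transfer rather than leak it
                throw e;
            }
        }
    }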
Viktor Lofgren
c91af247e9 (rate-limit) Fix rate limiting logic
The rate limiter was misconfigured to regenerate tokens at a fixed rate of 1 token per refillRate seconds, rather than refillRate tokens per minute. The default bucket size is also increased to 4x the refill rate.
2025-04-05 12:26:26 +02:00
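In Bucket4j terms, the bug is the difference between Refill.greedy(1, Duration.ofSeconds(refillRate)), which grants one token every refillRate seconds, and Refill.greedy(refillRate, Duration.ofSeconds(60)), which grants refillRate tokens per minute. A sketch of the corrected configuration (helper name hypothetical):

    import io.github.bucket4j.Bandwidth;
    import io.github.bucket4j.Bucket;
    import io.github.bucket4j.Refill;
    import java.time.Duration;

    class PerMinuteLimiter {
        static Bucket perMinuteBucket(int refillRate) {
            // refillRate tokens per minute, not one token per refillRate seconds
            Refill refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
            // a bucket holding 4x the refill rate allows short bursts
            Bandwidth limit = Bandwidth.classic(4L * refillRate, refill);
            return Bucket.builder().addLimit(limit).build();
        }
    }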
Viktor Lofgren
7a31227de1 (crawler) Filter out robots.txt-sitemaps that belong to different domains 2025-04-02 13:35:37 +02:00
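A sketch of the check with plain java.net.URI in place of the repository's EdgeUrl, so the host comparison here approximates the real domain check:

    import java.net.URI;
    import java.util.Optional;

    class SitemapFilter {
        // Accept a robots.txt sitemap entry only if it parses as a URL and
        // its host matches the domain being crawled.
        static boolean belongsToDomain(String sitemapUrl, String rootHost) {
            return parse(sitemapUrl)
                    .map(uri -> rootHost.equalsIgnoreCase(uri.getHost()))
                    .orElse(false);
        }

        private static Optional<URI> parse(String url) {
            try {
                return Optional.of(new URI(url));
            } catch (Exception e) {
                return Optional.empty(); // malformed sitemap URL, reject it
            }
        }
    }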
Viktor Lofgren
4f477604c5 (crawler) Improve error handling in parquet->slop conversion
The parquet code throws a RuntimeException, which was not correctly caught, causing crawls to fail.
2025-04-02 13:16:01 +02:00
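The failure mode, as a sketch (names hypothetical): a catch clause typed to IOException lets the reader's unchecked exceptions propagate and abort the task, so the clause is widened to Exception.

    class ConversionGuard {
        // A RuntimeException from the parquet reader sails past a catch clause
        // that only names IOException; catching Exception contains the failure.
        static boolean tryConvert(Runnable parquetToSlopConversion) {
            try {
                parquetToSlopConversion.run();
                return true;
            } catch (Exception e) { // includes RuntimeException
                System.err.println("parquet->slop conversion failed: " + e);
                return false;
            }
        }
    }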
Viktor Lofgren
2970f4395b (minor) Test code cleanup 2025-04-02 13:16:01 +02:00
Viktor Lofgren
d1ec909b36 (crawler) Improve handling of timeouts to prevent crawler from getting stuck 2025-04-02 12:57:21 +02:00
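The WarcInputBuffer diff below implements this by running each blocking read() as a task with a shrinking deadline. A self-contained sketch of the technique, simplified and without the WARC truncation bookkeeping:

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.*;

    class TimeLimitedCopy {
        private static final ExecutorService readers = Executors.newVirtualThreadPerTaskExecutor();

        // Copy a stream under a wall-clock budget: each read() runs as a task,
        // and whatever budget remains becomes the timeout for the next read.
        static void copy(InputStream is, OutputStream os, Duration timeLimit) throws Exception {
            Instant deadline = Instant.now().plus(timeLimit);
            byte[] buffer = new byte[8192];
            while (true) {
                Duration remaining = Duration.between(Instant.now(), deadline);
                if (remaining.isNegative())
                    break; // budget exhausted
                Future<Integer> read = readers.submit(() -> is.read(buffer));
                int n;
                try {
                    n = read.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    break; // the read stalled past the deadline
                }
                if (n < 0) break; // end of stream
                os.write(buffer, 0, n);
            }
        }
    }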
Viktor Lofgren
c67c5bbf42 (crawler) Experimentally drop to HTTP 1.1 for crawler to see if this solves stuck send()s 2025-04-01 12:05:21 +02:00
Viktor Lofgren
ecb0e57a1a (crawler) Make the use of virtual threads in the crawler configurable via system properties 2025-03-27 21:26:05 +01:00
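Boolean.getBoolean reads a JVM system property, so the behavior is selected at launch with -Dcrawler.useVirtualThreads=true. A sketch of the toggle pattern (helper name hypothetical):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class ThreadTypeToggle {
        // Defaults to platform threads; opt in to virtual threads with
        //   java -Dcrawler.useVirtualThreads=true ...
        static ExecutorService crawlExecutor() {
            if (Boolean.getBoolean("crawler.useVirtualThreads")) {
                return Executors.newVirtualThreadPerTaskExecutor();
            }
            return Executors.newCachedThreadPool();
        }
    }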
Viktor Lofgren
8c61f61b46 (crawler) Add crawling metadata to domainstate db 2025-03-27 16:38:37 +01:00
Viktor Lofgren
662a18c933 Revert "(crawler) Further rearrange crawl order"
This reverts commit 1c2426a052.

The change does not appear necessary to avoid problems.
2025-03-27 11:25:08 +01:00
Viktor Lofgren
1c2426a052 (crawler) Further rearrange crawl order
Limit crawl order preference to edu domains, to avoid hitting stuff like Medium and WordPress with shotgun requests.
2025-03-27 11:19:20 +01:00
Viktor Lofgren
34df7441ac (crawler) Add some jitter to crawl delay to avoid accidentally synchronized requests 2025-03-27 11:15:16 +01:00
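A sketch of the jitter pattern (helper name hypothetical); ThreadLocalRandom avoids contention on a shared Random instance across crawler threads:

    import java.util.concurrent.ThreadLocalRandom;

    class JitteredDelay {
        // Adds 0-150 ms of random jitter so threads that happen to start in
        // lockstep drift apart instead of hitting a server at the same instant.
        static void sleepWithJitter(long baseDelayMs) throws InterruptedException {
            long jitter = ThreadLocalRandom.current().nextLong(0, 150);
            Thread.sleep(baseDelayMs + jitter);
        }
    }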
Viktor Lofgren
5387e2bd80 (crawler) Adjust crawl order to get a better mixture of domains 2025-03-27 11:12:48 +01:00
12 changed files with 364 additions and 55 deletions

File: RateLimiter.java

@@ -35,21 +35,8 @@ public class RateLimiter {
     }
 
-    public static RateLimiter forExpensiveRequest() {
-        return new RateLimiter(5, 10);
-    }
-
     public static RateLimiter custom(int perMinute) {
-        return new RateLimiter(perMinute, 60);
+        return new RateLimiter(4 * perMinute, perMinute);
     }
 
-    public static RateLimiter forSpamBots() {
-        return new RateLimiter(120, 3600);
-    }
-
-    public static RateLimiter forLogin() {
-        return new RateLimiter(3, 15);
-    }
-
     private void cleanIdleBuckets() {
@@ -62,7 +49,7 @@ public class RateLimiter {
     }
 
     private Bucket createBucket() {
-        var refill = Refill.greedy(1, Duration.ofSeconds(refillRate));
+        var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
         var bw = Bandwidth.classic(capacity, refill);
         return Bucket.builder().addLimit(bw).build();
     }

File: FeedFetcherService.java

@@ -33,6 +33,7 @@ import java.sql.SQLException;
 import java.time.*;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +72,7 @@ public class FeedFetcherService {
     public enum UpdateMode {
         CLEAN,
         REFRESH
-    };
+    }
 
     public void updateFeeds(UpdateMode updateMode) throws IOException {
         if (updating) // Prevent concurrent updates
@@ -87,6 +88,7 @@ public class FeedFetcherService {
                      .followRedirects(HttpClient.Redirect.NORMAL)
                      .version(HttpClient.Version.HTTP_2)
                      .build();
+             ExecutorService fetchExecutor = Executors.newCachedThreadPool();
              FeedJournal feedJournal = FeedJournal.create();
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
@@ -131,7 +133,7 @@ public class FeedFetcherService {
                 FetchResult feedData;
                 try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
-                    feedData = fetchFeedData(feed, client, ifModifiedSinceDate, ifNoneMatchTag);
+                    feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
                 } catch (Exception ex) {
                     feedData = new FetchResult.TransientError();
                 }
@@ -211,6 +213,7 @@ public class FeedFetcherService {
     private FetchResult fetchFeedData(FeedDefinition feed,
                                       HttpClient client,
+                                      ExecutorService executorService,
                                       @Nullable String ifModifiedSinceDate,
                                       @Nullable String ifNoneMatchTag)
     {
@@ -237,7 +240,14 @@ public class FeedFetcherService {
             HttpRequest getRequest = requestBuilder.build();
 
             for (int i = 0; i < 3; i++) {
-                HttpResponse<byte[]> rs = client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray());
+                /* Note we need to use an executor to time-limit the send() method in HttpClient, as
+                 * its support for timeouts only applies to the time until response starts to be received,
+                 * and does not catch the case when the server starts to send data but then hangs.
+                 */
+                HttpResponse<byte[]> rs = executorService.submit(
+                        () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
+                        .get(15, TimeUnit.SECONDS);
 
                 if (rs.statusCode() == 429) { // Too Many Requests
                     int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));

File: CrawlerMain.java

@@ -103,10 +103,18 @@ public class CrawlerMain extends ProcessMainClass {
         this.blacklist = blacklist;
         this.node = processConfiguration.node();
 
+        SimpleBlockingThreadPool.ThreadType threadType;
+        if (Boolean.getBoolean("crawler.useVirtualThreads")) {
+            threadType = SimpleBlockingThreadPool.ThreadType.VIRTUAL;
+        }
+        else {
+            threadType = SimpleBlockingThreadPool.ThreadType.PLATFORM;
+        }
+
         pool = new SimpleBlockingThreadPool("CrawlerPool",
                 Integer.getInteger("crawler.poolSize", 256),
                 1,
-                SimpleBlockingThreadPool.ThreadType.VIRTUAL);
+                threadType);
 
         // Wait for the blacklist to be loaded before starting the crawl
@@ -319,7 +327,7 @@ public class CrawlerMain extends ProcessMainClass {
             randomOrder.put(spec.domain, r.nextInt());
         }
 
-        return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0))
+        return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0) >= 8)
                 .reversed()
                 .thenComparing(spec -> randomOrder.get(spec.domain))
                 .thenComparing(Record::hashCode); // non-deterministic tie-breaker to
@@ -493,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
                 return new CrawlDataReference(slopPath);
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.debug("Failed to read previous crawl data for {}", specification.domain());
         }

File: DomainStateDb.java

@@ -11,6 +11,7 @@ import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Objects;
 import java.util.Optional;
@@ -24,6 +25,17 @@ public class DomainStateDb implements AutoCloseable {
 
     private final Connection connection;
 
+    public record CrawlMeta(
+            String domainName,
+            Instant lastFullCrawl,
+            Duration recrawlTime,
+            Duration crawlTime,
+            int recrawlErrors,
+            int crawlChanges,
+            int totalCrawlSize
+    ) {}
+
     public record SummaryRecord(
             String domainName,
             Instant lastUpdated,
@@ -102,6 +114,17 @@ public class DomainStateDb implements AutoCloseable {
                         feedUrl TEXT
                     )
                     """);
+            stmt.executeUpdate("""
+                    CREATE TABLE IF NOT EXISTS crawl_meta (
+                        domain TEXT PRIMARY KEY,
+                        lastFullCrawlEpochMs LONG NOT NULL,
+                        recrawlTimeMs LONG NOT NULL,
+                        recrawlErrors INTEGER NOT NULL,
+                        crawlTimeMs LONG NOT NULL,
+                        crawlChanges INTEGER NOT NULL,
+                        totalCrawlSize INTEGER NOT NULL
+                    )
+                    """);
             stmt.executeUpdate("""
                     CREATE TABLE IF NOT EXISTS favicon (
                         domain TEXT PRIMARY KEY,
@@ -164,6 +187,26 @@ public class DomainStateDb implements AutoCloseable {
         return Optional.empty();
     }
 
+    public void save(CrawlMeta crawlMeta) {
+        if (connection == null) throw new IllegalStateException("No connection to domainstate db");
+
+        try (var stmt = connection.prepareStatement("""
+                INSERT OR REPLACE INTO crawl_meta (domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
+                """)) {
+            stmt.setString(1, crawlMeta.domainName());
+            stmt.setLong(2, crawlMeta.lastFullCrawl.toEpochMilli());
+            stmt.setLong(3, crawlMeta.recrawlTime.toMillis());
+            stmt.setInt(4, crawlMeta.recrawlErrors);
+            stmt.setLong(5, crawlMeta.crawlTime.toMillis());
+            stmt.setInt(6, crawlMeta.crawlChanges);
+            stmt.setInt(7, crawlMeta.totalCrawlSize);
+            stmt.executeUpdate();
+        } catch (SQLException e) {
+            logger.error("Failed to insert crawl meta record", e);
+        }
+    }
+
     public void save(SummaryRecord record) {
         if (connection == null) throw new IllegalStateException("No connection to domainstate db");
@@ -182,7 +225,35 @@ public class DomainStateDb implements AutoCloseable {
         }
     }
 
-    public Optional<SummaryRecord> get(String domainName) {
+    public Optional<CrawlMeta> getMeta(String domainName) {
+        if (connection == null)
+            return Optional.empty();
+
+        try (var stmt = connection.prepareStatement("""
+                SELECT domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize
+                FROM crawl_meta
+                WHERE domain = ?
+                """)) {
+            stmt.setString(1, domainName);
+            var rs = stmt.executeQuery();
+            if (rs.next()) {
+                return Optional.of(new CrawlMeta(
+                        rs.getString("domain"),
+                        Instant.ofEpochMilli(rs.getLong("lastFullCrawlEpochMs")),
+                        Duration.ofMillis(rs.getLong("recrawlTimeMs")),
+                        Duration.ofMillis(rs.getLong("crawlTimeMs")),
+                        rs.getInt("recrawlErrors"),
+                        rs.getInt("crawlChanges"),
+                        rs.getInt("totalCrawlSize")
+                ));
+            }
+        } catch (SQLException ex) {
+            logger.error("Failed to get crawl meta record", ex);
+        }
+
+        return Optional.empty();
+    }
+
+    public Optional<SummaryRecord> getSummary(String domainName) {
         if (connection == null)
             return Optional.empty();

File: HttpFetcherImpl.java

@@ -29,6 +29,7 @@ import java.net.http.HttpResponse;
 import java.net.http.HttpTimeoutException;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.zip.GZIPInputStream;
@@ -56,12 +57,22 @@ public class HttpFetcherImpl implements HttpFetcher {
     private final HttpClient client;
 
     private HttpClient createClient() {
+        final ExecutorService executorService;
+
+        if (Boolean.getBoolean("crawler.httpclient.useVirtualThreads")) {
+            executorService = Executors.newVirtualThreadPerTaskExecutor();
+        }
+        else {
+            executorService = Executors.newCachedThreadPool();
+        }
+
         return HttpClient.newBuilder()
                 .sslContext(NoSecuritySSL.buildSslContext())
                 .cookieHandler(cookies)
                 .followRedirects(HttpClient.Redirect.NORMAL)
+                .version(HttpClient.Version.HTTP_1_1)
                 .connectTimeout(Duration.ofSeconds(8))
-                .executor(Executors.newVirtualThreadPerTaskExecutor())
+                .executor(executorService)
                 .build();
     }

File: WarcInputBuffer.java

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
 import java.net.http.HttpResponse;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
+import java.util.concurrent.*;
 import java.util.zip.GZIPInputStream;
 
 /** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
      * and suppressed from the headers.
      * If an error occurs, a buffer will be created with no content and an error status.
      */
-    static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
+    static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
         if (rsp == null)
             return new ErrorBuffer();
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
 
             if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
                 // If the content is small and not compressed, we can just read it into memory
-                return new MemoryBuffer(headers, is, contentLength);
+                return new MemoryBuffer(headers, timeLimit, is, contentLength);
             }
             else {
                 // Otherwise, we unpack it into a file and read it from there
-                return new FileBuffer(headers, is);
+                return new FileBuffer(headers, timeLimit, is);
             }
         }
         catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
     }
 
+    private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
+
+    private Future<Integer> readAsync(InputStream is, byte[] out) {
+        return virtualExecutorService.submit(() -> is.read(out));
+    }
+
     /** Copy an input stream to an output stream, with a maximum size and time limit */
-    protected void copy(InputStream is, OutputStream os) {
-        long startTime = System.currentTimeMillis();
+    protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
+        Instant start = Instant.now();
+        Instant timeout = start.plus(timeLimit);
         long size = 0;
 
         byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
         while (true) {
             try {
-                int n = is.read(buffer);
+                Duration remaining = Duration.between(Instant.now(), timeout);
+                if (remaining.isNegative()) {
+                    truncationReason = WarcTruncationReason.TIME;
+                    break;
+                }
+
+                Future<Integer> readAsync = readAsync(is, buffer);
+                int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
+
                 if (n < 0) break;
                 size += n;
                 os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
                     truncationReason = WarcTruncationReason.LENGTH;
                     break;
                 }
-            } catch (IOException e) {
-                if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
-                    truncationReason = WarcTruncationReason.TIME;
-                    break;
-                }
+            } catch (IOException|ExecutionException e) {
+                truncationReason = WarcTruncationReason.UNSPECIFIED;
+            } catch (TimeoutException e) {
+                truncationReason = WarcTruncationReason.TIME;
+            } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         }
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
 
 /** Buffer for when we have the response in memory */
 class MemoryBuffer extends WarcInputBuffer {
     byte[] data;
-    public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
+    public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
         super(headers);
 
         var outputStream = new ByteArrayOutputStream(size);
 
-        copy(responseStream, outputStream);
+        copy(responseStream, outputStream, timeLimit);
 
         data = outputStream.toByteArray();
     }
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
 class FileBuffer extends WarcInputBuffer {
     private final Path tempFile;
 
-    public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
+    public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
         super(suppressContentEncoding(headers));
 
         this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
         if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
             try (var out = Files.newOutputStream(tempFile)) {
-                copy(new GZIPInputStream(responseStream), out);
+                copy(new GZIPInputStream(responseStream), out, timeLimit);
             }
             catch (Exception ex) {
                 truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
         }
         else {
             try (var out = Files.newOutputStream(tempFile)) {
-                copy(responseStream, out);
+                copy(responseStream, out, timeLimit);
             }
             catch (Exception ex) {
                 truncationReason = WarcTruncationReason.UNSPECIFIED;

File: WarcRecorder.java

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
         }
 
-        try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
+        try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
              InputStream inputStream = inputBuffer.read())
         {
             if (cookies.hasCookies()) {

File: CrawlDelayTimer.java

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 
 import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -53,12 +54,13 @@ public class CrawlDelayTimer {
     public void waitFetchDelay(long spentTime) {
         long sleepTime = delayTime;
+        long jitter = ThreadLocalRandom.current().nextLong(0, 150);
 
         try {
             if (sleepTime >= 1) {
                 if (spentTime > sleepTime)
                     return;
 
-                Thread.sleep(min(sleepTime - spentTime, 5000));
+                Thread.sleep(min(sleepTime - spentTime, 5000) + jitter);
             } else {
                 // When no crawl delay is specified, lean toward twice the fetch+process time,
                 // within sane limits. This means slower servers get slower crawling, and faster
@@ -71,12 +73,12 @@ public class CrawlDelayTimer {
                 if (spentTime > sleepTime)
                     return;
 
-                Thread.sleep(sleepTime - spentTime);
+                Thread.sleep(sleepTime - spentTime + jitter);
             }
 
             if (slowDown) {
                 // Additional delay when the server is signalling it wants slower requests
-                Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS);
+                Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS + jitter);
             }
         }
         catch (InterruptedException e) {

File: CrawlerRetreiver.java

@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -113,15 +115,19 @@ public class CrawlerRetreiver implements AutoCloseable {
                     throw new InterruptedException();
                 }
 
+                Instant recrawlStart = Instant.now();
+                CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
+                Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
+
                 // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-                if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                if (recrawlMetadata.size() > 0) {
                     // If we have reference data, we will always grow the crawl depth a bit
                     crawlFrontier.increaseDepth(1.5, 2500);
                 }
 
                 oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
 
-                yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
+                yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks, recrawlMetadata, recrawlTime);
             }
             case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                 domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
@@ -143,15 +149,28 @@ public class CrawlerRetreiver implements AutoCloseable {
     private int crawlDomain(EdgeUrl rootUrl,
                             SimpleRobotRules robotsRules,
                             CrawlDelayTimer delayTimer,
-                            DomainLinks domainLinks) {
+                            DomainLinks domainLinks,
+                            CrawlerRevisitor.RecrawlMetadata recrawlMetadata,
+                            Duration recrawlTime) {
+        Instant crawlStart = Instant.now();
 
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
         // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
-            crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
+            // Validate the sitemap URL and check that it belongs to the same domain as the root URL
+            if (EdgeUrl.parse(sitemap)
+                    .map(url -> url.getDomain().equals(rootUrl.domain))
+                    .orElse(false)) {
+                crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
+            }
         }
 
+        int crawlerAdditions = 0;
+
         while (!crawlFrontier.isEmpty()
             && !crawlFrontier.isCrawlDepthReached()
@@ -184,7 +203,11 @@ public class CrawlerRetreiver implements AutoCloseable {
                 continue;
 
             try {
-                fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
+                var result = fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
+
+                if (result.isOk()) {
+                    crawlerAdditions++;
+                }
             }
             catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
@@ -192,6 +215,17 @@ public class CrawlerRetreiver implements AutoCloseable {
             }
         }
 
+        Duration crawlTime = Duration.between(crawlStart, Instant.now());
+        domainStateDb.save(new DomainStateDb.CrawlMeta(
+                domain,
+                Instant.now(),
+                recrawlTime,
+                crawlTime,
+                recrawlMetadata.errors(),
+                crawlerAdditions,
+                recrawlMetadata.size() + crawlerAdditions
+        ));
+
         return crawlFrontier.visitedSize();
     }
@@ -324,7 +358,7 @@ public class CrawlerRetreiver implements AutoCloseable {
     );
 
     private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
-        var oldDomainStateRecord = domainStateDb.get(domain);
+        var oldDomainStateRecord = domainStateDb.getSummary(domain);
 
         // If we are already aware of an old feed URL, then we can just revalidate it
         if (oldDomainStateRecord.isPresent()) {

File: CrawlerRevisitor.java

@@ -31,7 +31,7 @@ public class CrawlerRevisitor {
     }
 
     /** Performs a re-crawl of old documents, comparing etags and last-modified */
-    public int recrawl(CrawlDataReference oldCrawlData,
+    public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
                        SimpleRobotRules robotsRules,
                        CrawlDelayTimer delayTimer)
             throws InterruptedException {
@@ -39,6 +39,7 @@ public class CrawlerRevisitor {
         int retained = 0;
         int errors = 0;
         int skipped = 0;
+        int size = 0;
 
         for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
@@ -82,6 +83,7 @@ public class CrawlerRevisitor {
                 continue;
             }
 
+            size++;
 
             double skipProb;
@@ -154,6 +156,8 @@ public class CrawlerRevisitor {
             }
         }
 
-        return recrawled;
+        return new RecrawlMetadata(size, errors, skipped);
     }
+
+    public record RecrawlMetadata(int size, int errors, int skipped) {}
 }

File: DomainStateDbTest.java

@@ -8,6 +8,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.time.Instant;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -47,8 +48,8 @@ class DomainStateDbTest {
             db.save(allFields);
             db.save(minFields);
 
-            assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
-            assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
+            assertEquals(allFields, db.getSummary("all.marginalia.nu").orElseThrow());
+            assertEquals(minFields, db.getSummary("min.marginalia.nu").orElseThrow());
 
             var updatedAllFields = new DomainStateDb.SummaryRecord(
                     "all.marginalia.nu",
@@ -59,7 +60,19 @@ class DomainStateDbTest {
             );
 
             db.save(updatedAllFields);
-            assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
+            assertEquals(updatedAllFields, db.getSummary("all.marginalia.nu").orElseThrow());
+        }
+    }
+
+    @Test
+    public void testMetadata() throws SQLException {
+        try (var db = new DomainStateDb(tempFile)) {
+            var original = new DomainStateDb.CrawlMeta("example.com", Instant.ofEpochMilli(12345), Duration.ofMillis(30), Duration.ofMillis(300), 1, 2, 3);
+            db.save(original);
+
+            var maybeMeta = db.getMeta("example.com");
+            assertTrue(maybeMeta.isPresent());
+            assertEquals(original, maybeMeta.get());
         }
     }
 }

File: WarcRecorderFakeServerTest.java (new file)

@@ -0,0 +1,152 @@
package nu.marginalia.crawl.retreival.fetcher;

import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.Cookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Tag("slow")
class WarcRecorderFakeServerTest {
    static HttpServer server;

    @BeforeAll
    public static void setUpAll() throws IOException {
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);

        // This endpoint will finish sending the response immediately
        server.createContext("/fast", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "text/html");
            exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
            try (var os = exchange.getResponseBody()) {
                os.write("<html><body>hello</body></html>".getBytes());
                os.flush();
            }
            exchange.close();
        });

        // This endpoint will take 10 seconds to finish sending the response,
        // which should trigger a timeout in the client
        server.createContext("/slow", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "text/html");
            exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
            try (var os = exchange.getResponseBody()) {
                os.write("<html><body>hello</body></html>".getBytes());
                os.flush();
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                os.write(":D".getBytes());
                os.flush();
            }
            exchange.close();
        });

        server.start();
    }

    @AfterAll
    public static void tearDownAll() {
        server.stop(0);
    }

    Path fileNameWarc;
    Path fileNameParquet;
    WarcRecorder client;
    HttpClient httpClient;

    @BeforeEach
    public void setUp() throws Exception {
        httpClient = HttpClient.newBuilder().build();

        fileNameWarc = Files.createTempFile("test", ".warc");
        fileNameParquet = Files.createTempFile("test", ".parquet");
        client = new WarcRecorder(fileNameWarc, new Cookies());
    }

    @AfterEach
    public void tearDown() throws Exception {
        client.close();
        Files.delete(fileNameWarc);
    }

    @Test
    public void fetchFast() throws Exception {
        client.fetch(httpClient,
                HttpRequest.newBuilder()
                        .uri(new java.net.URI("http://localhost:14510/fast"))
                        .timeout(Duration.ofSeconds(1))
                        .header("User-agent", "test.marginalia.nu")
                        .header("Accept-Encoding", "gzip")
                        .GET().build()
        );

        Map<String, String> sampleData = new HashMap<>();
        try (var warcReader = new WarcReader(fileNameWarc)) {
            warcReader.forEach(record -> {
                if (record instanceof WarcRequest req) {
                    sampleData.put(record.type(), req.target());
                }
                if (record instanceof WarcResponse rsp) {
                    sampleData.put(record.type(), rsp.target());
                }
            });
        }

        System.out.println(sampleData);
    }

    @Test
    public void fetchSlow() throws Exception {
        Instant start = Instant.now();
        client.fetch(httpClient,
                HttpRequest.newBuilder()
                        .uri(new java.net.URI("http://localhost:14510/slow"))
                        .timeout(Duration.ofSeconds(1))
                        .header("User-agent", "test.marginalia.nu")
                        .header("Accept-Encoding", "gzip")
                        .GET().build()
        );
        Instant end = Instant.now();

        Map<String, String> sampleData = new HashMap<>();
        try (var warcReader = new WarcReader(fileNameWarc)) {
            warcReader.forEach(record -> {
                if (record instanceof WarcRequest req) {
                    sampleData.put(record.type(), req.target());
                }
                if (record instanceof WarcResponse rsp) {
                    sampleData.put(record.type(), rsp.target());
                    System.out.println(rsp.target());
                }
            });
        }
        System.out.println(sampleData);

        // Timeout is set to 1 second, but the server will take 10 seconds to respond,
        // so we expect the request to take 1s and change before it times out.
        Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000);
    }
}