mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Viktor Lofgren   982dcb28f0   (live-crawler) Use Apache HttpClient + code cleanup   2025-06-24 13:04:19 +02:00

Viktor Lofgren   fc686d8b2e   (live-crawler) Fix startup race condition   2025-06-24 11:42:41 +02:00
    The fix makes sure we wait for the feeds API to be available before fetching from it, so that the process doesn't crash on a cold system reboot.
7 changed files with 377 additions and 118 deletions

View File

@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.module.ServiceConfiguration;

 import javax.annotation.CheckReturnValue;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +60,11 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }

+    public boolean waitReady(Duration duration) throws InterruptedException {
+        return channelPool.awaitChannel(duration);
+    }
+
     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
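Aside: a minimal sketch of how a caller is expected to use the new waitReady guard, mirroring the LiveCrawlerMain change further down in this diff; the one-hour timeout is the value used there, any Duration works the same way:

    // Block until the feeds service has a live channel, or give up after the timeout.
    // This is what prevents the cold-reboot crash described in fc686d8b2e.
    if (!feedsClient.waitReady(Duration.ofHours(1))) {
        throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
    }
    feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);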

View File

@@ -50,6 +50,7 @@ dependencies {
     implementation libs.notnull
     implementation libs.guava
+    implementation libs.httpclient

     implementation dependencies.create(libs.guice.get()) {
         exclude group: 'com.google.guava'
     }

View File

@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.loading.LoaderInputData;
 import nu.marginalia.loading.documents.DocumentLoaderService;
 import nu.marginalia.loading.documents.KeywordLoaderService;
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageBaseType;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.Security;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
                            DomainProcessor domainProcessor,
                            FileStorageService fileStorageService,
                            KeywordLoaderService keywordLoaderService,
-                           DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
+                           DocumentLoaderService documentLoaderService,
+                           DomainCoordinator domainCoordinator,
+                           HikariDataSource dataSource)
             throws Exception
     {
         super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
     }

     private void run() throws Exception {
-        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
+        Path basePath = fileStorageService
+                .getStorageBase(FileStorageBaseType.STORAGE)
+                .asPath()
+                .resolve("live-crawl-data");

         if (!Files.isDirectory(basePath)) {
             Files.createDirectories(basePath);
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
         {
             final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);

+            /* ------------------------------------------------ */
+            /* Fetch the latest domains from the feeds database */
+            /* ------------------------------------------------ */
+
             processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);

             Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
+
+            if (!feedsClient.waitReady(Duration.ofHours(1))) {
+                throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
+            }
+
             feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);

             logger.info("Fetched data for {} domains", urlsPerDomain.size());

+            /* ------------------------------------- */
+            /* Prune the database from old entries   */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.PRUNE_DB);

-            // Remove data that is too old
             dataSet.prune(cutoff);

+            /* ------------------------------------- */
+            /* Fetch the links for each domain       */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.CRAWLING);

-            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
+            CloseableHttpClient client = HttpClientProvider.createClient();
+
+            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
                  var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
             {
                 for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     fetcher.scheduleRetrieval(domain, urls);
                 }
             }
+            finally {
+                client.close(CloseMode.GRACEFUL);
+            }

             Path tempPath = dataSet.createWorkDir();

             try {
+                /* ------------------------------------- */
+                /* Process the fetched links             */
+                /* ------------------------------------- */
+
                 processHeartbeat.progress(LiveCrawlState.PROCESSING);

                 try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
                      var writer = new ConverterBatchWriter(tempPath, 0)
                 ) {
-                    // Offset the documents' ordinals toward the upper range, to avoid an ID collisions with the
-                    // main indexes (the maximum permissible for doc ordinal is value is 67_108_863, so this
-                    // leaves us with a lot of headroom still)
+                    // We need unique document ids that do not collide with the document id from the main index,
+                    // so we offset the documents' ordinals toward the upper range.
+                    //
+                    // The maximum permissible for doc ordinal is value is 67_108_863,
+                    // so this leaves us with a lot of headroom still!
+                    // Expected document count here is order of 10 :^)
                     writer.setOrdinalOffset(67_000_000);

                     for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     }
                 }

+                /* ---------------------------------------------- */
+                /* Load the processed data into the link database */
+                /* and construct an index journal for the docs    */
+                /* ---------------------------------------------- */
+
                 processHeartbeat.progress(LiveCrawlState.LOADING);

                 LoaderInputData lid = new LoaderInputData(tempPath, 1);

                 DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);

                 keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
                 FileUtils.deleteDirectory(tempPath.toFile());
             }

-            // Construct the index
+            /* ------------------------------------- */
+            /* Finish up                             */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.DONE);
+
+            // After we return from here, the LiveCrawlActor will trigger an index construction
+            // job.  Unlike all the stuff we did in this process, it's identical to the real job
+            // so we don't need to do anything special from this process
         }
     }
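Aside on the ordinal offset above: 67_108_863 is 2^26 - 1, so starting the live-crawled documents at 67_000_000 still leaves 108_863 ids of headroom above the main index range, comfortably more than the roughly ten documents expected per run.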

View File

@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
 import nu.marginalia.contenttype.DocumentBodyToString;
 import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.coordination.DomainLock;
-import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.retreival.CrawlDelayTimer;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpHeaders;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;

 /** A simple link scraper that fetches URLs and stores them in a database,
  * with no concept of a crawl frontier, WARC output, or other advanced features
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
     private final LiveCrawlDataSet dataSet;
     private final DbDomainQueries domainQueries;
     private final DomainBlacklist domainBlacklist;
-    private final Duration connectTimeout = Duration.ofSeconds(10);
-    private final Duration readTimeout = Duration.ofSeconds(10);
     private final DomainCoordinator domainCoordinator;
     private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
+    private final HttpClient httpClient;

     public SimpleLinkScraper(LiveCrawlDataSet dataSet,
                              DomainCoordinator domainCoordinator,
                              DbDomainQueries domainQueries,
+                             HttpClient httpClient,
                              DomainBlacklist domainBlacklist) {
         this.dataSet = dataSet;
         this.domainCoordinator = domainCoordinator;
         this.domainQueries = domainQueries;
         this.domainBlacklist = domainBlacklist;
+        this.httpClient = httpClient;
     }

     public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {

         EdgeUrl rootUrl = domain.toRootUrlHttps();

-        List<EdgeUrl> relevantUrls = new ArrayList<>();
+        List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));

+        // Resolve absolute URLs
         for (var url : urls) {
             Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
-            if (optParsedUrl.isEmpty()) {
+
+            if (optParsedUrl.isEmpty())
                 continue;
-            }
-            if (dataSet.hasUrl(optParsedUrl.get())) {
-                continue;
-            }
-            relevantUrls.add(optParsedUrl.get());
+
+            EdgeUrl absoluteUrl = optParsedUrl.get();
+
+            if (!dataSet.hasUrl(absoluteUrl))
+                relevantUrls.add(absoluteUrl);
         }

         if (relevantUrls.isEmpty()) {
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {

         int fetched = 0;

-        try (HttpClient client = HttpClient
-                .newBuilder()
-                .connectTimeout(connectTimeout)
-                .followRedirects(HttpClient.Redirect.NEVER)
-                .version(HttpClient.Version.HTTP_2)
-                .build();
-             // throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
+        try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
              DomainLock lock = domainCoordinator.lockDomain(domain)
         ) {
-            SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
+            SimpleRobotRules rules = fetchRobotsRules(rootUrl);

             if (rules == null) { // I/O error fetching robots.txt
                 // If we can't fetch the robots.txt,
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
             CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());

             for (var parsedUrl : relevantUrls) {
                 if (!rules.isAllowed(parsedUrl.toString())) {
                     maybeFlagAsBad(parsedUrl);
                     continue;
                 }

-                switch (fetchUrl(domainId, parsedUrl, timer, client)) {
+                switch (fetchUrl(domainId, parsedUrl, timer)) {
                     case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
                         dataSet.saveDocument(id, docUrl, body, headers, "");
                         fetched++;
                     }
-                    case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
+                    case FetchResult.Error(EdgeUrl docUrl) -> {
+                        maybeFlagAsBad(docUrl);
+                    }
                 }
             }
         }
@@ -150,111 +144,107 @@ public class SimpleLinkScraper implements AutoCloseable {
     }

     @Nullable
-    private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
-        var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
-                .GET()
-                .header("User-Agent", WmsaHome.getUserAgent().uaString())
-                .header("Accept-Encoding","gzip")
-                .timeout(readTimeout);
-
-        // Fetch the robots.txt
+    private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
+        ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
+                .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+                .setHeader("Accept-Encoding", "gzip")
+                .build();

         try {
-            SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
-            HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
-
-            if (robotsTxt.statusCode() == 200) {
-                return parser.parseContent(rootUrl.toString(),
-                        getResponseData(robotsTxt),
-                        robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
-                        WmsaHome.getUserAgent().uaIdentifier());
-            }
-            else if (robotsTxt.statusCode() == 404) {
-                return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
-            }
-        }
-        catch (IOException ex) {
-            logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
-        }
-
-        return null;
+            return httpClient.execute(request, rsp -> {
+                if (rsp.getEntity() == null) {
+                    return null;
+                }
+                try {
+                    if (rsp.getCode() == 200) {
+                        var contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                        if (contentTypeHeader == null) {
+                            return null; // No content type header, can't parse
+                        }
+
+                        return new SimpleRobotRulesParser().parseContent(
+                                rootUrl.toString(),
+                                EntityUtils.toByteArray(rsp.getEntity()),
+                                contentTypeHeader.getValue(),
+                                WmsaHome.getUserAgent().uaIdentifier()
+                        );
+                    } else if (rsp.getCode() == 404) {
+                        return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
+                    }
+                } finally {
+                    EntityUtils.consumeQuietly(rsp.getEntity());
+                }
+                return null;
+            });
+        }
+        catch (IOException e) {
+            logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
+            return null; // I/O error fetching robots.txt
+        }
+        finally {
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
     }

     /** Fetch a URL and store it in the database
      */
-    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
-        timer.waitFetchDelay();
-
-        HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
-                .GET()
-                .header("User-Agent", WmsaHome.getUserAgent().uaString())
-                .header("Accept", "text/html")
-                .header("Accept-Encoding", "gzip")
-                .timeout(readTimeout)
+    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {
+        ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
+                .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+                .setHeader("Accept", "text/html")
+                .setHeader("Accept-Encoding", "gzip")
                 .build();

         try {
-            HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-
-            // Handle rate limiting by waiting and retrying once
-            if (response.statusCode() == 429) {
-                timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
-                        response.headers().firstValue("Retry-After").orElse("5")
-                ));
-                response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-            }
-
-            String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
-
-            if (response.statusCode() == 200) {
-                if (!contentType.toLowerCase().startsWith("text/html")) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                byte[] body = getResponseData(response);
-                if (body.length > MAX_SIZE) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
-
-                return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
-            }
+            return httpClient.execute(request, rsp -> {
+                try {
+                    if (rsp.getCode() == 200) {
+                        String contentType = rsp.getFirstHeader("Content-Type").getValue();
+                        if (!contentType.toLowerCase().startsWith("text/html")) {
+                            return new FetchResult.Error(parsedUrl);
+                        }
+
+                        byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);
+
+                        String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
+
+                        StringBuilder headersStr = new StringBuilder();
+                        for (var header : rsp.getHeaders()) {
+                            headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
+                        }
+
+                        return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
+                    }
+                } finally {
+                    if (rsp.getEntity() != null) {
+                        EntityUtils.consumeQuietly(rsp.getEntity());
+                    }
+                }
+                return new FetchResult.Error(parsedUrl);
+            });
         }
-        catch (IOException ex) {
-            // We don't want a full stack trace on every error, as it's quite common and very noisy
-            logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
+        catch (IOException e) {
+            logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
+            // If we can't fetch the URL, we return an error result
+            // so that the caller can decide what to do with it.
+        }
+        finally {
+            timer.waitFetchDelay();
         }

         return new FetchResult.Error(parsedUrl);
     }

-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
-
     sealed interface FetchResult {
         record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
         record Error(EdgeUrl url) implements FetchResult {}
     }

-    private String headersToString(HttpHeaders headers) {
-        StringBuilder headersStr = new StringBuilder();
-        headers.map().forEach((k, v) -> {
-            headersStr.append(k).append(": ").append(v).append("\n");
-        });
-        return headersStr.toString();
-    }
-
     @Override
     public void close() throws Exception {
         pool.shutDown();
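Aside: the execute(request, handler) overload used throughout the new code returns whatever the handler returns and releases the pooled connection once the handler completes, which is why the methods above only need EntityUtils.consumeQuietly in a finally block rather than an explicit response close. A minimal standalone sketch of the same pattern, reusing the classes imported above plus HttpClientProvider (the URL is only an example):

    CloseableHttpClient client = HttpClientProvider.createClient();
    int code = client.execute(
            ClassicRequestBuilder.get("https://www.marginalia.nu/").build(),
            rsp -> {
                EntityUtils.consume(rsp.getEntity()); // drain the body so the connection can be reused
                return rsp.getCode();
            });
    client.close(CloseMode.GRACEFUL);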

View File

@@ -0,0 +1,126 @@
package nu.marginalia.livecrawler.io;

import com.google.inject.Provider;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class HttpClientProvider implements Provider<HttpClient> {
    private static final HttpClient client;
    private static PoolingHttpClientConnectionManager connectionManager;
    private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);

    static {
        try {
            client = createClient();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
        final ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setSocketTimeout(15, TimeUnit.SECONDS)
                .setConnectTimeout(15, TimeUnit.SECONDS)
                .setValidateAfterInactivity(TimeValue.ofSeconds(5))
                .build();

        connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
                .setMaxConnPerRoute(2)
                .setMaxConnTotal(50)
                .setDefaultConnectionConfig(connectionConfig)
                .build();

        connectionManager.setDefaultSocketConfig(SocketConfig.custom()
                .setSoLinger(TimeValue.ofSeconds(-1))
                .setSoTimeout(Timeout.ofSeconds(10))
                .build()
        );

        Thread.ofPlatform().daemon(true).start(() -> {
            try {
                for (;;) {
                    TimeUnit.SECONDS.sleep(15);
                    logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        final RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setCookieSpec(StandardCookieSpec.IGNORE)
                .setResponseTimeout(10, TimeUnit.SECONDS)
                .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
                .build();

        return HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setRetryStrategy(new RetryStrategy())
                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                    // Default keep-alive duration is 3 minutes, but this is too long for us,
                    // as we are either going to re-use it fairly quickly or close it for a long time.
                    //
                    // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
                    private static final TimeValue defaultValue = TimeValue.ofSeconds(30);

                    @Override
                    public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
                        final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);

                        while (it.hasNext()) {
                            final HeaderElement he = it.next();
                            final String param = he.getName();
                            final String value = he.getValue();

                            if (value == null)
                                continue;
                            if (!"timeout".equalsIgnoreCase(param))
                                continue;

                            try {
                                long timeout = Long.parseLong(value);
                                timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
                                return TimeValue.ofSeconds(timeout);
                            } catch (final NumberFormatException ignore) {
                                break;
                            }
                        }
                        return defaultValue;
                    }
                })
                .disableRedirectHandling()
                .setDefaultRequestConfig(defaultRequestConfig)
                .build();
    }

    @Override
    public HttpClient get() {
        return client;
    }
}
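Aside: a minimal sketch of how this Provider could be bound in a Guice module; the module name here is hypothetical and the live crawler's actual wiring may differ:

    import com.google.inject.AbstractModule;
    import org.apache.hc.client5.http.classic.HttpClient;

    // Hypothetical module, for illustration only.
    public class LiveCrawlHttpModule extends AbstractModule {
        @Override
        protected void configure() {
            // Every injection point asking for HttpClient shares the single pooled client above.
            bind(HttpClient.class).toProvider(HttpClientProvider.class);
        }
    }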

View File

@@ -0,0 +1,79 @@
package nu.marginalia.livecrawler.io;

import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

public class RetryStrategy implements HttpRequestRetryStrategy {
    private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);

    @Override
    public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return switch (exception) {
            case SocketTimeoutException ste -> false;
            case SSLException ssle -> false;
            case UnknownHostException uhe -> false;
            case HttpHostConnectException ex -> executionCount < 2;
            case SocketException ex -> executionCount < 2;
            default -> executionCount <= 3;
        };
    }

    @Override
    public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
        return switch (response.getCode()) {
            case 500, 503 -> executionCount <= 2;
            case 429 -> executionCount <= 3;
            default -> false;
        };
    }

    @Override
    public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return TimeValue.ofSeconds(1);
    }

    @Override
    public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
        int statusCode = response.getCode();

        // Give 503 a bit more time
        if (statusCode == 503) return TimeValue.ofSeconds(5);

        if (statusCode == 429) {
            // get the Retry-After header
            var retryAfterHeader = response.getFirstHeader("Retry-After");
            if (retryAfterHeader == null) {
                return TimeValue.ofSeconds(3);
            }

            String retryAfter = retryAfterHeader.getValue();
            if (retryAfter == null) {
                return TimeValue.ofSeconds(2);
            }

            try {
                int retryAfterTime = Integer.parseInt(retryAfter);
                retryAfterTime = Math.clamp(retryAfterTime, 1, 5);

                return TimeValue.ofSeconds(retryAfterTime);
            } catch (NumberFormatException e) {
                logger.warn("Invalid Retry-After header: {}", retryAfter);
            }
        }

        return TimeValue.ofSeconds(2);
    }
}
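Aside: a small illustrative snippet (not part of the commit) exercising the 429 branch above; a server-supplied Retry-After of 30 seconds is clamped into the 1..5 second range:

    import org.apache.hc.core5.http.message.BasicHttpResponse;
    import org.apache.hc.core5.util.TimeValue;

    var rsp = new BasicHttpResponse(429);
    rsp.addHeader("Retry-After", "30");

    // executionCount = 1, no HttpContext is needed for this code path
    TimeValue interval = new RetryStrategy().getRetryInterval(rsp, 1, null);
    // interval is TimeValue.ofSeconds(5), since Math.clamp(30, 1, 5) == 5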

View File

@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
 import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.db.DomainBlacklistImpl;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
 import java.sql.SQLException;
 import java.util.List;

 class SimpleLinkScraperTest {
     private Path tempDir;
     private LiveCrawlDataSet dataSet;
+    private CloseableHttpClient httpClient;

     @BeforeEach
-    public void setUp() throws IOException, SQLException {
+    public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
         tempDir = Files.createTempDirectory(getClass().getSimpleName());
         dataSet = new LiveCrawlDataSet(tempDir);
+        httpClient = HttpClientProvider.createClient();
     }

     @AfterEach
     public void tearDown() throws Exception {
         dataSet.close();
+        httpClient.close(CloseMode.IMMEDIATE);
         FileUtils.deleteDirectory(tempDir.toFile());
     }

     @Test
     public void testRetrieveNow() throws Exception {
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));
         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
         Assertions.assertEquals(1, fetched);
@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
     @Test
     public void testRetrieveNow_Redundant() throws Exception {
         dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

         // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));