mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
4 Commits
deploy-026
...
deploy-026
Author | SHA1 | Date | |
---|---|---|---|
|
982dcb28f0 | ||
|
fc686d8b2e | ||
|
69ef0f334a | ||
|
446746f3bd |
@@ -20,9 +20,7 @@ public enum NodeProfile {
|
||||
}
|
||||
|
||||
public boolean permitBatchCrawl() {
|
||||
return isBatchCrawl() ||isMixed();
|
||||
}
|
||||
public boolean permitSideload() {
|
||||
return isMixed() || isSideload();
|
||||
return isBatchCrawl() || isMixed();
|
||||
}
|
||||
public boolean permitSideload() { return isSideload() || isMixed(); }
|
||||
}
|
||||
|
@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -59,6 +60,11 @@ public class FeedsClient {
|
||||
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
|
||||
}
|
||||
|
||||
public boolean waitReady(Duration duration) throws InterruptedException {
|
||||
return channelPool.awaitChannel(duration);
|
||||
}
|
||||
|
||||
|
||||
/** Get the hash of the feed data, for identifying when the data has been updated */
|
||||
public String getFeedDataHash() {
|
||||
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
|
||||
|
@@ -35,6 +35,7 @@ dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
implementation libs.commons.lang3
|
||||
implementation libs.commons.io
|
||||
implementation libs.httpclient
|
||||
implementation libs.wiremock
|
||||
|
||||
implementation libs.prometheus
|
||||
|
@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageType;
|
||||
import nu.marginalia.util.SimpleBlockingThreadPool;
|
||||
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
import org.apache.hc.client5.http.config.RequestConfig;
|
||||
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
|
||||
import org.apache.hc.core5.http.Header;
|
||||
import org.apache.hc.core5.http.HeaderElement;
|
||||
import org.apache.hc.core5.http.HeaderElements;
|
||||
import org.apache.hc.core5.http.HttpResponse;
|
||||
import org.apache.hc.core5.http.io.SocketConfig;
|
||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
||||
import org.apache.hc.core5.http.message.MessageSupport;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.apache.hc.core5.util.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.sql.SQLException;
|
||||
import java.time.*;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@@ -55,6 +72,8 @@ public class FeedFetcherService {
|
||||
|
||||
private final DomainCoordinator domainCoordinator;
|
||||
|
||||
private final HttpClient httpClient;
|
||||
|
||||
private volatile boolean updating;
|
||||
|
||||
@Inject
|
||||
@@ -71,6 +90,83 @@ public class FeedFetcherService {
|
||||
this.serviceHeartbeat = serviceHeartbeat;
|
||||
this.executorClient = executorClient;
|
||||
this.domainCoordinator = domainCoordinator;
|
||||
|
||||
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
|
||||
.setSocketTimeout(15, TimeUnit.SECONDS)
|
||||
.setConnectTimeout(15, TimeUnit.SECONDS)
|
||||
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
|
||||
.build();
|
||||
|
||||
|
||||
var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
||||
.setMaxConnPerRoute(2)
|
||||
.setMaxConnTotal(50)
|
||||
.setDefaultConnectionConfig(connectionConfig)
|
||||
.build();
|
||||
|
||||
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
|
||||
.setSoLinger(TimeValue.ofSeconds(-1))
|
||||
.setSoTimeout(Timeout.ofSeconds(10))
|
||||
.build()
|
||||
);
|
||||
|
||||
Thread.ofPlatform().daemon(true).start(() -> {
|
||||
try {
|
||||
for (;;) {
|
||||
TimeUnit.SECONDS.sleep(15);
|
||||
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
|
||||
final RequestConfig defaultRequestConfig = RequestConfig.custom()
|
||||
.setCookieSpec(StandardCookieSpec.IGNORE)
|
||||
.setResponseTimeout(10, TimeUnit.SECONDS)
|
||||
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
|
||||
.build();
|
||||
|
||||
httpClient = HttpClients.custom()
|
||||
.setDefaultRequestConfig(defaultRequestConfig)
|
||||
.setConnectionManager(connectionManager)
|
||||
.setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
|
||||
.setConnectionManager(connectionManager)
|
||||
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
|
||||
// Default keep-alive duration is 3 minutes, but this is too long for us,
|
||||
// as we are either going to re-use it fairly quickly or close it for a long time.
|
||||
//
|
||||
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
|
||||
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
|
||||
|
||||
@Override
|
||||
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
|
||||
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
|
||||
|
||||
while (it.hasNext()) {
|
||||
final HeaderElement he = it.next();
|
||||
final String param = he.getName();
|
||||
final String value = he.getValue();
|
||||
|
||||
if (value == null)
|
||||
continue;
|
||||
if (!"timeout".equalsIgnoreCase(param))
|
||||
continue;
|
||||
|
||||
try {
|
||||
long timeout = Long.parseLong(value);
|
||||
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
|
||||
return TimeValue.ofSeconds(timeout);
|
||||
} catch (final NumberFormatException ignore) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
public enum UpdateMode {
|
||||
@@ -86,13 +182,7 @@ public class FeedFetcherService {
|
||||
|
||||
|
||||
try (FeedDbWriter writer = feedDb.createWriter();
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(15))
|
||||
.executor(Executors.newCachedThreadPool())
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build();
|
||||
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
|
||||
ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
FeedJournal feedJournal = FeedJournal.create();
|
||||
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
|
||||
) {
|
||||
@@ -137,7 +227,8 @@ public class FeedFetcherService {
|
||||
|
||||
FetchResult feedData;
|
||||
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
||||
feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
||||
TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
|
||||
} catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
@@ -216,7 +307,6 @@ public class FeedFetcherService {
|
||||
}
|
||||
|
||||
private FetchResult fetchFeedData(FeedDefinition feed,
|
||||
HttpClient client,
|
||||
ExecutorService executorService,
|
||||
@Nullable String ifModifiedSinceDate,
|
||||
@Nullable String ifNoneMatchTag)
|
||||
@@ -224,59 +314,63 @@ public class FeedFetcherService {
|
||||
try {
|
||||
URI uri = new URI(feed.feedUrl());
|
||||
|
||||
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
|
||||
.GET()
|
||||
.uri(uri)
|
||||
.header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.header("Accept", "text/*, */*;q=0.9")
|
||||
.timeout(Duration.ofSeconds(15))
|
||||
;
|
||||
var requestBuilder = ClassicRequestBuilder.get(uri)
|
||||
.setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
|
||||
.setHeader("Accept-Encoding", "gzip")
|
||||
.setHeader("Accept", "text/*, */*;q=0.9");
|
||||
|
||||
// Set the If-Modified-Since or If-None-Match headers if we have them
|
||||
// though since there are certain idiosyncrasies in server implementations,
|
||||
// we avoid setting both at the same time as that may turn a 304 into a 200.
|
||||
if (ifNoneMatchTag != null) {
|
||||
requestBuilder.header("If-None-Match", ifNoneMatchTag);
|
||||
requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
|
||||
} else if (ifModifiedSinceDate != null) {
|
||||
requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
|
||||
requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
|
||||
}
|
||||
|
||||
return httpClient.execute(requestBuilder.build(), rsp -> {
|
||||
try {
|
||||
logger.info("Code: {}, URL: {}", rsp.getCode(), uri);
|
||||
|
||||
HttpRequest getRequest = requestBuilder.build();
|
||||
switch (rsp.getCode()) {
|
||||
case 200 -> {
|
||||
if (rsp.getEntity() == null) {
|
||||
return new FetchResult.TransientError(); // No content to read, treat as transient error
|
||||
}
|
||||
byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
// Decode the response body based on the Content-Type header
|
||||
Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
|
||||
if (contentTypeHeader == null) {
|
||||
return new FetchResult.TransientError();
|
||||
}
|
||||
String contentType = contentTypeHeader.getValue();
|
||||
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
|
||||
|
||||
/* Note we need to use an executor to time-limit the send() method in HttpClient, as
|
||||
* its support for timeouts only applies to the time until response starts to be received,
|
||||
* and does not catch the case when the server starts to send data but then hangs.
|
||||
*/
|
||||
HttpResponse<byte[]> rs = executorService.submit(
|
||||
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
|
||||
.get(15, TimeUnit.SECONDS);
|
||||
// Grab the ETag header if it exists
|
||||
Header etagHeader = rsp.getFirstHeader("ETag");
|
||||
String newEtagValue = etagHeader == null ? null : etagHeader.getValue();
|
||||
|
||||
if (rs.statusCode() == 429) { // Too Many Requests
|
||||
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
|
||||
Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
|
||||
continue;
|
||||
}
|
||||
|
||||
String newEtagValue = rs.headers().firstValue("ETag").orElse("");
|
||||
|
||||
return switch (rs.statusCode()) {
|
||||
case 200 -> {
|
||||
byte[] responseData = getResponseData(rs);
|
||||
|
||||
String contentType = rs.headers().firstValue("Content-Type").orElse("");
|
||||
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
|
||||
|
||||
yield new FetchResult.Success(bodyText, newEtagValue);
|
||||
return new FetchResult.Success(bodyText, newEtagValue);
|
||||
}
|
||||
case 304 -> {
|
||||
return new FetchResult.NotModified(); // via If-Modified-Since semantics
|
||||
}
|
||||
case 404 -> {
|
||||
return new FetchResult.PermanentError(); // never try again
|
||||
}
|
||||
default -> {
|
||||
return new FetchResult.TransientError(); // we try again later
|
||||
}
|
||||
}
|
||||
case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
|
||||
case 404 -> new FetchResult.PermanentError(); // never try again
|
||||
default -> new FetchResult.TransientError(); // we try again later
|
||||
};
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return new FetchResult.PermanentError(); // treat as permanent error
|
||||
}
|
||||
finally {
|
||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.debug("Error fetching feed", ex);
|
||||
@@ -285,19 +379,6 @@ public class FeedFetcherService {
|
||||
return new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
|
||||
String encoding = response.headers().firstValue("Content-Encoding").orElse("");
|
||||
|
||||
if ("gzip".equals(encoding)) {
|
||||
try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
|
||||
return stream.readAllBytes();
|
||||
}
|
||||
}
|
||||
else {
|
||||
return response.body();
|
||||
}
|
||||
}
|
||||
|
||||
public sealed interface FetchResult {
|
||||
record Success(String value, String etag) implements FetchResult {}
|
||||
record NotModified() implements FetchResult {}
|
||||
|
@@ -5,6 +5,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.name.Names;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.coordination.DomainCoordinator;
|
||||
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.rss.db.FeedDb;
|
||||
import nu.marginalia.rss.model.FeedItems;
|
||||
@@ -82,6 +84,7 @@ class FeedFetcherServiceTest extends AbstractModule {
|
||||
}
|
||||
|
||||
public void configure() {
|
||||
bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
|
||||
bind(HikariDataSource.class).toInstance(dataSource);
|
||||
bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
|
||||
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));
|
||||
|
@@ -50,6 +50,7 @@ dependencies {
|
||||
|
||||
implementation libs.notnull
|
||||
implementation libs.guava
|
||||
implementation libs.httpclient
|
||||
implementation dependencies.create(libs.guice.get()) {
|
||||
exclude group: 'com.google.guava'
|
||||
}
|
||||
|
@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
|
||||
import nu.marginalia.db.DbDomainQueries;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.livecrawler.io.HttpClientProvider;
|
||||
import nu.marginalia.loading.LoaderInputData;
|
||||
import nu.marginalia.loading.documents.DocumentLoaderService;
|
||||
import nu.marginalia.loading.documents.KeywordLoaderService;
|
||||
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageBaseType;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.core5.io.CloseMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.Security;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.HashMap;
|
||||
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
DomainProcessor domainProcessor,
|
||||
FileStorageService fileStorageService,
|
||||
KeywordLoaderService keywordLoaderService,
|
||||
DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
|
||||
DocumentLoaderService documentLoaderService,
|
||||
DomainCoordinator domainCoordinator,
|
||||
HikariDataSource dataSource)
|
||||
throws Exception
|
||||
{
|
||||
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
|
||||
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
}
|
||||
|
||||
private void run() throws Exception {
|
||||
Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
|
||||
Path basePath = fileStorageService
|
||||
.getStorageBase(FileStorageBaseType.STORAGE)
|
||||
.asPath()
|
||||
.resolve("live-crawl-data");
|
||||
|
||||
if (!Files.isDirectory(basePath)) {
|
||||
Files.createDirectories(basePath);
|
||||
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
{
|
||||
final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);
|
||||
|
||||
/* ------------------------------------------------ */
|
||||
/* Fetch the latest domains from the feeds database */
|
||||
/* ------------------------------------------------ */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);
|
||||
|
||||
Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
|
||||
if (!feedsClient.waitReady(Duration.ofHours(1))) {
|
||||
throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
|
||||
}
|
||||
feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);
|
||||
|
||||
logger.info("Fetched data for {} domains", urlsPerDomain.size());
|
||||
|
||||
|
||||
/* ------------------------------------- */
|
||||
/* Prune the database from old entries */
|
||||
/* ------------------------------------- */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.PRUNE_DB);
|
||||
|
||||
// Remove data that is too old
|
||||
dataSet.prune(cutoff);
|
||||
|
||||
|
||||
/* ------------------------------------- */
|
||||
/* Fetch the links for each domain */
|
||||
/* ------------------------------------- */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.CRAWLING);
|
||||
|
||||
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
|
||||
CloseableHttpClient client = HttpClientProvider.createClient();
|
||||
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
|
||||
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
|
||||
{
|
||||
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
|
||||
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
fetcher.scheduleRetrieval(domain, urls);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
client.close(CloseMode.GRACEFUL);
|
||||
}
|
||||
|
||||
Path tempPath = dataSet.createWorkDir();
|
||||
|
||||
|
||||
try {
|
||||
/* ------------------------------------- */
|
||||
/* Process the fetched links */
|
||||
/* ------------------------------------- */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.PROCESSING);
|
||||
|
||||
try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
|
||||
var writer = new ConverterBatchWriter(tempPath, 0)
|
||||
) {
|
||||
// Offset the documents' ordinals toward the upper range, to avoid an ID collisions with the
|
||||
// main indexes (the maximum permissible for doc ordinal is value is 67_108_863, so this
|
||||
// leaves us with a lot of headroom still)
|
||||
// We need unique document ids that do not collide with the document id from the main index,
|
||||
// so we offset the documents' ordinals toward the upper range.
|
||||
//
|
||||
// The maximum permissible for doc ordinal is value is 67_108_863,
|
||||
// so this leaves us with a lot of headroom still!
|
||||
// Expected document count here is order of 10 :^)
|
||||
writer.setOrdinalOffset(67_000_000);
|
||||
|
||||
for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
|
||||
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ---------------------------------------------- */
|
||||
/* Load the processed data into the link database */
|
||||
/* and construct an index journal for the docs */
|
||||
/* ---------------------------------------------- */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.LOADING);
|
||||
|
||||
LoaderInputData lid = new LoaderInputData(tempPath, 1);
|
||||
|
||||
DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);
|
||||
|
||||
keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
|
||||
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
FileUtils.deleteDirectory(tempPath.toFile());
|
||||
}
|
||||
|
||||
// Construct the index
|
||||
|
||||
/* ------------------------------------- */
|
||||
/* Finish up */
|
||||
/* ------------------------------------- */
|
||||
|
||||
processHeartbeat.progress(LiveCrawlState.DONE);
|
||||
|
||||
// After we return from here, the LiveCrawlActor will trigger an index construction
|
||||
// job. Unlike all the stuff we did in this process, it's identical to the real job
|
||||
// so we don't need to do anything special from this process
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
|
||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||
import nu.marginalia.coordination.DomainCoordinator;
|
||||
import nu.marginalia.coordination.DomainLock;
|
||||
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
||||
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
||||
import nu.marginalia.db.DbDomainQueries;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.util.SimpleBlockingThreadPool;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.core5.http.ClassicHttpRequest;
|
||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpHeaders;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/** A simple link scraper that fetches URLs and stores them in a database,
|
||||
* with no concept of a crawl frontier, WARC output, or other advanced features
|
||||
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
private final LiveCrawlDataSet dataSet;
|
||||
private final DbDomainQueries domainQueries;
|
||||
private final DomainBlacklist domainBlacklist;
|
||||
private final Duration connectTimeout = Duration.ofSeconds(10);
|
||||
private final Duration readTimeout = Duration.ofSeconds(10);
|
||||
private final DomainCoordinator domainCoordinator;
|
||||
|
||||
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
|
||||
private final HttpClient httpClient;
|
||||
|
||||
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
||||
DomainCoordinator domainCoordinator,
|
||||
DbDomainQueries domainQueries,
|
||||
HttpClient httpClient,
|
||||
DomainBlacklist domainBlacklist) {
|
||||
this.dataSet = dataSet;
|
||||
this.domainCoordinator = domainCoordinator;
|
||||
this.domainQueries = domainQueries;
|
||||
this.domainBlacklist = domainBlacklist;
|
||||
this.httpClient = httpClient;
|
||||
}
|
||||
|
||||
public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
|
||||
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
|
||||
EdgeUrl rootUrl = domain.toRootUrlHttps();
|
||||
|
||||
List<EdgeUrl> relevantUrls = new ArrayList<>();
|
||||
List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));
|
||||
|
||||
// Resolve absolute URLs
|
||||
for (var url : urls) {
|
||||
Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
|
||||
if (optParsedUrl.isEmpty()) {
|
||||
|
||||
if (optParsedUrl.isEmpty())
|
||||
continue;
|
||||
}
|
||||
if (dataSet.hasUrl(optParsedUrl.get())) {
|
||||
continue;
|
||||
}
|
||||
relevantUrls.add(optParsedUrl.get());
|
||||
|
||||
EdgeUrl absoluteUrl = optParsedUrl.get();
|
||||
|
||||
if (!dataSet.hasUrl(absoluteUrl))
|
||||
relevantUrls.add(absoluteUrl);
|
||||
}
|
||||
|
||||
if (relevantUrls.isEmpty()) {
|
||||
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
|
||||
int fetched = 0;
|
||||
|
||||
try (HttpClient client = HttpClient
|
||||
.newBuilder()
|
||||
.connectTimeout(connectTimeout)
|
||||
.followRedirects(HttpClient.Redirect.NEVER)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build();
|
||||
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
|
||||
try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
|
||||
DomainLock lock = domainCoordinator.lockDomain(domain)
|
||||
) {
|
||||
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
|
||||
SimpleRobotRules rules = fetchRobotsRules(rootUrl);
|
||||
|
||||
if (rules == null) { // I/O error fetching robots.txt
|
||||
// If we can't fetch the robots.txt,
|
||||
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());
|
||||
|
||||
for (var parsedUrl : relevantUrls) {
|
||||
|
||||
if (!rules.isAllowed(parsedUrl.toString())) {
|
||||
maybeFlagAsBad(parsedUrl);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (fetchUrl(domainId, parsedUrl, timer, client)) {
|
||||
switch (fetchUrl(domainId, parsedUrl, timer)) {
|
||||
case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
|
||||
dataSet.saveDocument(id, docUrl, body, headers, "");
|
||||
fetched++;
|
||||
}
|
||||
case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
|
||||
case FetchResult.Error(EdgeUrl docUrl) -> {
|
||||
maybeFlagAsBad(docUrl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -150,111 +144,107 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
|
||||
var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
|
||||
.GET()
|
||||
.header("User-Agent", WmsaHome.getUserAgent().uaString())
|
||||
.header("Accept-Encoding","gzip")
|
||||
.timeout(readTimeout);
|
||||
|
||||
// Fetch the robots.txt
|
||||
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
|
||||
ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
|
||||
.setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
|
||||
.setHeader("Accept-Encoding", "gzip")
|
||||
.build();
|
||||
|
||||
try {
|
||||
SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
|
||||
HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
|
||||
|
||||
if (robotsTxt.statusCode() == 200) {
|
||||
return parser.parseContent(rootUrl.toString(),
|
||||
getResponseData(robotsTxt),
|
||||
robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
|
||||
WmsaHome.getUserAgent().uaIdentifier());
|
||||
return httpClient.execute(request, rsp -> {
|
||||
if (rsp.getEntity() == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
if (rsp.getCode() == 200) {
|
||||
var contentTypeHeader = rsp.getFirstHeader("Content-Type");
|
||||
if (contentTypeHeader == null) {
|
||||
return null; // No content type header, can't parse
|
||||
}
|
||||
return new SimpleRobotRulesParser().parseContent(
|
||||
rootUrl.toString(),
|
||||
EntityUtils.toByteArray(rsp.getEntity()),
|
||||
contentTypeHeader.getValue(),
|
||||
WmsaHome.getUserAgent().uaIdentifier()
|
||||
);
|
||||
} else if (rsp.getCode() == 404) {
|
||||
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
|
||||
}
|
||||
} finally {
|
||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
catch (IOException e) {
|
||||
logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
|
||||
return null; // I/O error fetching robots.txt
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
}
|
||||
else if (robotsTxt.statusCode() == 404) {
|
||||
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Fetch a URL and store it in the database
|
||||
*/
|
||||
private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
|
||||
private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {
|
||||
|
||||
timer.waitFetchDelay();
|
||||
|
||||
HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
|
||||
.GET()
|
||||
.header("User-Agent", WmsaHome.getUserAgent().uaString())
|
||||
.header("Accept", "text/html")
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.timeout(readTimeout)
|
||||
ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
|
||||
.setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
|
||||
.setHeader("Accept", "text/html")
|
||||
.setHeader("Accept-Encoding", "gzip")
|
||||
.build();
|
||||
|
||||
try {
|
||||
HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
|
||||
return httpClient.execute(request, rsp -> {
|
||||
try {
|
||||
if (rsp.getCode() == 200) {
|
||||
String contentType = rsp.getFirstHeader("Content-Type").getValue();
|
||||
if (!contentType.toLowerCase().startsWith("text/html")) {
|
||||
return new FetchResult.Error(parsedUrl);
|
||||
}
|
||||
|
||||
// Handle rate limiting by waiting and retrying once
|
||||
if (response.statusCode() == 429) {
|
||||
timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
|
||||
response.headers().firstValue("Retry-After").orElse("5")
|
||||
));
|
||||
response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
|
||||
}
|
||||
byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);
|
||||
|
||||
String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
|
||||
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
|
||||
|
||||
if (response.statusCode() == 200) {
|
||||
if (!contentType.toLowerCase().startsWith("text/html")) {
|
||||
return new FetchResult.Error(parsedUrl);
|
||||
StringBuilder headersStr = new StringBuilder();
|
||||
for (var header : rsp.getHeaders()) {
|
||||
headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
|
||||
}
|
||||
|
||||
return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
|
||||
}
|
||||
} finally {
|
||||
if (rsp.getEntity() != null) {
|
||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||
}
|
||||
}
|
||||
|
||||
byte[] body = getResponseData(response);
|
||||
if (body.length > MAX_SIZE) {
|
||||
return new FetchResult.Error(parsedUrl);
|
||||
}
|
||||
|
||||
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
|
||||
|
||||
return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
|
||||
}
|
||||
return new FetchResult.Error(parsedUrl);
|
||||
});
|
||||
}
|
||||
catch (IOException ex) {
|
||||
// We don't want a full stack trace on every error, as it's quite common and very noisy
|
||||
logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
|
||||
catch (IOException e) {
|
||||
logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
|
||||
// If we can't fetch the URL, we return an error result
|
||||
// so that the caller can decide what to do with it.
|
||||
}
|
||||
finally {
|
||||
timer.waitFetchDelay();
|
||||
}
|
||||
|
||||
return new FetchResult.Error(parsedUrl);
|
||||
}
|
||||
|
||||
private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
|
||||
String encoding = response.headers().firstValue("Content-Encoding").orElse("");
|
||||
|
||||
if ("gzip".equals(encoding)) {
|
||||
try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
|
||||
return stream.readAllBytes();
|
||||
}
|
||||
}
|
||||
else {
|
||||
return response.body();
|
||||
}
|
||||
}
|
||||
|
||||
sealed interface FetchResult {
|
||||
record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
|
||||
record Error(EdgeUrl url) implements FetchResult {}
|
||||
}
|
||||
|
||||
private String headersToString(HttpHeaders headers) {
|
||||
StringBuilder headersStr = new StringBuilder();
|
||||
headers.map().forEach((k, v) -> {
|
||||
headersStr.append(k).append(": ").append(v).append("\n");
|
||||
});
|
||||
return headersStr.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
pool.shutDown();
|
||||
|
@@ -0,0 +1,126 @@
|
||||
package nu.marginalia.livecrawler.io;
|
||||
|
||||
import com.google.inject.Provider;
|
||||
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
import org.apache.hc.client5.http.config.RequestConfig;
|
||||
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
|
||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
|
||||
import org.apache.hc.core5.http.HeaderElement;
|
||||
import org.apache.hc.core5.http.HeaderElements;
|
||||
import org.apache.hc.core5.http.HttpResponse;
|
||||
import org.apache.hc.core5.http.io.SocketConfig;
|
||||
import org.apache.hc.core5.http.message.MessageSupport;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.apache.hc.core5.util.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class HttpClientProvider implements Provider<HttpClient> {
|
||||
private static final HttpClient client;
|
||||
private static PoolingHttpClientConnectionManager connectionManager;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);
|
||||
|
||||
static {
|
||||
try {
|
||||
client = createClient();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
|
||||
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
|
||||
.setSocketTimeout(15, TimeUnit.SECONDS)
|
||||
.setConnectTimeout(15, TimeUnit.SECONDS)
|
||||
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
|
||||
.build();
|
||||
|
||||
|
||||
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
||||
.setMaxConnPerRoute(2)
|
||||
.setMaxConnTotal(50)
|
||||
.setDefaultConnectionConfig(connectionConfig)
|
||||
.build();
|
||||
|
||||
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
|
||||
.setSoLinger(TimeValue.ofSeconds(-1))
|
||||
.setSoTimeout(Timeout.ofSeconds(10))
|
||||
.build()
|
||||
);
|
||||
|
||||
Thread.ofPlatform().daemon(true).start(() -> {
|
||||
try {
|
||||
for (;;) {
|
||||
TimeUnit.SECONDS.sleep(15);
|
||||
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
|
||||
final RequestConfig defaultRequestConfig = RequestConfig.custom()
|
||||
.setCookieSpec(StandardCookieSpec.IGNORE)
|
||||
.setResponseTimeout(10, TimeUnit.SECONDS)
|
||||
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
|
||||
.build();
|
||||
|
||||
return HttpClients.custom()
|
||||
.setConnectionManager(connectionManager)
|
||||
.setRetryStrategy(new RetryStrategy())
|
||||
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
|
||||
// Default keep-alive duration is 3 minutes, but this is too long for us,
|
||||
// as we are either going to re-use it fairly quickly or close it for a long time.
|
||||
//
|
||||
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
|
||||
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
|
||||
|
||||
@Override
|
||||
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
|
||||
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
|
||||
|
||||
while (it.hasNext()) {
|
||||
final HeaderElement he = it.next();
|
||||
final String param = he.getName();
|
||||
final String value = he.getValue();
|
||||
|
||||
if (value == null)
|
||||
continue;
|
||||
if (!"timeout".equalsIgnoreCase(param))
|
||||
continue;
|
||||
|
||||
try {
|
||||
long timeout = Long.parseLong(value);
|
||||
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
|
||||
return TimeValue.ofSeconds(timeout);
|
||||
} catch (final NumberFormatException ignore) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
})
|
||||
.disableRedirectHandling()
|
||||
.setDefaultRequestConfig(defaultRequestConfig)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient get() {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
@@ -0,0 +1,79 @@
|
||||
package nu.marginalia.livecrawler.io;
|
||||
|
||||
import org.apache.hc.client5.http.HttpHostConnectException;
|
||||
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
|
||||
import org.apache.hc.core5.http.HttpRequest;
|
||||
import org.apache.hc.core5.http.HttpResponse;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
public class RetryStrategy implements HttpRequestRetryStrategy {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);
|
||||
|
||||
@Override
|
||||
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
|
||||
return switch (exception) {
|
||||
case SocketTimeoutException ste -> false;
|
||||
case SSLException ssle -> false;
|
||||
case UnknownHostException uhe -> false;
|
||||
case HttpHostConnectException ex -> executionCount < 2;
|
||||
case SocketException ex -> executionCount < 2;
|
||||
default -> executionCount <= 3;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
|
||||
return switch (response.getCode()) {
|
||||
case 500, 503 -> executionCount <= 2;
|
||||
case 429 -> executionCount <= 3;
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
|
||||
return TimeValue.ofSeconds(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
|
||||
|
||||
int statusCode = response.getCode();
|
||||
|
||||
// Give 503 a bit more time
|
||||
if (statusCode == 503) return TimeValue.ofSeconds(5);
|
||||
|
||||
if (statusCode == 429) {
|
||||
// get the Retry-After header
|
||||
var retryAfterHeader = response.getFirstHeader("Retry-After");
|
||||
if (retryAfterHeader == null) {
|
||||
return TimeValue.ofSeconds(3);
|
||||
}
|
||||
|
||||
String retryAfter = retryAfterHeader.getValue();
|
||||
if (retryAfter == null) {
|
||||
return TimeValue.ofSeconds(2);
|
||||
}
|
||||
|
||||
try {
|
||||
int retryAfterTime = Integer.parseInt(retryAfter);
|
||||
retryAfterTime = Math.clamp(retryAfterTime, 1, 5);
|
||||
|
||||
return TimeValue.ofSeconds(retryAfterTime);
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn("Invalid Retry-After header: {}", retryAfter);
|
||||
}
|
||||
}
|
||||
|
||||
return TimeValue.ofSeconds(2);
|
||||
}
|
||||
}
|
@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
|
||||
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||
import nu.marginalia.db.DomainBlacklistImpl;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.livecrawler.io.HttpClientProvider;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.core5.io.CloseMode;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
class SimpleLinkScraperTest {
|
||||
private Path tempDir;
|
||||
private LiveCrawlDataSet dataSet;
|
||||
private CloseableHttpClient httpClient;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException, SQLException {
|
||||
public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
|
||||
tempDir = Files.createTempDirectory(getClass().getSimpleName());
|
||||
dataSet = new LiveCrawlDataSet(tempDir);
|
||||
httpClient = HttpClientProvider.createClient();
|
||||
}
|
||||
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
dataSet.close();
|
||||
httpClient.close(CloseMode.IMMEDIATE);
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetrieveNow() throws Exception {
|
||||
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
|
||||
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));
|
||||
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
||||
Assertions.assertEquals(1, fetched);
|
||||
|
||||
@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
|
||||
@Test
|
||||
public void testRetrieveNow_Redundant() throws Exception {
|
||||
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
|
||||
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
|
||||
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, httpClient, Mockito.mock(DomainBlacklistImpl.class));
|
||||
|
||||
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
|
||||
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
||||
|
@@ -13,14 +13,23 @@
|
||||
{{#unless node.profile.realtime}}
|
||||
<li class="nav-item dropdown">
|
||||
<a class="nav-link dropdown-toggle {{#if tab.actions}}active{{/if}}" data-bs-toggle="dropdown" href="#" role="button" aria-expanded="false">Actions</a>
|
||||
{{#if node.profile.permitBatchCrawl}}
|
||||
<ul class="dropdown-menu">
|
||||
{{#if node.profile.permitBatchCrawl}}
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=new-crawl">New Crawl</a></li>
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=process">Process Crawl Data</a></li>
|
||||
{{/if}}
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=repartition">Repartition Index</a></li>
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
{{#if node.profile.permitSideload}}
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
{{/if}}
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=download-sample-data">Download Sample Crawl Data</a></li>
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=export-db-data">Export Database Data</a></li>
|
||||
@@ -30,19 +39,6 @@
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
|
||||
</ul>
|
||||
{{/if}}
|
||||
{{#if node.profile.permitSideload}}
|
||||
<ul class="dropdown-menu">
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
|
||||
<li><hr class="dropdown-divider"></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
|
||||
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
|
||||
</ul>
|
||||
{{/if}}
|
||||
</li>
|
||||
{{/unless}}
|
||||
<li class="nav-item">
|
||||
|
Reference in New Issue
Block a user