mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


19 Commits

Author SHA1 Message Date
Viktor Lofgren
982dcb28f0 (live-crawler) Use Apache HttpClient + code cleanup 2025-06-24 13:04:19 +02:00
Viktor Lofgren
fc686d8b2e (live-crawler) Fix startup race condition
The fix makes sure we wait for the feeds API to be available before fetching from it, so that the process doesn't crash on a cold system reboot.
2025-06-24 11:42:41 +02:00
Viktor Lofgren
69ef0f334a (rss) Make feed fetcher use Apache's HttpClient 2025-06-23 18:49:55 +02:00
Viktor Lofgren
446746f3bd (control) Fix so that sideload actions show up in Mixed profile nodes 2025-06-23 18:08:09 +02:00
Viktor Lofgren
24ab8398bb (ndp) Use LinkGraphClient to populate NDP table 2025-06-23 16:44:38 +02:00
Viktor Lofgren
d2ceeff4cf (ndp) Add toggle for excluding nodes from assignment via NDP 2025-06-23 15:38:02 +02:00
Viktor Lofgren
cf64214b1c (ndp) Update documentation 2025-06-23 15:18:35 +02:00
Viktor Lofgren
e50d09cc01 (crawler) Remove illegal requests when denied via robots.txt
The commit removes attempts at probing the root document, feed URLs, and favicon if we are not permitted to do so via robots.txt
2025-06-22 17:10:44 +02:00
Viktor Lofgren
bce3892ce0 (ndp) Simplify code 2025-06-22 16:08:55 +02:00
Viktor Lofgren
36581b25c2 (ndp) Fix process tracking in domain discovery process 2025-06-21 14:35:25 +02:00
Viktor Lofgren
52ff7fb4dd (ndp) Add a process for adding new domains to be crawled
This is a working "work in progress" commit; it will need more refinement, but given the usual difficulty of testing crawler-adjacent code without actually crawling, it needs some maturation time in production.
2025-06-21 14:10:27 +02:00
Viktor Lofgren
a4e49e658a (ping) Add README for ping 2025-06-19 11:21:52 +02:00
Viktor Lofgren
e2c56dc3ca (search) Clean up the rate limiting
We fail quietly to make life harder for the bot farmers (a sketch of this pattern follows the commit list below)
2025-06-18 11:26:30 +02:00
Viktor Lofgren
470b866008 (search) Clean up the rate limiting
We fail quietly to make life harder for the bot farmers
2025-06-18 11:22:26 +02:00
Viktor Lofgren
4895a2ac7a (search) Clean up the rate limiting
We fail quietly to make life harder for the bot farmers
2025-06-18 11:20:24 +02:00
Viktor Lofgren
fd32ae9fa7 (search) Add automatic rate limiting to /site
Fix typo
2025-06-18 11:10:08 +02:00
Viktor Lofgren
470651ea4c (search) Add automatic rate limiting to /site 2025-06-18 11:04:36 +02:00
Viktor Lofgren
8d4829e783 (ping) Change cookie specification to ignore cookies 2025-06-17 12:26:34 +02:00
Viktor Lofgren
1290bc15dc (ping) Reduce retries for SocketException and pals 2025-06-16 22:35:33 +02:00
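
The rate-limiting commits above describe the approach in a single line each: throttle the /site endpoint, and fail quietly rather than advertise the throttling. Below is a minimal sketch of that pattern, assuming a per-IP counter that a scheduler resets periodically; the SiteRateLimiter class, its methods, and the renderEmptyResultsPage/renderFullSitePage helpers are hypothetical illustrations, not the actual Marginalia search-service code.

// Hypothetical sketch of quiet-fail rate limiting; not the actual Marginalia implementation.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SiteRateLimiter {
    private final Map<String, AtomicInteger> hitsPerIp = new ConcurrentHashMap<>();
    private final int maxHitsPerMinute;

    SiteRateLimiter(int maxHitsPerMinute) {
        this.maxHitsPerMinute = maxHitsPerMinute;
    }

    /** Returns false when the caller has exceeded its budget for the current window. */
    boolean isAllowed(String ip) {
        return hitsPerIp.computeIfAbsent(ip, k -> new AtomicInteger())
                .incrementAndGet() <= maxHitsPerMinute;
    }

    /** Called by a scheduler (not shown) once per minute to start a new window. */
    void reset() {
        hitsPerIp.clear();
    }
}

// In the /site handler, the quiet part: over-budget callers get a plausible-looking
// but empty page instead of an HTTP 429, so bot farmers receive no clear signal
// that they have been throttled.
//
//     if (!rateLimiter.isAllowed(request.ip())) {
//         return renderEmptyResultsPage();   // hypothetical helper
//     }
//     return renderFullSitePage(request);    // hypothetical helper
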
50 changed files with 1768 additions and 230 deletions

View File

@@ -45,7 +45,7 @@ public class NodeConfigurationService {
     public List<NodeConfiguration> getAll() {
         try (var conn = dataSource.getConnection();
              var qs = conn.prepareStatement("""
-                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
+                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
                      FROM NODE_CONFIGURATION
                      """)) {
             var rs = qs.executeQuery();
@@ -59,6 +59,7 @@ public class NodeConfigurationService {
                         rs.getBoolean("ACCEPT_QUERIES"),
                         rs.getBoolean("AUTO_CLEAN"),
                         rs.getBoolean("PRECESSION"),
+                        rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
                         rs.getBoolean("KEEP_WARCS"),
                         NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
                         rs.getBoolean("DISABLED")
@@ -75,7 +76,7 @@ public class NodeConfigurationService {
     public NodeConfiguration get(int nodeId) throws SQLException {
         try (var conn = dataSource.getConnection();
              var qs = conn.prepareStatement("""
-                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
+                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
                      FROM NODE_CONFIGURATION
                      WHERE ID=?
                      """)) {
@@ -88,6 +89,7 @@ public class NodeConfigurationService {
                         rs.getBoolean("ACCEPT_QUERIES"),
                         rs.getBoolean("AUTO_CLEAN"),
                         rs.getBoolean("PRECESSION"),
+                        rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
                         rs.getBoolean("KEEP_WARCS"),
                         NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
                         rs.getBoolean("DISABLED")
@@ -102,7 +104,7 @@ public class NodeConfigurationService {
         try (var conn = dataSource.getConnection();
              var us = conn.prepareStatement("""
                      UPDATE NODE_CONFIGURATION
-                     SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
+                     SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, AUTO_ASSIGN_DOMAINS=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
                      WHERE ID=?
                      """))
         {
@@ -110,10 +112,11 @@ public class NodeConfigurationService {
             us.setBoolean(2, config.acceptQueries());
             us.setBoolean(3, config.autoClean());
             us.setBoolean(4, config.includeInPrecession());
-            us.setBoolean(5, config.keepWarcs());
-            us.setBoolean(6, config.disabled());
-            us.setString(7, config.profile().name());
-            us.setInt(8, config.node());
+            us.setBoolean(5, config.autoAssignDomains());
+            us.setBoolean(6, config.keepWarcs());
+            us.setBoolean(7, config.disabled());
+            us.setString(8, config.profile().name());
+            us.setInt(9, config.node());

             if (us.executeUpdate() <= 0)
                 throw new IllegalStateException("Failed to update configuration");

View File

@@ -5,6 +5,7 @@ public record NodeConfiguration(int node,
                                 boolean acceptQueries,
                                 boolean autoClean,
                                 boolean includeInPrecession,
+                                boolean autoAssignDomains,
                                 boolean keepWarcs,
                                 NodeProfile profile,
                                 boolean disabled

View File

@@ -20,9 +20,7 @@ public enum NodeProfile {
     }
     public boolean permitBatchCrawl() {
-        return isBatchCrawl() ||isMixed();
-    }
-    public boolean permitSideload() {
-        return isMixed() || isSideload();
+        return isBatchCrawl() || isMixed();
     }
+    public boolean permitSideload() { return isSideload() || isMixed(); }
 }

View File

@@ -2,6 +2,7 @@ package nu.marginalia.nodecfg;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.nodecfg.model.NodeConfiguration;
 import nu.marginalia.nodecfg.model.NodeProfile;
 import nu.marginalia.test.TestMigrationLoader;
 import org.junit.jupiter.api.BeforeAll;
@@ -62,6 +63,63 @@ public class NodeConfigurationServiceTest {
         assertEquals(2, list.size());
         assertEquals(a, list.get(0));
         assertEquals(b, list.get(1));
     }

+    // Test all the fields that are only exposed via save()
+    @Test
+    public void testSaveChanges() throws SQLException {
+        var original = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED);
+        assertEquals(1, original.node());
+        assertEquals("Test", original.description());
+        assertFalse(original.acceptQueries());
+
+        var precession = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                original.autoClean(),
+                original.includeInPrecession(),
+                !original.autoAssignDomains(),
+                original.keepWarcs(),
+                original.profile(),
+                original.disabled()
+        );
+
+        nodeConfigurationService.save(precession);
+        precession = nodeConfigurationService.get(original.node());
+        assertNotEquals(original.autoAssignDomains(), precession.autoAssignDomains());
+
+        var autoClean = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                !original.autoClean(),
+                original.includeInPrecession(),
+                original.autoAssignDomains(),
+                original.keepWarcs(),
+                original.profile(),
+                original.disabled()
+        );
+
+        nodeConfigurationService.save(autoClean);
+        autoClean = nodeConfigurationService.get(original.node());
+        assertNotEquals(original.autoClean(), autoClean.autoClean());
+
+        var disabled = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                autoClean.autoClean(),
+                autoClean.includeInPrecession(),
+                autoClean.autoAssignDomains(),
+                autoClean.keepWarcs(),
+                autoClean.profile(),
+                !autoClean.disabled()
+        );
+
+        nodeConfigurationService.save(disabled);
+        disabled = nodeConfigurationService.get(original.node());
+        assertNotEquals(autoClean.disabled(), disabled.disabled());
+    }
 }

View File

@@ -0,0 +1,12 @@
-- Table holding domains to be processed by the NDP in order to figure out whether to add them to
-- be crawled.
CREATE TABLE IF NOT EXISTS NDP_NEW_DOMAINS(
DOMAIN_ID INT NOT NULL PRIMARY KEY,
STATE ENUM ('NEW', 'ACCEPTED', 'REJECTED') NOT NULL DEFAULT 'NEW',
PRIORITY INT NOT NULL DEFAULT 0,
TS_CHANGE TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
CHECK_COUNT INT NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS NDP_NEW_DOMAINS__STATE_PRIORITY ON NDP_NEW_DOMAINS (STATE, PRIORITY DESC);
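
The (STATE, PRIORITY DESC) index above exists so the discovery process can cheaply pick the next domains to evaluate. A minimal sketch of such a consumer query follows, assuming plain JDBC; the NdpQueueReader class and fetchNextCandidates method are illustrative names, not the actual NDP code.

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class NdpQueueReader {
    // Pulls the highest-priority unprocessed domains; the WHERE/ORDER BY pair
    // matches the NDP_NEW_DOMAINS__STATE_PRIORITY index so the scan stays cheap.
    List<Integer> fetchNextCandidates(Connection conn, int limit) throws SQLException {
        List<Integer> ids = new ArrayList<>(limit);
        try (var stmt = conn.prepareStatement("""
                SELECT DOMAIN_ID
                FROM NDP_NEW_DOMAINS
                WHERE STATE = 'NEW'
                ORDER BY PRIORITY DESC
                LIMIT ?
                """)) {
            stmt.setInt(1, limit);
            var rs = stmt.executeQuery();
            while (rs.next()) {
                ids.add(rs.getInt("DOMAIN_ID"));
            }
        }
        return ids;
    }
}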

View File

@@ -0,0 +1,3 @@
-- Migration script to add AUTO_ASSIGN_DOMAINS column to NODE_CONFIGURATION table
ALTER TABLE NODE_CONFIGURATION ADD COLUMN AUTO_ASSIGN_DOMAINS BOOLEAN NOT NULL DEFAULT TRUE;
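
The column defaults to TRUE so existing nodes keep participating; the toggle added in commit d2ceeff4cf then lets an operator exclude a node. A hedged sketch of how the flag might gate assignment follows, reusing the NodeConfigurationService and NodeConfiguration accessors shown earlier in this change set; the NdpNodeSelection class and its method are illustrative, not the actual assignment logic.

import java.util.ArrayList;
import java.util.List;

class NdpNodeSelection {
    // Illustrative only: picks the nodes that may receive newly discovered domains.
    List<Integer> eligibleNodesForNdpAssignment(NodeConfigurationService nodeConfigurationService) {
        List<Integer> eligible = new ArrayList<>();
        for (NodeConfiguration node : nodeConfigurationService.getAll()) {
            // Skip disabled nodes and nodes the operator has opted out of automatic domain assignment
            if (node.disabled() || !node.autoAssignDomains())
                continue;
            eligible.add(node.node());
        }
        return eligible;
    }
}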

View File

@@ -20,6 +20,7 @@ dependencies {
     implementation project(':code:processes:live-crawling-process')
     implementation project(':code:processes:loading-process')
     implementation project(':code:processes:ping-process')
+    implementation project(':code:processes:new-domain-process')
     implementation project(':code:processes:converting-process')
     implementation project(':code:processes:index-constructor-process')
@@ -41,7 +42,6 @@ dependencies {
     implementation project(':code:functions:nsfw-domain-filter')
     implementation project(':code:execution:api')

-    implementation project(':code:processes:crawling-process:model')
     implementation project(':code:processes:crawling-process:model')
     implementation project(':code:processes:crawling-process:ft-link-parser')
     implementation project(':code:index:index-journal')

View File

@@ -14,6 +14,7 @@ public enum ExecutorActor {
     PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
     PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
+    PROC_NDP_SPAWNER(NodeProfile.MIXED, NodeProfile.REALTIME),
     ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -49,6 +49,7 @@ public class ExecutorActorControlService {
             RecrawlSingleDomainActor recrawlSingleDomainActor,
             RestoreBackupActor restoreBackupActor,
             ConverterMonitorActor converterMonitorFSM,
+            NdpMonitorActor ndpMonitorActor,
             PingMonitorActor pingMonitorActor,
             CrawlerMonitorActor crawlerMonitorActor,
             LiveCrawlerMonitorActor liveCrawlerMonitorActor,
@@ -93,7 +94,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.PROC_PING_SPAWNER, pingMonitorActor);
         register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor);
         register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor);
+        register(ExecutorActor.PROC_NDP_SPAWNER, ndpMonitorActor);

         register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
         register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);

View File

@@ -0,0 +1,29 @@
package nu.marginalia.actor.proc;

import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;

@Singleton
public class NdpMonitorActor extends AbstractProcessSpawnerActor {

    @Inject
    public NdpMonitorActor(Gson gson,
                           ServiceConfiguration configuration,
                           MqPersistence persistence,
                           ProcessService processService) {
        super(gson,
              configuration,
              persistence,
              processService,
              ProcessInboxNames.NDP_INBOX,
              ProcessService.ProcessId.NDP);
    }
}

View File

@@ -8,6 +8,7 @@ import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.index.IndexConstructorMain;
 import nu.marginalia.livecrawler.LiveCrawlerMain;
 import nu.marginalia.loading.LoaderMain;
+import nu.marginalia.ndp.NdpMain;
 import nu.marginalia.ping.PingMain;
 import nu.marginalia.service.control.ServiceEventLog;
 import nu.marginalia.service.server.BaseServiceParams;
@@ -57,6 +58,7 @@ public class ProcessService {
         CONVERTER(ConverterMain.class),
         LOADER(LoaderMain.class),
         INDEX_CONSTRUCTOR(IndexConstructorMain.class),
+        NDP(NdpMain.class),
         EXPORT_TASKS(ExportTasksMain.class),
         ;
@@ -72,6 +74,7 @@ public class ProcessService {
             case CONVERTER -> "CONVERTER_PROCESS_OPTS";
             case LOADER -> "LOADER_PROCESS_OPTS";
             case PING -> "PING_PROCESS_OPTS";
+            case NDP -> "NDP_PROCESS_OPTS";
             case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS";
             case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS";
         };

View File

@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.module.ServiceConfiguration;

 import javax.annotation.CheckReturnValue;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +60,11 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }

+    public boolean waitReady(Duration duration) throws InterruptedException {
+        return channelPool.awaitChannel(duration);
+    }
+
     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
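
The new waitReady method simply blocks until the channel pool has a live connection to the feeds service, or the timeout expires, which is what lets the live crawler survive a cold reboot where the RSS service comes up later than it does. Its intended use, abbreviated from the LiveCrawlerMain diff further down in this change set:

    if (!feedsClient.waitReady(Duration.ofHours(1))) {
        throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
    }
    feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);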

View File

@@ -35,6 +35,7 @@ dependencies {
     implementation libs.bundles.slf4j
     implementation libs.commons.lang3
     implementation libs.commons.io
+    implementation libs.httpclient
     implementation libs.wiremock
     implementation libs.prometheus

View File

@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
 import nu.marginalia.storage.model.FileStorageType;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.cookie.StandardCookieSpec;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HeaderElement;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
+import org.apache.hc.core5.http.message.MessageSupport;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
 import java.sql.SQLException;
-import java.time.*;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +72,8 @@ public class FeedFetcherService {
     private final DomainCoordinator domainCoordinator;

+    private final HttpClient httpClient;
+
     private volatile boolean updating;

     @Inject
@@ -71,6 +90,83 @@ public class FeedFetcherService {
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
         this.domainCoordinator = domainCoordinator;
+
+        final ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                .setSocketTimeout(15, TimeUnit.SECONDS)
+                .setConnectTimeout(15, TimeUnit.SECONDS)
+                .setValidateAfterInactivity(TimeValue.ofSeconds(5))
+                .build();
+
+        var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
+                .setMaxConnPerRoute(2)
+                .setMaxConnTotal(50)
+                .setDefaultConnectionConfig(connectionConfig)
+                .build();
+
+        connectionManager.setDefaultSocketConfig(SocketConfig.custom()
+                .setSoLinger(TimeValue.ofSeconds(-1))
+                .setSoTimeout(Timeout.ofSeconds(10))
+                .build()
+        );
+
+        Thread.ofPlatform().daemon(true).start(() -> {
+            try {
+                for (;;) {
+                    TimeUnit.SECONDS.sleep(15);
+                    logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        final RequestConfig defaultRequestConfig = RequestConfig.custom()
+                .setCookieSpec(StandardCookieSpec.IGNORE)
+                .setResponseTimeout(10, TimeUnit.SECONDS)
+                .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
+                .build();
+
+        httpClient = HttpClients.custom()
+                .setDefaultRequestConfig(defaultRequestConfig)
+                .setConnectionManager(connectionManager)
+                .setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
+                .setConnectionManager(connectionManager)
+                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
+                    // Default keep-alive duration is 3 minutes, but this is too long for us,
+                    // as we are either going to re-use it fairly quickly or close it for a long time.
+                    //
+                    // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
+                    private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
+
+                    @Override
+                    public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
+                        final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
+
+                        while (it.hasNext()) {
+                            final HeaderElement he = it.next();
+                            final String param = he.getName();
+                            final String value = he.getValue();
+
+                            if (value == null)
+                                continue;
+                            if (!"timeout".equalsIgnoreCase(param))
+                                continue;
+
+                            try {
+                                long timeout = Long.parseLong(value);
+                                timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
+                                return TimeValue.ofSeconds(timeout);
+                            } catch (final NumberFormatException ignore) {
+                                break;
+                            }
+                        }
+                        return defaultValue;
+                    }
+                })
+                .build();
     }

     public enum UpdateMode {
@@ -86,13 +182,7 @@ public class FeedFetcherService {
         try (FeedDbWriter writer = feedDb.createWriter();
-             HttpClient client = HttpClient.newBuilder()
-                     .connectTimeout(Duration.ofSeconds(15))
-                     .executor(Executors.newCachedThreadPool())
-                     .followRedirects(HttpClient.Redirect.NORMAL)
-                     .version(HttpClient.Version.HTTP_2)
-                     .build();
-             ExecutorService fetchExecutor = Executors.newCachedThreadPool();
+             ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
              FeedJournal feedJournal = FeedJournal.create();
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
@@ -137,7 +227,8 @@ public class FeedFetcherService {
                     FetchResult feedData;
                     try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
-                        feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                        feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                        TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
                     } catch (Exception ex) {
                         feedData = new FetchResult.TransientError();
                     }
@@ -216,7 +307,6 @@ public class FeedFetcherService {
     }

     private FetchResult fetchFeedData(FeedDefinition feed,
-                                      HttpClient client,
                                       ExecutorService executorService,
                                       @Nullable String ifModifiedSinceDate,
                                       @Nullable String ifNoneMatchTag)
@@ -224,59 +314,63 @@ public class FeedFetcherService {
         try {
             URI uri = new URI(feed.feedUrl());

-            HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
-                    .GET()
-                    .uri(uri)
-                    .header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
-                    .header("Accept-Encoding", "gzip")
-                    .header("Accept", "text/*, */*;q=0.9")
-                    .timeout(Duration.ofSeconds(15))
-                    ;
+            var requestBuilder = ClassicRequestBuilder.get(uri)
+                    .setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
+                    .setHeader("Accept-Encoding", "gzip")
+                    .setHeader("Accept", "text/*, */*;q=0.9");

             // Set the If-Modified-Since or If-None-Match headers if we have them
             // though since there are certain idiosyncrasies in server implementations,
             // we avoid setting both at the same time as that may turn a 304 into a 200.
             if (ifNoneMatchTag != null) {
-                requestBuilder.header("If-None-Match", ifNoneMatchTag);
+                requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
             } else if (ifModifiedSinceDate != null) {
-                requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
+                requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
             }

-            HttpRequest getRequest = requestBuilder.build();
-
-            for (int i = 0; i < 3; i++) {
-
-                /* Note we need to use an executor to time-limit the send() method in HttpClient, as
-                 * its support for timeouts only applies to the time until response starts to be received,
-                 * and does not catch the case when the server starts to send data but then hangs.
-                 */
-                HttpResponse<byte[]> rs = executorService.submit(
-                                () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
-                        .get(15, TimeUnit.SECONDS);
-
-                if (rs.statusCode() == 429) { // Too Many Requests
-                    int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
-                    Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
-                    continue;
-                }
-
-                String newEtagValue = rs.headers().firstValue("ETag").orElse("");
-
-                return switch (rs.statusCode()) {
-                    case 200 -> {
-                        byte[] responseData = getResponseData(rs);
-                        String contentType = rs.headers().firstValue("Content-Type").orElse("");
-                        String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
-                        yield new FetchResult.Success(bodyText, newEtagValue);
-                    }
-                    case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
-                    case 404 -> new FetchResult.PermanentError(); // never try again
-                    default -> new FetchResult.TransientError(); // we try again later
-                };
-            }
+            return httpClient.execute(requestBuilder.build(), rsp -> {
+                try {
+                    logger.info("Code: {}, URL: {}", rsp.getCode(), uri);
+
+                    switch (rsp.getCode()) {
+                        case 200 -> {
+                            if (rsp.getEntity() == null) {
+                                return new FetchResult.TransientError(); // No content to read, treat as transient error
+                            }
+                            byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());
+
+                            // Decode the response body based on the Content-Type header
+                            Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                            if (contentTypeHeader == null) {
+                                return new FetchResult.TransientError();
+                            }
+                            String contentType = contentTypeHeader.getValue();
+                            String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
+
+                            // Grab the ETag header if it exists
+                            Header etagHeader = rsp.getFirstHeader("ETag");
+                            String newEtagValue = etagHeader == null ? null : etagHeader.getValue();
+
+                            return new FetchResult.Success(bodyText, newEtagValue);
+                        }
+                        case 304 -> {
+                            return new FetchResult.NotModified(); // via If-Modified-Since semantics
+                        }
+                        case 404 -> {
+                            return new FetchResult.PermanentError(); // never try again
+                        }
+                        default -> {
+                            return new FetchResult.TransientError(); // we try again later
+                        }
+                    }
+                }
+                catch (Exception ex) {
+                    return new FetchResult.PermanentError(); // treat as permanent error
+                }
+                finally {
+                    EntityUtils.consumeQuietly(rsp.getEntity());
+                }
+            });
         }
         catch (Exception ex) {
             logger.debug("Error fetching feed", ex);
@@ -285,19 +379,6 @@ public class FeedFetcherService {
         return new FetchResult.TransientError();
     }

-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
-
     public sealed interface FetchResult {
         record Success(String value, String etag) implements FetchResult {}
         record NotModified() implements FetchResult {}

View File

@@ -5,6 +5,8 @@ import com.google.inject.Guice;
 import com.google.inject.name.Names;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.coordination.DomainCoordinator;
+import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.rss.db.FeedDb;
 import nu.marginalia.rss.model.FeedItems;
@@ -82,6 +84,7 @@ class FeedFetcherServiceTest extends AbstractModule {
     }

     public void configure() {
+        bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
         bind(HikariDataSource.class).toInstance(dataSource);
         bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
         bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));

View File

@@ -115,9 +115,13 @@ public class CrawlerRetreiver implements AutoCloseable {
         final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
         final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());

+        if (!robotsRules.isAllowed(probedUrl.toString())) {
+            warcRecorder.flagAsRobotsTxtError(probedUrl);
+            yield 1; // Nothing we can do here, we aren't allowed to fetch the root URL
+        }
+
         delayTimer.waitFetchDelay(0); // initial delay after robots.txt

-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, robotsRules, delayTimer);
         domainStateDb.save(summaryRecord);

         if (Thread.interrupted()) {
@@ -270,11 +274,11 @@ public class CrawlerRetreiver implements AutoCloseable {
-    private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
+    private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, SimpleRobotRules robotsRules, CrawlDelayTimer timer) {
         Optional<String> feedLink = Optional.empty();

         try {
-            var url = rootUrl.withPathAndParam("/", null);
+            EdgeUrl url = rootUrl.withPathAndParam("/", null);

             HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
             timer.waitFetchDelay(0);
@@ -331,7 +335,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             if (feedLink.isEmpty()) {
-                feedLink = guessFeedUrl(timer);
+                feedLink = guessFeedUrl(timer, robotsRules);
             }

             // Download the sitemap if available
@@ -339,14 +343,18 @@ public class CrawlerRetreiver implements AutoCloseable {
             // Grab the favicon if it exists
-            if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) {
+            if (robotsRules.isAllowed(faviconUrl.toString())) {
+                if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED)
+                        instanceof HttpFetchResult.ResultOk iconResult)
+                {
                     String contentType = iconResult.header("Content-Type");
                     byte[] iconData = iconResult.getBodyBytes();

                     domainStateDb.saveIcon(
                             domain,
                             new DomainStateDb.FaviconRecord(contentType, iconData)
                     );
+                }
             }

             timer.waitFetchDelay(0);
@@ -383,7 +391,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             "blog/rss"
     );

-    private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
+    private Optional<String> guessFeedUrl(CrawlDelayTimer timer, SimpleRobotRules robotsRules) throws InterruptedException {
         var oldDomainStateRecord = domainStateDb.getSummary(domain);

         // If we are already aware of an old feed URL, then we can just revalidate it
@@ -396,6 +404,9 @@ public class CrawlerRetreiver implements AutoCloseable {
         for (String endpoint : likelyFeedEndpoints) {
             String url = "https://" + domain + "/" + endpoint;
+            if (!robotsRules.isAllowed(url)) {
+                continue;
+            }
             if (validateFeedUrl(url, timer)) {
                 return Optional.of(url);
             }

View File

@@ -50,6 +50,7 @@ dependencies {
     implementation libs.notnull
     implementation libs.guava
+    implementation libs.httpclient

     implementation dependencies.create(libs.guice.get()) {
         exclude group: 'com.google.guava'
     }

View File

@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.loading.LoaderInputData;
 import nu.marginalia.loading.documents.DocumentLoaderService;
 import nu.marginalia.loading.documents.KeywordLoaderService;
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageBaseType;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.Security;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
                            DomainProcessor domainProcessor,
                            FileStorageService fileStorageService,
                            KeywordLoaderService keywordLoaderService,
-                           DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
+                           DocumentLoaderService documentLoaderService,
+                           DomainCoordinator domainCoordinator,
+                           HikariDataSource dataSource)
             throws Exception
     {
         super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
     }

     private void run() throws Exception {
-        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
+        Path basePath = fileStorageService
+                .getStorageBase(FileStorageBaseType.STORAGE)
+                .asPath()
+                .resolve("live-crawl-data");

         if (!Files.isDirectory(basePath)) {
             Files.createDirectories(basePath);
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
         {
             final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);

+            /* ------------------------------------------------ */
+            /* Fetch the latest domains from the feeds database */
+            /* ------------------------------------------------ */
+
             processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);

             Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
+            if (!feedsClient.waitReady(Duration.ofHours(1))) {
+                throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
+            }
             feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);

             logger.info("Fetched data for {} domains", urlsPerDomain.size());

+            /* ------------------------------------- */
+            /* Prune the database from old entries   */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.PRUNE_DB);

-            // Remove data that is too old
             dataSet.prune(cutoff);

+            /* ------------------------------------- */
+            /* Fetch the links for each domain       */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.CRAWLING);

-            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
+            CloseableHttpClient client = HttpClientProvider.createClient();
+            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
                  var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
             {
                 for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     fetcher.scheduleRetrieval(domain, urls);
                 }
             }
+            finally {
+                client.close(CloseMode.GRACEFUL);
+            }

             Path tempPath = dataSet.createWorkDir();

             try {
+                /* ------------------------------------- */
+                /* Process the fetched links             */
+                /* ------------------------------------- */
+
                 processHeartbeat.progress(LiveCrawlState.PROCESSING);

                 try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
                      var writer = new ConverterBatchWriter(tempPath, 0)
                 ) {
-                    // Offset the documents' ordinals toward the upper range, to avoid an ID collisions with the
-                    // main indexes (the maximum permissible for doc ordinal is value is 67_108_863, so this
-                    // leaves us with a lot of headroom still)
+                    // We need unique document ids that do not collide with the document id from the main index,
+                    // so we offset the documents' ordinals toward the upper range.
+                    //
+                    // The maximum permissible for doc ordinal is value is 67_108_863,
+                    // so this leaves us with a lot of headroom still!
+                    // Expected document count here is order of 10 :^)
                     writer.setOrdinalOffset(67_000_000);

                     for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     }
                 }

+                /* ---------------------------------------------- */
+                /* Load the processed data into the link database */
+                /* and construct an index journal for the docs    */
+                /* ---------------------------------------------- */
+
                 processHeartbeat.progress(LiveCrawlState.LOADING);

                 LoaderInputData lid = new LoaderInputData(tempPath, 1);

                 DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);

                 keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
                 FileUtils.deleteDirectory(tempPath.toFile());
             }

-            // Construct the index
+            /* ------------------------------------- */
+            /* Finish up                             */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.DONE);
+
+            // After we return from here, the LiveCrawlActor will trigger an index construction
+            // job. Unlike all the stuff we did in this process, it's identical to the real job
+            // so we don't need to do anything special from this process
         }
     }
} }

View File

@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
 import nu.marginalia.contenttype.DocumentBodyToString;
 import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.coordination.DomainLock;
-import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.retreival.CrawlDelayTimer;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpHeaders;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;

 /** A simple link scraper that fetches URLs and stores them in a database,
  * with no concept of a crawl frontier, WARC output, or other advanced features
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
     private final LiveCrawlDataSet dataSet;
     private final DbDomainQueries domainQueries;
     private final DomainBlacklist domainBlacklist;
-    private final Duration connectTimeout = Duration.ofSeconds(10);
-    private final Duration readTimeout = Duration.ofSeconds(10);
     private final DomainCoordinator domainCoordinator;
     private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
+    private final HttpClient httpClient;

     public SimpleLinkScraper(LiveCrawlDataSet dataSet,
                              DomainCoordinator domainCoordinator,
                              DbDomainQueries domainQueries,
+                             HttpClient httpClient,
                              DomainBlacklist domainBlacklist) {
         this.dataSet = dataSet;
         this.domainCoordinator = domainCoordinator;
         this.domainQueries = domainQueries;
         this.domainBlacklist = domainBlacklist;
+        this.httpClient = httpClient;
     }

     public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {
         EdgeUrl rootUrl = domain.toRootUrlHttps();

-        List<EdgeUrl> relevantUrls = new ArrayList<>();
+        List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));

+        // Resolve absolute URLs
         for (var url : urls) {
             Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
-            if (optParsedUrl.isEmpty()) {
+
+            if (optParsedUrl.isEmpty())
                 continue;
-            }
-            if (dataSet.hasUrl(optParsedUrl.get())) {
-                continue;
-            }
-            relevantUrls.add(optParsedUrl.get());
+
+            EdgeUrl absoluteUrl = optParsedUrl.get();
+
+            if (!dataSet.hasUrl(absoluteUrl))
+                relevantUrls.add(absoluteUrl);
         }

         if (relevantUrls.isEmpty()) {
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {

         int fetched = 0;

-        try (HttpClient client = HttpClient
-                .newBuilder()
-                .connectTimeout(connectTimeout)
-                .followRedirects(HttpClient.Redirect.NEVER)
-                .version(HttpClient.Version.HTTP_2)
-                .build();
-             // throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
+        try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
              DomainLock lock = domainCoordinator.lockDomain(domain)
         ) {
-            SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
+            SimpleRobotRules rules = fetchRobotsRules(rootUrl);

             if (rules == null) { // I/O error fetching robots.txt
                 // If we can't fetch the robots.txt,
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
             CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());

             for (var parsedUrl : relevantUrls) {
+
                 if (!rules.isAllowed(parsedUrl.toString())) {
                     maybeFlagAsBad(parsedUrl);
                     continue;
                 }

-                switch (fetchUrl(domainId, parsedUrl, timer, client)) {
+                switch (fetchUrl(domainId, parsedUrl, timer)) {
                     case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
                         dataSet.saveDocument(id, docUrl, body, headers, "");
                         fetched++;
                     }
-                    case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
+                    case FetchResult.Error(EdgeUrl docUrl) -> {
+                        maybeFlagAsBad(docUrl);
+                    }
                 }
             }
         }
@@ -150,111 +144,107 @@ public class SimpleLinkScraper implements AutoCloseable {
     }

     @Nullable
-    private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
-        var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
-                .GET()
-                .header("User-Agent", WmsaHome.getUserAgent().uaString())
-                .header("Accept-Encoding","gzip")
-                .timeout(readTimeout);
-
-        // Fetch the robots.txt
+    private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
+        ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
+                .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+                .setHeader("Accept-Encoding", "gzip")
+                .build();

         try {
-            SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
-            HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
-
-            if (robotsTxt.statusCode() == 200) {
-                return parser.parseContent(rootUrl.toString(),
-                        getResponseData(robotsTxt),
-                        robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
-                        WmsaHome.getUserAgent().uaIdentifier());
-            }
-            else if (robotsTxt.statusCode() == 404) {
-                return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
-            }
+            return httpClient.execute(request, rsp -> {
+                if (rsp.getEntity() == null) {
+                    return null;
+                }
+                try {
+                    if (rsp.getCode() == 200) {
+                        var contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                        if (contentTypeHeader == null) {
+                            return null; // No content type header, can't parse
+                        }
+
+                        return new SimpleRobotRulesParser().parseContent(
+                                rootUrl.toString(),
+                                EntityUtils.toByteArray(rsp.getEntity()),
+                                contentTypeHeader.getValue(),
+                                WmsaHome.getUserAgent().uaIdentifier()
+                        );
+                    } else if (rsp.getCode() == 404) {
+                        return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
+                    }
+                } finally {
+                    EntityUtils.consumeQuietly(rsp.getEntity());
+                }
+                return null;
+            });
         }
-        catch (IOException ex) {
-            logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
+        catch (IOException e) {
+            logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
+            return null; // I/O error fetching robots.txt
+        }
+        finally {
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
         }
-
-        return null;
     }

     /** Fetch a URL and store it in the database
      */
-    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
-        timer.waitFetchDelay();
-
-        HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
-                .GET()
-                .header("User-Agent", WmsaHome.getUserAgent().uaString())
-                .header("Accept", "text/html")
-                .header("Accept-Encoding", "gzip")
-                .timeout(readTimeout)
+    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {
+        ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
+                .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+                .setHeader("Accept", "text/html")
+                .setHeader("Accept-Encoding", "gzip")
                 .build();

         try {
-            HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-
-            // Handle rate limiting by waiting and retrying once
-            if (response.statusCode() == 429) {
-                timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
-                        response.headers().firstValue("Retry-After").orElse("5")
-                ));
-                response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-            }
-
-            String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
-
-            if (response.statusCode() == 200) {
-                if (!contentType.toLowerCase().startsWith("text/html")) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                byte[] body = getResponseData(response);
-                if (body.length > MAX_SIZE) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
-
-                return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
-            }
+            return httpClient.execute(request, rsp -> {
+                try {
+                    if (rsp.getCode() == 200) {
+                        String contentType = rsp.getFirstHeader("Content-Type").getValue();
+                        if (!contentType.toLowerCase().startsWith("text/html")) {
+                            return new FetchResult.Error(parsedUrl);
+                        }
+
+                        byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);
+
+                        String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
+
+                        StringBuilder headersStr = new StringBuilder();
+                        for (var header : rsp.getHeaders()) {
+                            headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
+                        }
+
+                        return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
+                    }
+                } finally {
+                    if (rsp.getEntity() != null) {
+                        EntityUtils.consumeQuietly(rsp.getEntity());
+                    }
+                }
+                return new FetchResult.Error(parsedUrl);
+            });
         }
-        catch (IOException ex) {
-            // We don't want a full stack trace on every error, as it's quite common and very noisy
-            logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
+        catch (IOException e) {
+            logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
+            // If we can't fetch the URL, we return an error result
+            // so that the caller can decide what to do with it.
+        }
+        finally {
+            timer.waitFetchDelay();
         }

         return new FetchResult.Error(parsedUrl);
     }

-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
-
     sealed interface FetchResult {
         record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
         record Error(EdgeUrl url) implements FetchResult {}
     }

-    private String headersToString(HttpHeaders headers) {
-        StringBuilder headersStr = new StringBuilder();
-        headers.map().forEach((k, v) -> {
-            headersStr.append(k).append(": ").append(v).append("\n");
-        });
-        return headersStr.toString();
-    }
-
     @Override
     public void close() throws Exception {
         pool.shutDown();

View File

@@ -0,0 +1,126 @@
package nu.marginalia.livecrawler.io;
import com.google.inject.Provider;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class HttpClientProvider implements Provider<HttpClient> {
private static final HttpClient client;
private static PoolingHttpClientConnectionManager connectionManager;
private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);
static {
try {
client = createClient();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10))
.build()
);
Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.IGNORE)
.setResponseTimeout(10, TimeUnit.SECONDS)
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
.build();
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setRetryStrategy(new RetryStrategy())
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
// Default keep-alive duration is 3 minutes, but this is too long for us,
// as we are either going to re-use it fairly quickly or close it for a long time.
//
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
@Override
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
while (it.hasNext()) {
final HeaderElement he = it.next();
final String param = he.getName();
final String value = he.getValue();
if (value == null)
continue;
if (!"timeout".equalsIgnoreCase(param))
continue;
try {
long timeout = Long.parseLong(value);
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
return TimeValue.ofSeconds(timeout);
} catch (final NumberFormatException ignore) {
break;
}
}
return defaultValue;
}
})
.disableRedirectHandling()
.setDefaultRequestConfig(defaultRequestConfig)
.build();
}
@Override
public HttpClient get() {
return client;
}
}

View File

@@ -0,0 +1,79 @@
package nu.marginalia.livecrawler.io;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
public class RetryStrategy implements HttpRequestRetryStrategy {
private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);
@Override
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return switch (exception) {
case SocketTimeoutException ste -> false;
case SSLException ssle -> false;
case UnknownHostException uhe -> false;
case HttpHostConnectException ex -> executionCount < 2;
case SocketException ex -> executionCount < 2;
default -> executionCount <= 3;
};
}
@Override
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
return switch (response.getCode()) {
case 500, 503 -> executionCount <= 2;
case 429 -> executionCount <= 3;
default -> false;
};
}
@Override
public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return TimeValue.ofSeconds(1);
}
@Override
public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
int statusCode = response.getCode();
// Give 503 a bit more time
if (statusCode == 503) return TimeValue.ofSeconds(5);
if (statusCode == 429) {
// get the Retry-After header
var retryAfterHeader = response.getFirstHeader("Retry-After");
if (retryAfterHeader == null) {
return TimeValue.ofSeconds(3);
}
String retryAfter = retryAfterHeader.getValue();
if (retryAfter == null) {
return TimeValue.ofSeconds(2);
}
try {
int retryAfterTime = Integer.parseInt(retryAfter);
retryAfterTime = Math.clamp(retryAfterTime, 1, 5);
return TimeValue.ofSeconds(retryAfterTime);
} catch (NumberFormatException e) {
logger.warn("Invalid Retry-After header: {}", retryAfter);
}
}
return TimeValue.ofSeconds(2);
}
}

View File

@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
 import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.db.DomainBlacklistImpl;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
 import java.sql.SQLException;
 import java.util.List;

 class SimpleLinkScraperTest {
     private Path tempDir;
     private LiveCrawlDataSet dataSet;
+    private CloseableHttpClient httpClient;

     @BeforeEach
-    public void setUp() throws IOException, SQLException {
+    public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
         tempDir = Files.createTempDirectory(getClass().getSimpleName());
         dataSet = new LiveCrawlDataSet(tempDir);
+        httpClient = HttpClientProvider.createClient();
     }

     @AfterEach
     public void tearDown() throws Exception {
         dataSet.close();
+        httpClient.close(CloseMode.IMMEDIATE);
         FileUtils.deleteDirectory(tempDir.toFile());
     }

     @Test
     public void testRetrieveNow() throws Exception {
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
         Assertions.assertEquals(1, fetched);
@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
     @Test
     public void testRetrieveNow_Redundant() throws Exception {
         dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

         // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));

View File

@@ -0,0 +1,12 @@
The new domain process (NDP) is a process that evaluates new domains for
inclusion in the search engine index.
It visits the root document of each candidate domain, ensures that it's reachable,
verifies that the response is valid HTML, and checks for a few factors such as length
and links before deciding whether to assign the domain to a node.
The NDP process will assign new domains to the node with the fewest assigned domains.
The NDP process is triggered with a target number of domains to reach, and will keep
discovering domains until that target is met. If e.g. a goal of 100 is set and 50 domains
are already in the index, it will find 50 more domains.
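
As a rough illustration of the goal arithmetic and the "fewest assigned domains" policy
described above, consider the following minimal sketch (the class and method names here
are hypothetical and for illustration only; the actual logic lives in DomainNodeAllocator
and NdpMain):

    import java.util.PriorityQueue;

    class AllocationPolicySketch {
        // (nodeId, assigned domain count), ordered by count so the least-loaded node is polled first
        record NodeCount(int nodeId, int count) implements Comparable<NodeCount> {
            public int compareTo(NodeCount o) { return Integer.compare(count, o.count); }
        }

        private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();

        AllocationPolicySketch(int... countsPerNode) {
            for (int nodeId = 0; nodeId < countsPerNode.length; nodeId++) {
                countPerNode.add(new NodeCount(nodeId, countsPerNode[nodeId]));
            }
        }

        // Number of additional domains to discover for a given goal, e.g. goal=100 with 50 indexed -> 50
        int remainingToDiscover(int goal) {
            int total = countPerNode.stream().mapToInt(NodeCount::count).sum();
            return Math.max(0, goal - total);
        }

        // Assign the next accepted domain to the node with the fewest domains
        int nextNodeId() {
            NodeCount least = countPerNode.remove();
            countPerNode.add(new NodeCount(least.nodeId(), least.count() + 1));
            return least.nodeId();
        }

        public static void main(String[] args) {
            var sketch = new AllocationPolicySketch(30, 20);     // two nodes, 50 domains indexed
            System.out.println(sketch.remainingToDiscover(100)); // 50
            System.out.println(sketch.nextNodeId());             // 1 (the less loaded node)
        }
    }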

View File

@@ -0,0 +1,75 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.ping.PingMain'
applicationName = 'ping-process'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:functions:link-graph:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:processes:crawling-process:ft-content-type')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.gson
implementation libs.zstd
implementation libs.bucket4j
implementation libs.crawlercommons
implementation libs.jsoup
implementation libs.fastutil
implementation libs.bundles.curator
implementation libs.bundles.mariadb
implementation libs.bundles.httpcomponents
implementation libs.commons.lang3
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation libs.wiremock
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
testImplementation project(':code:processes:test-data')
}

View File

@@ -0,0 +1,146 @@
package nu.marginalia.ndp;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.ndp.io.HttpClientProvider;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** Evaluates a domain to determine if it is worth indexing.
* This class fetches the root document, checks the response code, content type,
* and parses the HTML to ensure it smells alright.
*/
@Singleton
public class DomainEvaluator {
private final HttpClient client;
private final String userAgentString = WmsaHome.getUserAgent().uaString();
private final LinkParser linkParser = new LinkParser();
private final DomainCoordinator domainCoordinator;
@Inject
public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
this.domainCoordinator = domainCoordinator;
client = HttpClientProvider.createClient();
}
public boolean evaluateDomain(String domainName) {
var edgeDomain = new EdgeDomain(domainName);
// Grab a lock on the domain to prevent concurrent evaluations between processes
try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
var rootUrl = edgeDomain.toRootUrlHttps();
var request = ClassicRequestBuilder.get(rootUrl.asURI())
.addHeader("User-Agent", userAgentString)
.addHeader("Accept-Encoding", "gzip")
.addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
.build();
return client.execute(request, (rsp) -> {
if (rsp.getEntity() == null)
return false;
try {
// Check if the response code indicates a successful fetch
if (200 != rsp.getCode()) {
return false;
}
byte[] content;
// Read the content from the response entity
try (InputStream contentStream = rsp.getEntity().getContent()) {
content = contentStream.readNBytes(8192);
}
// Parse the content (if it's valid)
ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
// Validate the content type
if (!contentType.contentType().startsWith("text/html") && !contentType.contentType().startsWith("application/xhtml+xml"))
return false;
// Parse the document body to a Jsoup Document
final Document document = Jsoup.parse(DocumentBodyToString.getStringData(contentType, content));
final String text = document.body().text();
if (text.length() < 100)
return false;
if (text.contains("404 Not Found") || text.contains("Page not found"))
return false;
if (hasMetaRefresh(document))
return false; // This almost always indicates a parked domain
if (!hasInternalLink(document, edgeDomain, rootUrl))
return false; // No internal links means it's not worth indexing
return true;
}
catch (Exception e) {
return false;
}
finally {
// May or may not be necessary, but let's ensure we clean up the response entity
// to avoid resource leaks
EntityUtils.consumeQuietly(rsp.getEntity());
// Sleep for a while before yielding the lock, to avoid immediately hammering the domain
// from another process
sleepQuietly(Duration.ofSeconds(1));
}
});
}
catch (Exception ex) {
return false; // If we fail to fetch or parse the domain, we consider it invalid
}
}
private boolean hasInternalLink(Document document, EdgeDomain currentDomain, EdgeUrl rootUrl) {
for (Element atag : document.select("a")) {
Optional<EdgeDomain> destDomain = linkParser
.parseLink(rootUrl, atag)
.map(EdgeUrl::getDomain);
if (destDomain.isPresent() && Objects.equals(currentDomain, destDomain.get()))
return true;
}
return false;
}
private boolean hasMetaRefresh(Document document) {
for (Element metaTag : document.select("meta")) {
if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv")))
return true;
}
return false;
}
private void sleepQuietly(Duration duration) {
try {
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,134 @@
package nu.marginalia.ndp;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.nodecfg.NodeConfigurationService;
import org.jetbrains.annotations.NotNull;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
/** DomainNodeAllocator is responsible for assigning domains to partitions/nodes.
 *  This ensures that domains are evenly distributed across the nodes.
 */
public class DomainNodeAllocator {
private final NodeConfigurationService nodeConfigurationService;
private final HikariDataSource dataSource;
private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
private volatile boolean initialized = false;
private record NodeCount(int nodeId, int count)
implements Comparable<NodeCount>
{
public NodeCount incrementCount() {
return new NodeCount(nodeId, count + 1);
}
@Override
public int compareTo(@NotNull DomainNodeAllocator.NodeCount o) {
return Integer.compare(this.count, o.count);
}
}
@Inject
public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
this.nodeConfigurationService = nodeConfigurationService;
this.dataSource = dataSource;
Thread.ofPlatform()
.name("DomainNodeAllocator::initialize()")
.start(this::initialize);
}
public synchronized int totalCount() {
ensureInitialized();
return countPerNode.stream().mapToInt(NodeCount::count).sum();
}
/** Returns the next node ID to assign a domain to.
* This method is synchronized to ensure thread safety when multiple threads are allocating domains.
* The node ID returned is guaranteed to be one of the viable nodes configured in the system.
*/
public synchronized int nextNodeId() {
ensureInitialized();
// Synchronized is fine here as this is not a hot path
// (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
NodeCount allocation = countPerNode.remove();
countPerNode.add(allocation.incrementCount());
return allocation.nodeId();
}
private void ensureInitialized() {
if (initialized) return;
synchronized (this) {
while (!initialized) {
try {
// Wait until the initialization is complete
this.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("DomainAllocator initialization interrupted", e);
}
}
}
}
public void initialize() {
if (initialized) return;
Set<Integer> viableNodes = new HashSet<>();
// Find all viable nodes that can handle batch crawls
for (var node : nodeConfigurationService.getAll()) {
if (node.disabled())
continue;
if (!node.autoAssignDomains())
continue;
if (node.profile().permitBatchCrawl())
viableNodes.add(node.node());
}
// Fetch the current counts of domains per node from the database
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT COUNT(*) AS CNT, NODE_AFFINITY
FROM EC_DOMAIN
WHERE NODE_AFFINITY>0
GROUP BY NODE_AFFINITY
"""))
{
var rs = stmt.executeQuery();
while (rs.next()) {
int nodeId = rs.getInt("NODE_AFFINITY");
int count = rs.getInt("CNT");
if (viableNodes.remove(nodeId)) {
countPerNode.add(new NodeCount(nodeId, count));
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to load domain counts from database", e);
}
// Add any remaining viable nodes that were not found in the database
for (int nodeId : viableNodes) {
countPerNode.add(new NodeCount(nodeId, 0));
}
initialized = true;
}
}

View File

@@ -0,0 +1,240 @@
package nu.marginalia.ndp;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
import nu.marginalia.ndp.model.DomainToTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
public class DomainTestingQueue {
private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(2);
// This will grow quite large, but should be manageable in memory, as theoretical maximum is around 100M domains,
// order of 2 GB in memory.
private final ConcurrentHashMap<String, Boolean> takenDomains = new ConcurrentHashMap<>();
private final HikariDataSource dataSource;
private final AggregateLinkGraphClient linkGraphClient;
@Inject
public DomainTestingQueue(HikariDataSource dataSource,
AggregateLinkGraphClient linkGraphClient
) {
this.dataSource = dataSource;
this.linkGraphClient = linkGraphClient;
Thread.ofPlatform()
.name("DomainTestingQueue::fetch()")
.start(this::fetch);
}
public DomainToTest next() throws InterruptedException {
return queue.take();
}
public void accept(DomainToTest domain, int nodeId) {
try (var conn = dataSource.getConnection();
var flagOkStmt = conn.prepareStatement("""
UPDATE NDP_NEW_DOMAINS
SET STATE='ACCEPTED'
WHERE DOMAIN_ID=?
""");
var assignNodeStmt = conn.prepareStatement("""
UPDATE EC_DOMAIN SET NODE_AFFINITY=?
WHERE ID=?
AND EC_DOMAIN.NODE_AFFINITY < 0
""")
)
{
conn.setAutoCommit(false);
flagOkStmt.setInt(1, domain.domainId());
flagOkStmt.executeUpdate();
assignNodeStmt.setInt(1, nodeId);
assignNodeStmt.setInt(2, domain.domainId());
assignNodeStmt.executeUpdate();
conn.commit();
} catch (Exception e) {
throw new RuntimeException("Failed to accept domain in database", e);
}
}
public void reject(DomainToTest domain) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
UPDATE NDP_NEW_DOMAINS
SET STATE='REJECTED', CHECK_COUNT=CHECK_COUNT + 1
WHERE DOMAIN_ID=?
"""))
{
conn.setAutoCommit(false);
stmt.setInt(1, domain.domainId());
stmt.executeUpdate();
conn.commit();
} catch (Exception e) {
throw new RuntimeException("Failed to reject domain in database", e);
}
}
public void fetch() {
while (true) {
List<DomainToTest> domains = new ArrayList<>(2000);
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT DOMAIN_ID, DOMAIN_NAME
FROM NDP_NEW_DOMAINS
INNER JOIN EC_DOMAIN ON ID=DOMAIN_ID
WHERE NDP_NEW_DOMAINS.STATE = 'NEW'
ORDER BY PRIORITY DESC
LIMIT 2000
"""))
{
var rs = stmt.executeQuery();
while (rs.next()) {
int domainId = rs.getInt("DOMAIN_ID");
String domainName = rs.getString("DOMAIN_NAME");
if (takenDomains.put(domainName, true) != null) {
logger.warn("Domain {} is already processed, skipping", domainName);
continue; // Skip if already taken
}
domains.add(new DomainToTest(domainName, domainId));
}
if (domains.isEmpty()) {
if (!refreshQueue(conn)) {
throw new RuntimeException("No new domains found, aborting!");
}
}
}
catch (RuntimeException e) {
throw e; // Rethrow runtime exceptions to avoid wrapping them in another runtime exception
}
catch (Exception e) {
throw new RuntimeException("Failed to fetch domains from database", e);
}
try {
for (var domain : domains) {
queue.put(domain);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Domain fetching interrupted", e);
}
}
}
private boolean refreshQueue(Connection conn) {
logger.info("Refreshing domain queue in database");
Int2IntMap domainIdToCount = new Int2IntOpenHashMap();
// Load known domain IDs from the database to avoid inserting duplicates from NDP_NEW_DOMAINS
// or domains that are already assigned to a node
{
IntOpenHashSet knownIds = new IntOpenHashSet();
try (var stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT DOMAIN_ID FROM NDP_NEW_DOMAINS");
rs.setFetchSize(10_000);
while (rs.next()) {
int domainId = rs.getInt("DOMAIN_ID");
knownIds.add(domainId);
}
rs = stmt.executeQuery("SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0");
rs.setFetchSize(10_000);
while (rs.next()) {
int domainId = rs.getInt("ID");
knownIds.add(domainId);
}
} catch (Exception e) {
throw new RuntimeException("Failed to load known domain IDs from database", e);
}
// Ensure the link graph is ready before proceeding. This is mainly necessary in a cold reboot
// of the entire system.
try {
logger.info("Waiting for link graph client to be ready...");
linkGraphClient.waitReady(Duration.ofHours(1));
logger.info("Link graph client is ready, fetching domain links...");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Fetch all domain links from the link graph and count by how many sources each dest domain is linked from
var iter = linkGraphClient.getAllDomainLinks().iterator();
while (iter.advance()) {
int dest = iter.dest();
if (!knownIds.contains(dest)) {
domainIdToCount.mergeInt(dest, 1, (i, j) -> i + j);
}
}
}
boolean didInsert = false;
/* Insert new domains into NDP_NEW_DOMAINS table */
try (var insertStmt = conn.prepareStatement("""
INSERT IGNORE INTO NDP_NEW_DOMAINS (DOMAIN_ID, PRIORITY) VALUES (?, ?)
""")) {
conn.setAutoCommit(false);
int cnt = 0;
for (var entry : domainIdToCount.int2IntEntrySet()) {
int domainId = entry.getIntKey();
int count = entry.getIntValue();
insertStmt.setInt(1, domainId);
insertStmt.setInt(2, count);
insertStmt.addBatch();
if (++cnt >= 1000) {
cnt = 0;
insertStmt.executeBatch(); // Execute in batches to avoid memory issues
conn.commit();
didInsert = true;
}
}
if (cnt != 0) {
insertStmt.executeBatch(); // Execute any remaining batch
conn.commit();
didInsert = true;
}
logger.info("Queue refreshed successfully");
} catch (Exception e) {
throw new RuntimeException("Failed to refresh queue in database", e);
}
// Clean up NDP_NEW_DOMAINS table to remove any domains that are already in EC_DOMAIN
// This acts not only to clean up domains that we've flagged as ACCEPTED, but also to
// repair inconsistent states where domains might have incorrectly been added to NDP_NEW_DOMAINS
try (var stmt = conn.createStatement()) {
stmt.executeUpdate("DELETE FROM NDP_NEW_DOMAINS WHERE DOMAIN_ID IN (SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0)");
}
catch (Exception e) {
throw new RuntimeException("Failed to clean up NDP_NEW_DOMAINS", e);
}
return didInsert;
}
}

View File

@@ -0,0 +1,159 @@
package nu.marginalia.ndp;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.WmsaHome;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ndp.NdpRequest;
import nu.marginalia.ndp.model.DomainToTest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class NdpMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(NdpMain.class);
private final DomainNodeAllocator domainNodeAllocator;
private final DomainTestingQueue domainTestingQueue;
private final ProcessHeartbeat processHeartbeat;
private final DomainEvaluator domainEvaluator;
private final DomainBlacklist domainBlacklist;
private final AtomicInteger domainCount = new AtomicInteger(0);
@Inject
public NdpMain(MessageQueueFactory messageQueueFactory,
ProcessConfiguration config,
DomainNodeAllocator domainNodeAllocator,
DomainTestingQueue domainTestingQueue,
DomainEvaluator domainEvaluator,
DomainBlacklist domainBlacklist,
ProcessHeartbeat processHeartbeat,
Gson gson)
{
super(messageQueueFactory, config, gson, ProcessInboxNames.NDP_INBOX);
this.domainNodeAllocator = domainNodeAllocator;
this.domainEvaluator = domainEvaluator;
this.domainBlacklist = domainBlacklist;
this.domainTestingQueue = domainTestingQueue;
this.processHeartbeat = processHeartbeat;
}
public void run(int goalCount) throws InterruptedException {
logger.info("Wait for blacklist to load...");
domainBlacklist.waitUntilLoaded();
SimpleBlockingThreadPool threadPool = new SimpleBlockingThreadPool(
"NDP-Worker",
8,
10,
SimpleBlockingThreadPool.ThreadType.PLATFORM
);
logger.info("Starting NDP process");
int toInsertCount = goalCount - domainNodeAllocator.totalCount();
if (toInsertCount <= 0) {
logger.info("No new domains to process. Current count: " + domainNodeAllocator.totalCount());
return;
}
try (var hb = processHeartbeat.createAdHocTaskHeartbeat("Growing Index")) {
int cnt;
while ((cnt = domainCount.get()) < toInsertCount) {
if (cnt % 100 == 0) {
hb.progress("Discovery Process", cnt, toInsertCount);
}
final DomainToTest nextDomain = domainTestingQueue.next();
threadPool.submit(() -> {
try {
if (domainEvaluator.evaluateDomain(nextDomain.domainName())) {
logger.info("Accepting: {}", nextDomain.domainName());
domainCount.incrementAndGet();
domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
} else {
logger.info("Rejecting: {}", nextDomain.domainName());
domainTestingQueue.reject(nextDomain);
}
}
catch (Exception e) {
domainTestingQueue.reject(nextDomain);
logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
}
});
}
}
threadPool.shutDown();
// Wait for all tasks to complete or give up after 1 hour
threadPool.awaitTermination(1, TimeUnit.HOURS);
logger.info("NDP process completed. Total domains processed: " + domainCount.get());
}
public static void main(String[] args) throws Exception {
// Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
Security.setProperty("networkaddress.cache.ttl" , "3600");
// This must run *early*
System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());
// If these aren't set properly, the JVM will hang forever on some requests
System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
System.setProperty("sun.net.client.defaultReadTimeout", "30000");
// Set the maximum number of connections to keep alive in the connection pool
System.setProperty("jdk.httpclient.idleTimeout", "15"); // 15 seconds
System.setProperty("jdk.httpclient.connectionPoolSize", "256");
// We don't want to use too much memory caching sessions for https
System.setProperty("javax.net.ssl.sessionCacheSize", "2048");
Injector injector = Guice.createInjector(
new NdpModule(),
new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new ProcessConfigurationModule("ndp"),
new DatabaseModule(false)
);
GeoIpDictionary geoIpDictionary = injector.getInstance(GeoIpDictionary.class);
geoIpDictionary.waitReady(); // Ensure the GeoIpDictionary is ready before proceeding
NdpMain main = injector.getInstance(NdpMain.class);
var instructions = main.fetchInstructions(NdpRequest.class);
try {
main.run(instructions.value().goal());
instructions.ok();
}
catch (Throwable ex) {
logger.error("Error running ping process", ex);
instructions.err();
}
}
}

View File

@@ -0,0 +1,8 @@
package nu.marginalia.ndp;
import com.google.inject.AbstractModule;
public class NdpModule extends AbstractModule {
public void configure() {
}
}

View File

@@ -0,0 +1,126 @@
package nu.marginalia.ndp.io;
import com.google.inject.Provider;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class HttpClientProvider implements Provider<HttpClient> {
private static final HttpClient client;
private static PoolingHttpClientConnectionManager connectionManager;
private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);
static {
try {
client = createClient();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10))
.build()
);
Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.IGNORE)
.setResponseTimeout(10, TimeUnit.SECONDS)
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
.build();
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setRetryStrategy(new RetryStrategy())
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
// Default keep-alive duration is 3 minutes, but this is too long for us,
// as we are either going to re-use it fairly quickly or close it for a long time.
//
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
@Override
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
while (it.hasNext()) {
final HeaderElement he = it.next();
final String param = he.getName();
final String value = he.getValue();
if (value == null)
continue;
if (!"timeout".equalsIgnoreCase(param))
continue;
try {
long timeout = Long.parseLong(value);
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
return TimeValue.ofSeconds(timeout);
} catch (final NumberFormatException ignore) {
break;
}
}
return defaultValue;
}
})
.disableRedirectHandling()
.setDefaultRequestConfig(defaultRequestConfig)
.build();
}
@Override
public HttpClient get() {
return client;
}
}

View File

@@ -0,0 +1,79 @@
package nu.marginalia.ndp.io;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
public class RetryStrategy implements HttpRequestRetryStrategy {
private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);
@Override
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return switch (exception) {
case SocketTimeoutException ste -> false;
case SSLException ssle -> false;
case UnknownHostException uhe -> false;
case HttpHostConnectException ex -> executionCount < 2;
case SocketException ex -> executionCount < 2;
default -> executionCount <= 3;
};
}
@Override
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
return switch (response.getCode()) {
case 500, 503 -> executionCount <= 2;
case 429 -> executionCount <= 3;
default -> false;
};
}
@Override
public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return TimeValue.ofSeconds(1);
}
@Override
public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
int statusCode = response.getCode();
// Give 503 a bit more time
if (statusCode == 503) return TimeValue.ofSeconds(5);
if (statusCode == 429) {
// get the Retry-After header
var retryAfterHeader = response.getFirstHeader("Retry-After");
if (retryAfterHeader == null) {
return TimeValue.ofSeconds(3);
}
String retryAfter = retryAfterHeader.getValue();
if (retryAfter == null) {
return TimeValue.ofSeconds(2);
}
try {
int retryAfterTime = Integer.parseInt(retryAfter);
retryAfterTime = Math.clamp(retryAfterTime, 1, 5);
return TimeValue.ofSeconds(retryAfterTime);
} catch (NumberFormatException e) {
logger.warn("Invalid Retry-After header: {}", retryAfter);
}
}
return TimeValue.ofSeconds(2);
}
}

View File

@@ -0,0 +1,4 @@
package nu.marginalia.ndp.model;
public record DomainToTest(String domainName, int domainId) {
}

View File

@@ -0,0 +1,29 @@
package nu.marginalia.ndp;
import nu.marginalia.coordination.LocalDomainCoordinator;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class DomainEvaluatorTest {
@Tag("flaky") // Exclude from CI runs due to potential network issues
@Test
public void testSunnyDay() throws NoSuchAlgorithmException, KeyManagementException {
DomainEvaluator evaluator = new DomainEvaluator(new LocalDomainCoordinator());
// Should be a valid domain
assertTrue(evaluator.evaluateDomain("www.marginalia.nu"));
// Should be a redirect to www.marginalia.nu
assertFalse(evaluator.evaluateDomain("memex.marginalia.nu"));
// Should fail on Anubis
assertFalse(evaluator.evaluateDomain("marginalia-search.com"));
}
}

View File

@@ -0,0 +1,12 @@
The ping process (which has nothing to do with ICMP ping) keeps track of
the aliveness of websites. It also gathers fingerprint information about
the security posture of the website, as well as DNS information.
This is kept to build an idea of when a website is down, and to identify
ownership changes, as well as other significant events in the lifecycle
of a website.
# Central Classes
* [PingMain](java/nu/marginalia/ping/PingMain.java) main class.
* [PingJobScheduler](java/nu/marginalia/ping/PingJobScheduler.java) service that dispatches pings.

View File

@@ -112,7 +112,7 @@ public class HttpClientProvider implements Provider<HttpClient> {
         });

         final RequestConfig defaultRequestConfig = RequestConfig.custom()
-                .setCookieSpec(StandardCookieSpec.RELAXED)
+                .setCookieSpec(StandardCookieSpec.IGNORE)
                 .setResponseTimeout(10, TimeUnit.SECONDS)
                 .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
                 .build();

View File

@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;

 import javax.net.ssl.SSLException;
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
@@ -23,7 +24,8 @@ public class RetryStrategy implements HttpRequestRetryStrategy {
             case SocketTimeoutException ste -> false;
             case SSLException ssle -> false;
             case UnknownHostException uhe -> false;
-            case HttpHostConnectException ex -> executionCount <= 2; // Only retry once for connection errors
+            case HttpHostConnectException ex -> executionCount < 2;
+            case SocketException ex -> executionCount < 2;
             default -> executionCount <= 3;
         };
     }

View File

@@ -4,6 +4,7 @@ public class ProcessInboxNames {
     public static final String CONVERTER_INBOX = "converter";
     public static final String LOADER_INBOX = "loader";
     public static final String PING_INBOX = "ping";
+    public static final String NDP_INBOX = "ndp";
     public static final String CRAWLER_INBOX = "crawler";
     public static final String LIVE_CRAWLER_INBOX = "live-crawler";

View File

@@ -0,0 +1,4 @@
package nu.marginalia.mqapi.ndp;
public record NdpRequest(int goal) {
}

View File

@@ -25,6 +25,12 @@ into the [MariaDB database](../common/db).
 The [index-construction-process](index-constructor-process/) constructs indices from
 the data generated by the loader.

+## 5. Other Processes
+
+* Ping Process: The [ping-process](ping-process/) keeps track of the aliveness of websites, gathering fingerprint information about the security posture of the website, as well as DNS information.
+* New Domain Process (NDP): The [new-domain-process](new-domain-process/) evaluates new domains for inclusion in the search engine index.
+* Live-Crawling Process: The [live-crawling-process](live-crawling-process/) is a process that crawls websites in real-time based on RSS feeds, updating a smaller index with the latest content.
+
 ## Overview

 Schematically the crawling and loading process looks like this:

View File

@@ -22,6 +22,7 @@ import nu.marginalia.search.model.NavbarModel;
 import nu.marginalia.search.model.ResultsPage;
 import nu.marginalia.search.model.UrlDetails;
 import nu.marginalia.search.svc.SearchFlagSiteService.FlagSiteFormData;
+import nu.marginalia.service.server.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,8 @@ public class SearchSiteInfoService {
     private final HikariDataSource dataSource;
     private final SearchSiteSubscriptionService searchSiteSubscriptions;

+    private final RateLimiter rateLimiter = RateLimiter.custom(60);
+
     @Inject
     public SearchSiteInfoService(SearchOperator searchOperator,
                                  DomainInfoClient domainInfoClient,
@@ -238,6 +241,7 @@ public class SearchSiteInfoService {
         boolean hasScreenshot = screenshotService.hasScreenshot(domainId);
         boolean isSubscribed = searchSiteSubscriptions.isSubscribed(context, domain);
+        boolean rateLimited = !rateLimiter.isAllowed();

         if (domainId < 0) {
             domainInfoFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
             similarSetFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
@@ -250,6 +254,12 @@ public class SearchSiteInfoService {
             linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
             feedItemsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
         }
+        else if (rateLimited) {
+            domainInfoFuture = domainInfoClient.domainInformation(domainId);
+            similarSetFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+            linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+            feedItemsFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+        }
         else {
             domainInfoFuture = domainInfoClient.domainInformation(domainId);
             similarSetFuture = domainInfoClient.similarDomains(domainId, 25);
@@ -257,7 +267,14 @@ public class SearchSiteInfoService {
             feedItemsFuture = feedsClient.getFeed(domainId);
         }

-        List<UrlDetails> sampleResults = searchOperator.doSiteSearch(domainName, domainId, 5, 1).results;
+        List<UrlDetails> sampleResults;
+        if (rateLimited) {
+            sampleResults = List.of();
+        }
+        else {
+            sampleResults = searchOperator.doSiteSearch(domainName, domainId, 5, 1).results;
+        }
+
         if (!sampleResults.isEmpty()) {
             url = sampleResults.getFirst().url.withPathAndParam("/", null).toString();
         }
@@ -276,8 +293,9 @@ public class SearchSiteInfoService {
                 sampleResults
         );

-        requestMissingScreenshots(result);
+        if (!rateLimited) {
+            requestMissingScreenshots(result);
+        }

         return result;
     }
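
The RateLimiter implementation itself is not part of this change set. As a hedged sketch of
what a permits-per-minute guard in the spirit of RateLimiter.custom(60) might look like
(hypothetical class, for illustration only):

    // Hypothetical token bucket refilled over a one-minute window; not the actual RateLimiter.
    class TokenBucketSketch {
        private final int capacity;  // maximum permits, e.g. 60 per minute
        private long tokens;
        private long lastRefillMillis = System.currentTimeMillis();

        TokenBucketSketch(int permitsPerMinute) {
            this.capacity = permitsPerMinute;
            this.tokens = permitsPerMinute;
        }

        // Returns true if a request may proceed; callers degrade gracefully instead of erroring out
        synchronized boolean isAllowed() {
            long now = System.currentTimeMillis();
            long refill = (now - lastRefillMillis) * capacity / 60_000;
            if (refill > 0) {
                tokens = Math.min(capacity, tokens + refill);
                lastRefillMillis = now;
            }
            if (tokens > 0) {
                tokens--;
                return true;
            }
            return false;
        }
    }

As the diff above shows, when the limit trips the /site page still renders, but the
similar-domain, backlink, and feed lookups are replaced with failed futures, the sample
search is skipped, and no screenshot capture is requested.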

View File

@@ -0,0 +1,11 @@
@param String message
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8">
<title>Unavailable</title></head>
<body>
<h1>Service Overloaded</h1>
<p>${message}</p>
</body>
</html>

View File

@@ -280,6 +280,7 @@ public class ControlNodeService {
"on".equalsIgnoreCase(request.queryParams("autoClean")), "on".equalsIgnoreCase(request.queryParams("autoClean")),
"on".equalsIgnoreCase(request.queryParams("includeInPrecession")), "on".equalsIgnoreCase(request.queryParams("includeInPrecession")),
"on".equalsIgnoreCase(request.queryParams("keepWarcs")), "on".equalsIgnoreCase(request.queryParams("keepWarcs")),
"on".equalsIgnoreCase(request.queryParams("autoAssignDomains")),
NodeProfile.valueOf(request.queryParams("profile")), NodeProfile.valueOf(request.queryParams("profile")),
"on".equalsIgnoreCase(request.queryParams("disabled")) "on".equalsIgnoreCase(request.queryParams("disabled"))
); );

View File

@@ -66,13 +66,23 @@
         </div>
     </div>

+    <div class="form-check form-switch">
+        <input class="form-check-input" type="checkbox" role="switch" name="autoAssignDomains" {{#if config.autoAssignDomains}}checked{{/if}}>
+        <label class="form-check-label" for="autoClean">Auto-Assign Domains</label>
+        <div class="form-text">If true, the New Domain Process will assign new domains to this node and all other nodes with this setting enabled.
+            This is the default behavior, but can be overridden if you want one node with a specific manual domain assignment.
+        </div>
+    </div>
+
+    <!-- This is not currently used, but may be in the future
     <div class="form-check form-switch">
         <input class="form-check-input" type="checkbox" role="switch" name="includeInPrecession" {{#if config.includeInPrecession}}checked{{/if}}>
         <label class="form-check-label" for="includeInPrecession">Include in crawling precession</label>
         <div class="form-text">If true, this node will be included in the crawling precession.</div>
     </div>
+    -->

     <div class="form-check form-switch">
         <input class="form-check-input" type="checkbox" role="switch" name="keepWarcs" {{#if config.keepWarcs}}checked{{/if}}>
         <label class="form-check-label" for="includeInPrecession">Keep WARC files during crawling</label>

View File

@@ -13,14 +13,23 @@
 {{#unless node.profile.realtime}}
     <li class="nav-item dropdown">
         <a class="nav-link dropdown-toggle {{#if tab.actions}}active{{/if}}" data-bs-toggle="dropdown" href="#" role="button" aria-expanded="false">Actions</a>
-        {{#if node.profile.permitBatchCrawl}}
         <ul class="dropdown-menu">
+            {{#if node.profile.permitBatchCrawl}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=new-crawl">New Crawl</a></li>
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=process">Process Crawl Data</a></li>
+            {{/if}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=repartition">Repartition Index</a></li>
             <li><hr class="dropdown-divider"></li>
+            {{#if node.profile.permitSideload}}
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
+            <li><hr class="dropdown-divider"></li>
+            {{/if}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=download-sample-data">Download Sample Crawl Data</a></li>
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=export-db-data">Export Database Data</a></li>
@@ -30,19 +39,6 @@
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
         </ul>
-        {{/if}}
-        {{#if node.profile.permitSideload}}
-        <ul class="dropdown-menu">
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
-            <li><hr class="dropdown-divider"></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
-        </ul>
-        {{/if}}
     </li>
 {{/unless}}
     <li class="nav-item">

View File

@@ -69,6 +69,7 @@ include 'code:processes:crawling-process:ft-link-parser'
 include 'code:processes:crawling-process:ft-content-type'
 include 'code:processes:live-crawling-process'
 include 'code:processes:ping-process'
+include 'code:processes:new-domain-process'
 include 'code:processes:process-mq-api'