Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git (synced 2025-10-06 17:32:39 +02:00)

Compare commits: deploy-026 ... deploy-026 (11 commits)
Commits (SHA1):
294ab19177
6f1659ecb2
982dcb28f0
fc686d8b2e
69ef0f334a
446746f3bd
24ab8398bb
d2ceeff4cf
cf64214b1c
e50d09cc01
bce3892ce0
@@ -45,7 +45,7 @@ public class NodeConfigurationService {
     public List<NodeConfiguration> getAll() {
         try (var conn = dataSource.getConnection();
              var qs = conn.prepareStatement("""
-                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
+                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
                      FROM NODE_CONFIGURATION
                      """)) {
             var rs = qs.executeQuery();
@@ -59,6 +59,7 @@ public class NodeConfigurationService {
                         rs.getBoolean("ACCEPT_QUERIES"),
                         rs.getBoolean("AUTO_CLEAN"),
                         rs.getBoolean("PRECESSION"),
+                        rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
                         rs.getBoolean("KEEP_WARCS"),
                         NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
                         rs.getBoolean("DISABLED")
@@ -75,7 +76,7 @@ public class NodeConfigurationService {
     public NodeConfiguration get(int nodeId) throws SQLException {
         try (var conn = dataSource.getConnection();
              var qs = conn.prepareStatement("""
-                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
+                     SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
                      FROM NODE_CONFIGURATION
                      WHERE ID=?
                      """)) {
@@ -88,6 +89,7 @@ public class NodeConfigurationService {
                     rs.getBoolean("ACCEPT_QUERIES"),
                     rs.getBoolean("AUTO_CLEAN"),
                     rs.getBoolean("PRECESSION"),
+                    rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
                     rs.getBoolean("KEEP_WARCS"),
                     NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
                     rs.getBoolean("DISABLED")
@@ -102,7 +104,7 @@ public class NodeConfigurationService {
        try (var conn = dataSource.getConnection();
             var us = conn.prepareStatement("""
                    UPDATE NODE_CONFIGURATION
-                   SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
+                   SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, AUTO_ASSIGN_DOMAINS=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
                    WHERE ID=?
                    """))
        {
@@ -110,10 +112,11 @@ public class NodeConfigurationService {
            us.setBoolean(2, config.acceptQueries());
            us.setBoolean(3, config.autoClean());
            us.setBoolean(4, config.includeInPrecession());
-           us.setBoolean(5, config.keepWarcs());
-           us.setBoolean(6, config.disabled());
-           us.setString(7, config.profile().name());
-           us.setInt(8, config.node());
+           us.setBoolean(5, config.autoAssignDomains());
+           us.setBoolean(6, config.keepWarcs());
+           us.setBoolean(7, config.disabled());
+           us.setString(8, config.profile().name());
+           us.setInt(9, config.node());

            if (us.executeUpdate() <= 0)
                throw new IllegalStateException("Failed to update configuration");
@@ -5,6 +5,7 @@ public record NodeConfiguration(int node,
                                 boolean acceptQueries,
                                 boolean autoClean,
                                 boolean includeInPrecession,
+                                boolean autoAssignDomains,
                                 boolean keepWarcs,
                                 NodeProfile profile,
                                 boolean disabled
@@ -22,7 +22,5 @@ public enum NodeProfile {
     public boolean permitBatchCrawl() {
         return isBatchCrawl() || isMixed();
     }
-    public boolean permitSideload() {
-        return isMixed() || isSideload();
-    }
+    public boolean permitSideload() { return isSideload() || isMixed(); }
 }
@@ -2,6 +2,7 @@ package nu.marginalia.nodecfg;

 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.nodecfg.model.NodeConfiguration;
 import nu.marginalia.nodecfg.model.NodeProfile;
 import nu.marginalia.test.TestMigrationLoader;
 import org.junit.jupiter.api.BeforeAll;
@@ -62,6 +63,63 @@ public class NodeConfigurationServiceTest {
         assertEquals(2, list.size());
         assertEquals(a, list.get(0));
         assertEquals(b, list.get(1));
+    }
+
+
+    // Test all the fields that are only exposed via save()
+    @Test
+    public void testSaveChanges() throws SQLException {
+        var original = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED);
+
+        assertEquals(1, original.node());
+        assertEquals("Test", original.description());
+        assertFalse(original.acceptQueries());
+
+        var precession = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                original.autoClean(),
+                original.includeInPrecession(),
+                !original.autoAssignDomains(),
+                original.keepWarcs(),
+                original.profile(),
+                original.disabled()
+        );
+
+        nodeConfigurationService.save(precession);
+        precession = nodeConfigurationService.get(original.node());
+        assertNotEquals(original.autoAssignDomains(), precession.autoAssignDomains());
+
+        var autoClean = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                !original.autoClean(),
+                original.includeInPrecession(),
+                original.autoAssignDomains(),
+                original.keepWarcs(),
+                original.profile(),
+                original.disabled()
+        );
+
+        nodeConfigurationService.save(autoClean);
+        autoClean = nodeConfigurationService.get(original.node());
+        assertNotEquals(original.autoClean(), autoClean.autoClean());
+
+        var disabled = new NodeConfiguration(
+                original.node(),
+                "Foo",
+                true,
+                autoClean.autoClean(),
+                autoClean.includeInPrecession(),
+                autoClean.autoAssignDomains(),
+                autoClean.keepWarcs(),
+                autoClean.profile(),
+                !autoClean.disabled()
+        );
+        nodeConfigurationService.save(disabled);
+        disabled = nodeConfigurationService.get(original.node());
+        assertNotEquals(autoClean.disabled(), disabled.disabled());
     }
 }
@@ -0,0 +1,3 @@
+-- Migration script to add AUTO_ASSIGN_DOMAINS column to NODE_CONFIGURATION table
+
+ALTER TABLE NODE_CONFIGURATION ADD COLUMN AUTO_ASSIGN_DOMAINS BOOLEAN NOT NULL DEFAULT TRUE;
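A minimal illustrative sketch, not part of the changeset above: with the new column in place, the flag can be flipped for a single node by reading the configuration back through the service shown earlier and saving it with autoAssignDomains toggled. The helper name and node id are assumptions made for the example.

    // Hypothetical helper; NodeConfigurationService and NodeConfiguration are the classes
    // touched by this changeset, the node id passed in is an arbitrary example value.
    static void disableAutoAssign(NodeConfigurationService svc, int nodeId) throws SQLException {
        NodeConfiguration cfg = svc.get(nodeId);
        svc.save(new NodeConfiguration(
                cfg.node(), cfg.description(), cfg.acceptQueries(), cfg.autoClean(),
                cfg.includeInPrecession(),
                false,              // autoAssignDomains: the column added by this migration
                cfg.keepWarcs(), cfg.profile(), cfg.disabled()));
    }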
@@ -9,6 +9,7 @@ import nu.marginalia.executor.storage.FileStorageFile;
 import nu.marginalia.executor.upload.UploadDirContents;
 import nu.marginalia.executor.upload.UploadDirItem;
 import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.service.ServiceId;
 import nu.marginalia.service.client.GrpcChannelPoolFactory;
 import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
@@ -25,27 +26,37 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;

 import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;

 @Singleton
 public class ExecutorClient {
+    private final MqPersistence persistence;
     private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
     private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
     private final ServiceRegistryIf registry;

     @Inject
     public ExecutorClient(ServiceRegistryIf registry,
+                          MqPersistence persistence,
                           GrpcChannelPoolFactory grpcChannelPoolFactory)
     {
         this.registry = registry;
+        this.persistence = persistence;
         this.channelPool = grpcChannelPoolFactory
                 .createMulti(
                         ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
                         ExecutorApiGrpc::newBlockingStub);
     }

+    private long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
+        return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl);
+    }
+
     public void startFsm(int node, String actorName) {
         channelPool.call(ExecutorApiBlockingStub::startFsm)
                 .forNode(node)
@@ -96,6 +107,16 @@ public class ExecutorClient {
                 .build());
     }

+    public long updateNsfwFilters() throws Exception {
+        long msgId = createTrackingTokenMsg("nsfw-filters", 1, Duration.ofHours(6));
+
+        channelPool.call(ExecutorApiBlockingStub::updateNsfwFilters)
+                .forNode(1)
+                .run(RpcUpdateNsfwFilters.newBuilder().setMsgId(msgId).build());
+
+        return msgId;
+    }
+
     public ActorRunStates getActorStates(int node) {
         try {
             var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates)
@@ -18,6 +18,8 @@ service ExecutorApi {
   rpc calculateAdjacencies(Empty) returns (Empty) {}
   rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
+
+  rpc updateNsfwFilters(RpcUpdateNsfwFilters) returns (Empty) {}

   rpc restartExecutorService(Empty) returns (Empty) {}
 }

@@ -66,6 +68,9 @@ message RpcExportRequest {
     int64 fileStorageId = 1;
     int64 msgId = 2;
 }
+message RpcUpdateNsfwFilters {
+    int64 msgId = 1;
+}
 message RpcFileStorageIdWithDomainName {
     int64 fileStorageId = 1;
     string targetDomainName = 2;
@@ -6,7 +6,7 @@ import java.util.Set;

 public enum ExecutorActor {
     PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
-    SYNC_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
+    UPDATE_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD, NodeProfile.REALTIME),

     CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
@@ -113,7 +113,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.UPDATE_RSS, updateRssActor);

         register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
-        register(ExecutorActor.SYNC_NSFW_LISTS, updateNsfwFiltersActor);
+        register(ExecutorActor.UPDATE_NSFW_LISTS, updateNsfwFiltersActor);

         if (serviceConfiguration.node() == 1) {
             register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
@@ -25,6 +25,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

+
+// Unlike other monitor actors, the ping monitor will not merely wait for a request
+// to be sent, but send one itself, hence we can't extend AbstractProcessSpawnerActor
+// but have to reimplement a lot of the same logic ourselves.
 @Singleton
 public class PingMonitorActor extends RecordActorPrototype {

@@ -53,7 +57,6 @@ public class PingMonitorActor extends RecordActorPrototype {
         return switch (self) {
             case Initial i -> {
                 PingRequest request = new PingRequest();
-
                 persistence.sendNewMessage(inboxName, null, null,
                         "PingRequest",
                         gson.toJson(request),
@@ -44,7 +44,6 @@ public class LiveCrawlActor extends RecordActorPrototype {

     @Override
     public ActorStep transition(ActorStep self) throws Exception {
-        logger.info("{}", self);
         return switch (self) {
             case Initial() -> {
                 yield new Monitor("-");
@@ -5,6 +5,8 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.nsfw.NsfwDomainFilter;
 import nu.marginalia.service.module.ServiceConfiguration;

@@ -12,23 +14,26 @@ import nu.marginalia.service.module.ServiceConfiguration;
 public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     private final ServiceConfiguration serviceConfiguration;
     private final NsfwDomainFilter nsfwDomainFilter;
+    private final MqPersistence persistence;

-    public record Initial() implements ActorStep {}
-    public record Run() implements ActorStep {}
+    public record Initial(long respondMsgId) implements ActorStep {}
+    public record Run(long respondMsgId) implements ActorStep {}

     @Override
     public ActorStep transition(ActorStep self) throws Exception {
         return switch(self) {
-            case Initial() -> {
+            case Initial(long respondMsgId) -> {
                 if (serviceConfiguration.node() != 1) {
+                    persistence.updateMessageState(respondMsgId, MqMessageState.ERR);
                     yield new Error("This actor can only run on node 1");
                 }
                 else {
-                    yield new Run();
+                    yield new Run(respondMsgId);
                 }
             }
-            case Run() -> {
+            case Run(long respondMsgId) -> {
                 nsfwDomainFilter.fetchLists();
+                persistence.updateMessageState(respondMsgId, MqMessageState.OK);
                 yield new End();
             }
             default -> new Error();
@@ -43,11 +48,13 @@ public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     @Inject
     public UpdateNsfwFiltersActor(Gson gson,
                                   ServiceConfiguration serviceConfiguration,
-                                  NsfwDomainFilter nsfwDomainFilter)
+                                  NsfwDomainFilter nsfwDomainFilter,
+                                  MqPersistence persistence)
     {
         super(gson);
         this.serviceConfiguration = serviceConfiguration;
         this.nsfwDomainFilter = nsfwDomainFilter;
+        this.persistence = persistence;
     }

 }
@@ -10,6 +10,7 @@ import nu.marginalia.actor.state.ActorStateInstance;
 import nu.marginalia.actor.task.DownloadSampleActor;
 import nu.marginalia.actor.task.RestoreBackupActor;
 import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
+import nu.marginalia.actor.task.UpdateNsfwFiltersActor;
 import nu.marginalia.functions.execution.api.*;
 import nu.marginalia.service.module.ServiceConfiguration;
 import nu.marginalia.service.server.DiscoverableService;
@@ -263,4 +264,19 @@ public class ExecutorGrpcService
         System.exit(0);
     }

+    @Override
+    public void updateNsfwFilters(RpcUpdateNsfwFilters request, StreamObserver<Empty> responseObserver) {
+        logger.info("Got request {}", request);
+        try {
+            actorControlService.startFrom(ExecutorActor.UPDATE_NSFW_LISTS,
+                    new UpdateNsfwFiltersActor.Initial(request.getMsgId()));
+
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+        }
+        catch (Exception e) {
+            logger.error("Failed to update nsfw filters", e);
+            responseObserver.onError(e);
+        }
+    }
 }
@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.module.ServiceConfiguration;

 import javax.annotation.CheckReturnValue;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +60,11 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }

+    public boolean waitReady(Duration duration) throws InterruptedException {
+        return channelPool.awaitChannel(duration);
+    }
+

     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
@@ -35,6 +35,7 @@ dependencies {
     implementation libs.bundles.slf4j
     implementation libs.commons.lang3
     implementation libs.commons.io
+    implementation libs.httpclient
     implementation libs.wiremock

     implementation libs.prometheus
@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
 import nu.marginalia.storage.model.FileStorageType;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.cookie.StandardCookieSpec;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HeaderElement;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
+import org.apache.hc.core5.http.message.MessageSupport;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
 import java.sql.SQLException;
-import java.time.*;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +72,8 @@ public class FeedFetcherService {

     private final DomainCoordinator domainCoordinator;

+    private final HttpClient httpClient;
+
     private volatile boolean updating;

     @Inject
@@ -71,6 +90,83 @@ public class FeedFetcherService {
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
         this.domainCoordinator = domainCoordinator;
+
+        final ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                .setSocketTimeout(15, TimeUnit.SECONDS)
+                .setConnectTimeout(15, TimeUnit.SECONDS)
+                .setValidateAfterInactivity(TimeValue.ofSeconds(5))
+                .build();
+
+
+        var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
+                .setMaxConnPerRoute(2)
+                .setMaxConnTotal(50)
+                .setDefaultConnectionConfig(connectionConfig)
+                .build();
+
+        connectionManager.setDefaultSocketConfig(SocketConfig.custom()
+                .setSoLinger(TimeValue.ofSeconds(-1))
+                .setSoTimeout(Timeout.ofSeconds(10))
+                .build()
+        );
+
+        Thread.ofPlatform().daemon(true).start(() -> {
+            try {
+                for (;;) {
+                    TimeUnit.SECONDS.sleep(15);
+                    logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        final RequestConfig defaultRequestConfig = RequestConfig.custom()
+                .setCookieSpec(StandardCookieSpec.IGNORE)
+                .setResponseTimeout(10, TimeUnit.SECONDS)
+                .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
+                .build();
+
+        httpClient = HttpClients.custom()
+                .setDefaultRequestConfig(defaultRequestConfig)
+                .setConnectionManager(connectionManager)
+                .setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
+                .setConnectionManager(connectionManager)
+                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
+                    // Default keep-alive duration is 3 minutes, but this is too long for us,
+                    // as we are either going to re-use it fairly quickly or close it for a long time.
+                    //
+                    // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
+                    private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
+
+                    @Override
+                    public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
+                        final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
+
+                        while (it.hasNext()) {
+                            final HeaderElement he = it.next();
+                            final String param = he.getName();
+                            final String value = he.getValue();
+
+                            if (value == null)
+                                continue;
+                            if (!"timeout".equalsIgnoreCase(param))
+                                continue;
+
+                            try {
+                                long timeout = Long.parseLong(value);
+                                timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
+                                return TimeValue.ofSeconds(timeout);
+                            } catch (final NumberFormatException ignore) {
+                                break;
+                            }
+                        }
+                        return defaultValue;
+                    }
+                })
+                .build();
+
     }

     public enum UpdateMode {
@@ -86,13 +182,7 @@ public class FeedFetcherService {


         try (FeedDbWriter writer = feedDb.createWriter();
-             HttpClient client = HttpClient.newBuilder()
-                     .connectTimeout(Duration.ofSeconds(15))
-                     .executor(Executors.newCachedThreadPool())
-                     .followRedirects(HttpClient.Redirect.NORMAL)
-                     .version(HttpClient.Version.HTTP_2)
-                     .build();
-             ExecutorService fetchExecutor = Executors.newCachedThreadPool();
+             ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
              FeedJournal feedJournal = FeedJournal.create();
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
@@ -137,7 +227,8 @@ public class FeedFetcherService {

                 FetchResult feedData;
                 try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
-                    feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                    feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                    TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
                 } catch (Exception ex) {
                     feedData = new FetchResult.TransientError();
                 }
@@ -216,7 +307,6 @@ public class FeedFetcherService {
     }

     private FetchResult fetchFeedData(FeedDefinition feed,
-                                      HttpClient client,
                                       ExecutorService executorService,
                                       @Nullable String ifModifiedSinceDate,
                                       @Nullable String ifNoneMatchTag)
@@ -224,59 +314,63 @@ public class FeedFetcherService {
         try {
             URI uri = new URI(feed.feedUrl());

-            HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
-                    .GET()
-                    .uri(uri)
-                    .header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
-                    .header("Accept-Encoding", "gzip")
-                    .header("Accept", "text/*, */*;q=0.9")
-                    .timeout(Duration.ofSeconds(15))
-                    ;
+            var requestBuilder = ClassicRequestBuilder.get(uri)
+                    .setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
+                    .setHeader("Accept-Encoding", "gzip")
+                    .setHeader("Accept", "text/*, */*;q=0.9");

             // Set the If-Modified-Since or If-None-Match headers if we have them
             // though since there are certain idiosyncrasies in server implementations,
             // we avoid setting both at the same time as that may turn a 304 into a 200.
             if (ifNoneMatchTag != null) {
-                requestBuilder.header("If-None-Match", ifNoneMatchTag);
+                requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
             } else if (ifModifiedSinceDate != null) {
-                requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
+                requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
             }

-            HttpRequest getRequest = requestBuilder.build();
-
-            for (int i = 0; i < 3; i++) {
-
-                /* Note we need to use an executor to time-limit the send() method in HttpClient, as
-                 * its support for timeouts only applies to the time until response starts to be received,
-                 * and does not catch the case when the server starts to send data but then hangs.
-                 */
-                HttpResponse<byte[]> rs = executorService.submit(
-                        () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
-                        .get(15, TimeUnit.SECONDS);
-
-                if (rs.statusCode() == 429) { // Too Many Requests
-                    int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
-                    Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
-                    continue;
-                }
-
-                String newEtagValue = rs.headers().firstValue("ETag").orElse("");
-
-                return switch (rs.statusCode()) {
+            return httpClient.execute(requestBuilder.build(), rsp -> {
+                try {
+                    logger.info("Code: {}, URL: {}", rsp.getCode(), uri);
+
+                    switch (rsp.getCode()) {
                     case 200 -> {
-                        byte[] responseData = getResponseData(rs);
+                        if (rsp.getEntity() == null) {
+                            return new FetchResult.TransientError(); // No content to read, treat as transient error
+                        }
+                        byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());

-                        String contentType = rs.headers().firstValue("Content-Type").orElse("");
+                        // Decode the response body based on the Content-Type header
+                        Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                        if (contentTypeHeader == null) {
+                            return new FetchResult.TransientError();
+                        }
+                        String contentType = contentTypeHeader.getValue();
                         String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);

-                        yield new FetchResult.Success(bodyText, newEtagValue);
+                        // Grab the ETag header if it exists
+                        Header etagHeader = rsp.getFirstHeader("ETag");
+                        String newEtagValue = etagHeader == null ? null : etagHeader.getValue();
+
+                        return new FetchResult.Success(bodyText, newEtagValue);
                     }
-                    case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
-                    case 404 -> new FetchResult.PermanentError(); // never try again
-                    default -> new FetchResult.TransientError(); // we try again later
-                };
+                    case 304 -> {
+                        return new FetchResult.NotModified(); // via If-Modified-Since semantics
+                    }
+                    case 404 -> {
+                        return new FetchResult.PermanentError(); // never try again
+                    }
+                    default -> {
+                        return new FetchResult.TransientError(); // we try again later
+                    }
+                    }
+                }
+                catch (Exception ex) {
+                    return new FetchResult.PermanentError(); // treat as permanent error
+                }
+                finally {
+                    EntityUtils.consumeQuietly(rsp.getEntity());
+                }
+            });
         }
         catch (Exception ex) {
             logger.debug("Error fetching feed", ex);
@@ -285,19 +379,6 @@ public class FeedFetcherService {
             return new FetchResult.TransientError();
         }

-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
-
     public sealed interface FetchResult {
         record Success(String value, String etag) implements FetchResult {}
         record NotModified() implements FetchResult {}
@@ -5,6 +5,8 @@ import com.google.inject.Guice;
 import com.google.inject.name.Names;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.coordination.DomainCoordinator;
+import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.rss.db.FeedDb;
 import nu.marginalia.rss.model.FeedItems;
@@ -82,6 +84,7 @@ class FeedFetcherServiceTest extends AbstractModule {
     }

     public void configure() {
+        bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
         bind(HikariDataSource.class).toInstance(dataSource);
         bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
         bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));
@@ -115,9 +115,13 @@ public class CrawlerRetreiver implements AutoCloseable {
         final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
         final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());

+        if (!robotsRules.isAllowed(probedUrl.toString())) {
+            warcRecorder.flagAsRobotsTxtError(probedUrl);
+            yield 1; // Nothing we can do here, we aren't allowed to fetch the root URL
+        }
         delayTimer.waitFetchDelay(0); // initial delay after robots.txt

-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, robotsRules, delayTimer);
         domainStateDb.save(summaryRecord);

         if (Thread.interrupted()) {
@@ -270,11 +274,11 @@ public class CrawlerRetreiver implements AutoCloseable {



-    private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
+    private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, SimpleRobotRules robotsRules, CrawlDelayTimer timer) {
         Optional<String> feedLink = Optional.empty();

         try {
-            var url = rootUrl.withPathAndParam("/", null);
+            EdgeUrl url = rootUrl.withPathAndParam("/", null);

             HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
             timer.waitFetchDelay(0);
@@ -331,7 +335,7 @@ public class CrawlerRetreiver implements AutoCloseable {


         if (feedLink.isEmpty()) {
-            feedLink = guessFeedUrl(timer);
+            feedLink = guessFeedUrl(timer, robotsRules);
         }

         // Download the sitemap if available
@@ -339,7 +343,10 @@ public class CrawlerRetreiver implements AutoCloseable {

         // Grab the favicon if it exists

-        if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) {
+        if (robotsRules.isAllowed(faviconUrl.toString())) {
+            if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED)
+                    instanceof HttpFetchResult.ResultOk iconResult)
+            {
             String contentType = iconResult.header("Content-Type");
             byte[] iconData = iconResult.getBodyBytes();

@@ -348,6 +355,7 @@ public class CrawlerRetreiver implements AutoCloseable {
                     new DomainStateDb.FaviconRecord(contentType, iconData)
             );
         }
+        }
         timer.waitFetchDelay(0);

     }
@@ -383,7 +391,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             "blog/rss"
     );

-    private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
+    private Optional<String> guessFeedUrl(CrawlDelayTimer timer, SimpleRobotRules robotsRules) throws InterruptedException {
         var oldDomainStateRecord = domainStateDb.getSummary(domain);

         // If we are already aware of an old feed URL, then we can just revalidate it
@@ -396,6 +404,9 @@ public class CrawlerRetreiver implements AutoCloseable {

         for (String endpoint : likelyFeedEndpoints) {
             String url = "https://" + domain + "/" + endpoint;
+            if (!robotsRules.isAllowed(url)) {
+                continue;
+            }
             if (validateFeedUrl(url, timer)) {
                 return Optional.of(url);
             }
@@ -50,6 +50,7 @@ dependencies {

     implementation libs.notnull
     implementation libs.guava
+    implementation libs.httpclient
     implementation dependencies.create(libs.guice.get()) {
         exclude group: 'com.google.guava'
     }
@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.loading.LoaderInputData;
 import nu.marginalia.loading.documents.DocumentLoaderService;
 import nu.marginalia.loading.documents.KeywordLoaderService;
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageBaseType;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.Security;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
                            DomainProcessor domainProcessor,
                            FileStorageService fileStorageService,
                            KeywordLoaderService keywordLoaderService,
-                           DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
+                           DocumentLoaderService documentLoaderService,
+                           DomainCoordinator domainCoordinator,
+                           HikariDataSource dataSource)
             throws Exception
     {
         super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
     }

     private void run() throws Exception {
-        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
+        Path basePath = fileStorageService
+                .getStorageBase(FileStorageBaseType.STORAGE)
+                .asPath()
+                .resolve("live-crawl-data");

         if (!Files.isDirectory(basePath)) {
             Files.createDirectories(basePath);
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
         {
             final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);

+            /* ------------------------------------------------ */
+            /* Fetch the latest domains from the feeds database */
+            /* ------------------------------------------------ */
+
             processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);

             Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
+            if (!feedsClient.waitReady(Duration.ofHours(1))) {
+                throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
+            }
             feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);

             logger.info("Fetched data for {} domains", urlsPerDomain.size());

+
+            /* ------------------------------------- */
+            /* Prune the database from old entries */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.PRUNE_DB);

-            // Remove data that is too old
             dataSet.prune(cutoff);

+
+            /* ------------------------------------- */
+            /* Fetch the links for each domain */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.CRAWLING);

-            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
+            CloseableHttpClient client = HttpClientProvider.createClient();
+            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
                  var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
             {
                 for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     fetcher.scheduleRetrieval(domain, urls);
                 }
             }
+            finally {
+                client.close(CloseMode.GRACEFUL);
+            }
+

             Path tempPath = dataSet.createWorkDir();

             try {
+                /* ------------------------------------- */
+                /* Process the fetched links */
+                /* ------------------------------------- */
+
                 processHeartbeat.progress(LiveCrawlState.PROCESSING);

                 try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
                      var writer = new ConverterBatchWriter(tempPath, 0)
                 ) {
-                    // Offset the documents' ordinals toward the upper range, to avoid an ID collisions with the
-                    // main indexes (the maximum permissible for doc ordinal is value is 67_108_863, so this
-                    // leaves us with a lot of headroom still)
+                    // We need unique document ids that do not collide with the document id from the main index,
+                    // so we offset the documents' ordinals toward the upper range.
+                    //
+                    // The maximum permissible for doc ordinal is value is 67_108_863,
+                    // so this leaves us with a lot of headroom still!
+                    // Expected document count here is order of 10 :^)
                     writer.setOrdinalOffset(67_000_000);

                     for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
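A quick sanity check on the headroom claimed in the rewritten comment (a sketch, assuming the quoted ceiling of 67_108_863 is 2^26 - 1):

    long maxOrdinal = (1L << 26) - 1;        // 67_108_863, the ceiling quoted in the comment
    long offset     = 67_000_000L;           // the value passed to setOrdinalOffset above
    long headroom   = maxOrdinal - offset;   // 108_863 ordinals left for live-crawl documents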
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     }
                 }

+
+                /* ---------------------------------------------- */
+                /* Load the processed data into the link database */
+                /* and construct an index journal for the docs */
+                /* ---------------------------------------------- */
                 processHeartbeat.progress(LiveCrawlState.LOADING);

                 LoaderInputData lid = new LoaderInputData(tempPath, 1);

                 DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);

                 keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
                 FileUtils.deleteDirectory(tempPath.toFile());
             }

-            // Construct the index
+
+            /* ------------------------------------- */
+            /* Finish up */
+            /* ------------------------------------- */
+
             processHeartbeat.progress(LiveCrawlState.DONE);

+            // After we return from here, the LiveCrawlActor will trigger an index construction
+            // job. Unlike all the stuff we did in this process, it's identical to the real job
+            // so we don't need to do anything special from this process
         }
     }

@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
 import nu.marginalia.contenttype.DocumentBodyToString;
 import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.coordination.DomainLock;
-import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.retreival.CrawlDelayTimer;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpHeaders;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;

 /** A simple link scraper that fetches URLs and stores them in a database,
  * with no concept of a crawl frontier, WARC output, or other advanced features
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
  private final LiveCrawlDataSet dataSet;
  private final DbDomainQueries domainQueries;
  private final DomainBlacklist domainBlacklist;
- private final Duration connectTimeout = Duration.ofSeconds(10);
- private final Duration readTimeout = Duration.ofSeconds(10);
  private final DomainCoordinator domainCoordinator;

  private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
+ private final HttpClient httpClient;

  public SimpleLinkScraper(LiveCrawlDataSet dataSet,
                           DomainCoordinator domainCoordinator,
                           DbDomainQueries domainQueries,
+                          HttpClient httpClient,
                           DomainBlacklist domainBlacklist) {
      this.dataSet = dataSet;
      this.domainCoordinator = domainCoordinator;
      this.domainQueries = domainQueries;
      this.domainBlacklist = domainBlacklist;
+     this.httpClient = httpClient;
  }

  public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {

  EdgeUrl rootUrl = domain.toRootUrlHttps();

- List<EdgeUrl> relevantUrls = new ArrayList<>();
+ List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));

+ // Resolve absolute URLs
  for (var url : urls) {
      Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
-     if (optParsedUrl.isEmpty()) {
+     if (optParsedUrl.isEmpty())
          continue;
-     }
-     if (dataSet.hasUrl(optParsedUrl.get())) {
-         continue;
-     }
-     relevantUrls.add(optParsedUrl.get());
+     EdgeUrl absoluteUrl = optParsedUrl.get();
+     if (!dataSet.hasUrl(absoluteUrl))
+         relevantUrls.add(absoluteUrl);
  }

  if (relevantUrls.isEmpty()) {
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {

  int fetched = 0;

- try (HttpClient client = HttpClient
-         .newBuilder()
-         .connectTimeout(connectTimeout)
-         .followRedirects(HttpClient.Redirect.NEVER)
-         .version(HttpClient.Version.HTTP_2)
-         .build();
-      // throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
+ try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
       DomainLock lock = domainCoordinator.lockDomain(domain)
  ) {
-     SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
+     SimpleRobotRules rules = fetchRobotsRules(rootUrl);

      if (rules == null) { // I/O error fetching robots.txt
          // If we can't fetch the robots.txt,
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
  CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());

  for (var parsedUrl : relevantUrls) {

      if (!rules.isAllowed(parsedUrl.toString())) {
          maybeFlagAsBad(parsedUrl);
          continue;
      }

-     switch (fetchUrl(domainId, parsedUrl, timer, client)) {
+     switch (fetchUrl(domainId, parsedUrl, timer)) {
          case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
              dataSet.saveDocument(id, docUrl, body, headers, "");
              fetched++;
          }
-         case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
+         case FetchResult.Error(EdgeUrl docUrl) -> {
+             maybeFlagAsBad(docUrl);
+         }
      }
  }
  }
@@ -150,96 +144,100 @@ public class SimpleLinkScraper implements AutoCloseable {
  }

  @Nullable
- private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
-     var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
-             .GET()
-             .header("User-Agent", WmsaHome.getUserAgent().uaString())
-             .header("Accept-Encoding","gzip")
-             .timeout(readTimeout);
-
-     // Fetch the robots.txt
-
-     try {
-         SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
-         HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
-         if (robotsTxt.statusCode() == 200) {
-             return parser.parseContent(rootUrl.toString(),
-                     getResponseData(robotsTxt),
-                     robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
-                     WmsaHome.getUserAgent().uaIdentifier());
-         }
-         else if (robotsTxt.statusCode() == 404) {
-             return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
-         }
-     }
-     catch (IOException ex) {
-         logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
-     }
-     return null;
- }
-
- /** Fetch a URL and store it in the database
-  */
- private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
-
-     timer.waitFetchDelay();
-
-     HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
-             .GET()
-             .header("User-Agent", WmsaHome.getUserAgent().uaString())
-             .header("Accept", "text/html")
-             .header("Accept-Encoding", "gzip")
-             .timeout(readTimeout)
-             .build();
-
-     try {
-         HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-
-         // Handle rate limiting by waiting and retrying once
-         if (response.statusCode() == 429) {
-             timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
-                     response.headers().firstValue("Retry-After").orElse("5")
-             ));
-             response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-         }
-
-         String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
-
-         if (response.statusCode() == 200) {
-             if (!contentType.toLowerCase().startsWith("text/html")) {
-                 return new FetchResult.Error(parsedUrl);
-             }
-
-             byte[] body = getResponseData(response);
-             if (body.length > MAX_SIZE) {
-                 return new FetchResult.Error(parsedUrl);
-             }
-
-             String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
-
-             return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
-         }
-     }
-     catch (IOException ex) {
-         // We don't want a full stack trace on every error, as it's quite common and very noisy
-         logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
-     }
-
-     return new FetchResult.Error(parsedUrl);
- }
-
- private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-     String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-     if ("gzip".equals(encoding)) {
-         try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-             return stream.readAllBytes();
-         }
-     }
-     else {
-         return response.body();
-     }
- }
+ private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
+     ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
+             .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+             .setHeader("Accept-Encoding", "gzip")
+             .build();
+
+     try {
+         return httpClient.execute(request, rsp -> {
+             if (rsp.getEntity() == null) {
+                 return null;
+             }
+             try {
+                 if (rsp.getCode() == 200) {
+                     var contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                     if (contentTypeHeader == null) {
+                         return null; // No content type header, can't parse
+                     }
+                     return new SimpleRobotRulesParser().parseContent(
+                             rootUrl.toString(),
+                             EntityUtils.toByteArray(rsp.getEntity()),
+                             contentTypeHeader.getValue(),
+                             WmsaHome.getUserAgent().uaIdentifier()
+                     );
+                 } else if (rsp.getCode() == 404) {
+                     return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
+                 }
+             } finally {
+                 EntityUtils.consumeQuietly(rsp.getEntity());
+             }
+             return null;
+         });
+     }
+     catch (IOException e) {
+         logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
+         return null; // I/O error fetching robots.txt
+     }
+     finally {
+         try {
+             TimeUnit.SECONDS.sleep(1);
+         }
+         catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+     }
+ }
+
+ /** Fetch a URL and store it in the database
+  */
+ private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {
+
+     ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
+             .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+             .setHeader("Accept", "text/html")
+             .setHeader("Accept-Encoding", "gzip")
+             .build();
+
+     try {
+         return httpClient.execute(request, rsp -> {
+             try {
+                 if (rsp.getCode() == 200) {
+                     String contentType = rsp.getFirstHeader("Content-Type").getValue();
+                     if (!contentType.toLowerCase().startsWith("text/html")) {
+                         return new FetchResult.Error(parsedUrl);
+                     }
+
+                     byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);
+
+                     String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
+
+                     StringBuilder headersStr = new StringBuilder();
+                     for (var header : rsp.getHeaders()) {
+                         headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
+                     }
+
+                     return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
+                 }
+             } finally {
+                 if (rsp.getEntity() != null) {
+                     EntityUtils.consumeQuietly(rsp.getEntity());
+                 }
+             }
+             return new FetchResult.Error(parsedUrl);
+         });
+     }
+     catch (IOException e) {
+         logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
+         // If we can't fetch the URL, we return an error result
+         // so that the caller can decide what to do with it.
+     }
+     finally {
+         timer.waitFetchDelay();
+     }
+     return new FetchResult.Error(parsedUrl);
+ }

  sealed interface FetchResult {
@@ -247,14 +245,6 @@ public class SimpleLinkScraper implements AutoCloseable {
  record Error(EdgeUrl url) implements FetchResult {}
  }

- private String headersToString(HttpHeaders headers) {
-     StringBuilder headersStr = new StringBuilder();
-     headers.map().forEach((k, v) -> {
-         headersStr.append(k).append(": ").append(v).append("\n");
-     });
-     return headersStr.toString();
- }
-
  @Override
  public void close() throws Exception {
      pool.shutDown();
@@ -0,0 +1,126 @@ (new file: nu.marginalia.livecrawler.io.HttpClientProvider)
+ package nu.marginalia.livecrawler.io;
+
+ import com.google.inject.Provider;
+ import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
+ import org.apache.hc.client5.http.classic.HttpClient;
+ import org.apache.hc.client5.http.config.ConnectionConfig;
+ import org.apache.hc.client5.http.config.RequestConfig;
+ import org.apache.hc.client5.http.cookie.StandardCookieSpec;
+ import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+ import org.apache.hc.client5.http.impl.classic.HttpClients;
+ import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+ import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+ import org.apache.hc.core5.http.HeaderElement;
+ import org.apache.hc.core5.http.HeaderElements;
+ import org.apache.hc.core5.http.HttpResponse;
+ import org.apache.hc.core5.http.io.SocketConfig;
+ import org.apache.hc.core5.http.message.MessageSupport;
+ import org.apache.hc.core5.http.protocol.HttpContext;
+ import org.apache.hc.core5.util.TimeValue;
+ import org.apache.hc.core5.util.Timeout;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.security.KeyManagementException;
+ import java.security.NoSuchAlgorithmException;
+ import java.util.Iterator;
+ import java.util.concurrent.TimeUnit;
+
+ public class HttpClientProvider implements Provider<HttpClient> {
+     private static final HttpClient client;
+     private static PoolingHttpClientConnectionManager connectionManager;
+
+     private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);
+
+     static {
+         try {
+             client = createClient();
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+
+     public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
+         final ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                 .setSocketTimeout(15, TimeUnit.SECONDS)
+                 .setConnectTimeout(15, TimeUnit.SECONDS)
+                 .setValidateAfterInactivity(TimeValue.ofSeconds(5))
+                 .build();
+
+         connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
+                 .setMaxConnPerRoute(2)
+                 .setMaxConnTotal(50)
+                 .setDefaultConnectionConfig(connectionConfig)
+                 .build();
+
+         connectionManager.setDefaultSocketConfig(SocketConfig.custom()
+                 .setSoLinger(TimeValue.ofSeconds(-1))
+                 .setSoTimeout(Timeout.ofSeconds(10))
+                 .build()
+         );
+
+         Thread.ofPlatform().daemon(true).start(() -> {
+             try {
+                 for (;;) {
+                     TimeUnit.SECONDS.sleep(15);
+                     logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
+                 }
+             }
+             catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+             }
+         });
+
+         final RequestConfig defaultRequestConfig = RequestConfig.custom()
+                 .setCookieSpec(StandardCookieSpec.IGNORE)
+                 .setResponseTimeout(10, TimeUnit.SECONDS)
+                 .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
+                 .build();
+
+         return HttpClients.custom()
+                 .setConnectionManager(connectionManager)
+                 .setRetryStrategy(new RetryStrategy())
+                 .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
+                     // Default keep-alive duration is 3 minutes, but this is too long for us,
+                     // as we are either going to re-use it fairly quickly or close it for a long time.
+                     //
+                     // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
+                     private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
+
+                     @Override
+                     public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
+                         final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
+
+                         while (it.hasNext()) {
+                             final HeaderElement he = it.next();
+                             final String param = he.getName();
+                             final String value = he.getValue();
+
+                             if (value == null)
+                                 continue;
+                             if (!"timeout".equalsIgnoreCase(param))
+                                 continue;
+
+                             try {
+                                 long timeout = Long.parseLong(value);
+                                 timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
+                                 return TimeValue.ofSeconds(timeout);
+                             } catch (final NumberFormatException ignore) {
+                                 break;
+                             }
+                         }
+                         return defaultValue;
+                     }
+                 })
+                 .disableRedirectHandling()
+                 .setDefaultRequestConfig(defaultRequestConfig)
+                 .build();
+     }
+
+     @Override
+     public HttpClient get() {
+         return client;
+     }
+ }
@@ -0,0 +1,79 @@ (new file: nu.marginalia.livecrawler.io.RetryStrategy)
+ package nu.marginalia.livecrawler.io;
+
+ import org.apache.hc.client5.http.HttpHostConnectException;
+ import org.apache.hc.client5.http.HttpRequestRetryStrategy;
+ import org.apache.hc.core5.http.HttpRequest;
+ import org.apache.hc.core5.http.HttpResponse;
+ import org.apache.hc.core5.http.protocol.HttpContext;
+ import org.apache.hc.core5.util.TimeValue;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import javax.net.ssl.SSLException;
+ import java.io.IOException;
+ import java.net.SocketException;
+ import java.net.SocketTimeoutException;
+ import java.net.UnknownHostException;
+
+ public class RetryStrategy implements HttpRequestRetryStrategy {
+     private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);
+
+     @Override
+     public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
+         return switch (exception) {
+             case SocketTimeoutException ste -> false;
+             case SSLException ssle -> false;
+             case UnknownHostException uhe -> false;
+             case HttpHostConnectException ex -> executionCount < 2;
+             case SocketException ex -> executionCount < 2;
+             default -> executionCount <= 3;
+         };
+     }
+
+     @Override
+     public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
+         return switch (response.getCode()) {
+             case 500, 503 -> executionCount <= 2;
+             case 429 -> executionCount <= 3;
+             default -> false;
+         };
+     }
+
+     @Override
+     public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
+         return TimeValue.ofSeconds(1);
+     }
+
+     @Override
+     public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
+         int statusCode = response.getCode();
+
+         // Give 503 a bit more time
+         if (statusCode == 503) return TimeValue.ofSeconds(5);
+
+         if (statusCode == 429) {
+             // get the Retry-After header
+             var retryAfterHeader = response.getFirstHeader("Retry-After");
+             if (retryAfterHeader == null) {
+                 return TimeValue.ofSeconds(3);
+             }
+
+             String retryAfter = retryAfterHeader.getValue();
+             if (retryAfter == null) {
+                 return TimeValue.ofSeconds(2);
+             }
+
+             try {
+                 int retryAfterTime = Integer.parseInt(retryAfter);
+                 retryAfterTime = Math.clamp(retryAfterTime, 1, 5);
+
+                 return TimeValue.ofSeconds(retryAfterTime);
+             } catch (NumberFormatException e) {
+                 logger.warn("Invalid Retry-After header: {}", retryAfter);
+             }
+         }
+
+         return TimeValue.ofSeconds(2);
+     }
+ }
@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
  import nu.marginalia.coordination.LocalDomainCoordinator;
  import nu.marginalia.db.DomainBlacklistImpl;
  import nu.marginalia.io.SerializableCrawlDataStream;
+ import nu.marginalia.livecrawler.io.HttpClientProvider;
  import nu.marginalia.model.EdgeDomain;
  import nu.marginalia.model.EdgeUrl;
  import nu.marginalia.model.crawldata.CrawledDocument;
  import org.apache.commons.io.FileUtils;
+ import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+ import org.apache.hc.core5.io.CloseMode;
  import org.junit.jupiter.api.AfterEach;
  import org.junit.jupiter.api.Assertions;
  import org.junit.jupiter.api.BeforeEach;
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.Path;
+ import java.security.KeyManagementException;
+ import java.security.NoSuchAlgorithmException;
  import java.sql.SQLException;
  import java.util.List;

  class SimpleLinkScraperTest {
      private Path tempDir;
      private LiveCrawlDataSet dataSet;
+     private CloseableHttpClient httpClient;

      @BeforeEach
-     public void setUp() throws IOException, SQLException {
+     public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
          tempDir = Files.createTempDirectory(getClass().getSimpleName());
          dataSet = new LiveCrawlDataSet(tempDir);
+         httpClient = HttpClientProvider.createClient();
      }

      @AfterEach
      public void tearDown() throws Exception {
          dataSet.close();
+         httpClient.close(CloseMode.IMMEDIATE);
          FileUtils.deleteDirectory(tempDir.toFile());
      }

      @Test
      public void testRetrieveNow() throws Exception {
-         var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+         var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));
          int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
          Assertions.assertEquals(1, fetched);

@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
      @Test
      public void testRetrieveNow_Redundant() throws Exception {
          dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
-         var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
+         var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

          // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
          int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
12
code/processes/new-domain-process/README.md
Normal file
12
code/processes/new-domain-process/README.md
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
The new domain process (NDP) is a process that evaluates new domains for
|
||||||
|
inclusion in the search engine index.
|
||||||
|
|
||||||
|
It visits the root document of each candidate domain, ensures that it's reachable,
|
||||||
|
verifies that the response is valid HTML, and checks for a few factors such as length
|
||||||
|
and links before deciding whether to assign the domain to a node.
|
||||||
|
|
||||||
|
The NDP process will assign new domains to the node with the fewest assigned domains.
|
||||||
|
|
||||||
|
The NDP process is triggered with a goal target number of domains to process, and
|
||||||
|
will find domains until that target is reached. If e.g. a goal of 100 is set,
|
||||||
|
and 50 are in the index, it will find 50 more domains.
|
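The "fewest assigned domains" rule described in the README maps naturally onto a priority queue keyed by per-node domain counts, which is also the approach the DomainNodeAllocator change further down takes. The sketch below is a minimal, self-contained illustration of that idea; the class and method names are hypothetical and not part of the actual NDP code.

    import java.util.Map;
    import java.util.PriorityQueue;

    // Illustrative sketch of least-loaded node selection; not the real DomainNodeAllocator.
    class LeastLoadedNodePicker {
        // Smallest count sorts first, so remove() always yields the least-loaded node.
        private record NodeCount(int nodeId, int count) implements Comparable<NodeCount> {
            public int compareTo(NodeCount other) {
                return Integer.compare(count, other.count);
            }
        }

        private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();

        LeastLoadedNodePicker(Map<Integer, Integer> initialCountsPerNode) {
            initialCountsPerNode.forEach((node, count) -> countPerNode.add(new NodeCount(node, count)));
        }

        // Pick the node with the fewest domains and record the new assignment.
        synchronized int nextNodeId() {
            NodeCount least = countPerNode.remove();
            countPerNode.add(new NodeCount(least.nodeId(), least.count() + 1));
            return least.nodeId();
        }
    }

Re-inserting the updated count keeps the queue ordered without a separate index, at the cost of one O(log n) operation per assignment.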
@@ -32,6 +32,8 @@ dependencies {
  implementation project(':code:libraries:message-queue')
  implementation project(':code:libraries:blocking-thread-pool')

+ implementation project(':code:functions:link-graph:api')
+
  implementation project(':code:processes:process-mq-api')
  implementation project(':code:processes:crawling-process:ft-content-type')
  implementation project(':code:processes:crawling-process:ft-link-parser')
@@ -2,40 +2,41 @@ package nu.marginalia.ndp;

  import com.google.inject.Inject;
+ import com.google.inject.Singleton;
  import nu.marginalia.WmsaHome;
  import nu.marginalia.contenttype.ContentType;
  import nu.marginalia.contenttype.DocumentBodyToString;
  import nu.marginalia.coordination.DomainCoordinator;
  import nu.marginalia.link_parser.LinkParser;
  import nu.marginalia.model.EdgeDomain;
+ import nu.marginalia.model.EdgeUrl;
  import nu.marginalia.ndp.io.HttpClientProvider;
- import nu.marginalia.ndp.model.DomainToTest;
  import org.apache.hc.client5.http.classic.HttpClient;
- import org.apache.hc.core5.http.ClassicHttpResponse;
  import org.apache.hc.core5.http.io.entity.EntityUtils;
  import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
  import org.jsoup.Jsoup;
  import org.jsoup.nodes.Document;
+ import org.jsoup.nodes.Element;

- import java.net.URI;
- import java.net.URISyntaxException;
+ import java.io.InputStream;
  import java.security.KeyManagementException;
  import java.security.NoSuchAlgorithmException;
  import java.time.Duration;
- import java.time.Instant;
  import java.util.Objects;
+ import java.util.Optional;
  import java.util.concurrent.TimeUnit;

+ /** Evaluates a domain to determine if it is worth indexing.
+  * This class fetches the root document, checks the response code, content type,
+  * and parses the HTML to ensure it smells alright.
+  */
+ @Singleton
  public class DomainEvaluator {
      private final HttpClient client;
      private final String userAgentString = WmsaHome.getUserAgent().uaString();

      private final LinkParser linkParser = new LinkParser();
      private final DomainCoordinator domainCoordinator;
-     sealed interface FetchResult permits FetchSuccess, FetchFailure {}
-     record FetchSuccess(Document content) implements FetchResult {}
-     record FetchFailure(String reason) implements FetchResult {}

      @Inject
      public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
@@ -43,100 +44,103 @@ public class DomainEvaluator {
      client = HttpClientProvider.createClient();
  }

- public boolean evaluateDomain(DomainToTest domain) throws Exception {
-     var edgeDomain = new EdgeDomain(domain.domainName());
-
-     try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
-         var result = fetch(domain.domainName());
-
-         Instant start = Instant.now();
-
-         var ret = switch(result) {
-             case FetchSuccess(Document content) -> validateHtml(content, edgeDomain);
-             case FetchFailure failure -> false;
-         };
-
-         // Sleep for up to 1 second before we yield the lock to respect rate limits reasonably well
-         Instant end = Instant.now();
-         Duration sleepDuration = Duration.ofSeconds(1).minus(Duration.between(start, end));
-
-         if (sleepDuration.isPositive()) {
-             TimeUnit.MILLISECONDS.sleep(sleepDuration.toMillis());
-         }
-
-         return ret;
-     }
- }
-
- private boolean validateHtml(Document content, EdgeDomain domain) {
-     var rootUrl = domain.toRootUrlHttps();
-     var text = content.body().text();
-
-     if (text.length() < 100) {
-         return false; // Too short to be a valid page
-     }
-
-     if (text.contains("404 Not Found") || text.contains("Page not found")) {
-         return false; // Common indicators of a 404 page
-     }
-
-     for (var metaTag : content.select("meta")) {
-         if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv"))) {
-             return false; // Page has a refresh tag, very likely a parked domain
-         }
-     }
-
-     boolean hasInternalLink = false;
-
-     for (var atag : content.select("a")) {
-         var link = linkParser.parseLink(rootUrl, atag);
-         if (link.isEmpty()) {
-             continue; // Skip invalid links
-         }
-         var edgeUrl = link.get();
-         if (Objects.equals(domain, edgeUrl.getDomain())) {
-             hasInternalLink = true;
-         }
-     }
-
-     return hasInternalLink;
- }
-
- private FetchResult fetch(String domain) throws URISyntaxException {
-     var uri = new URI("https://" + domain + "/");
-
-     var request = ClassicRequestBuilder.get(uri)
-             .addHeader("User-Agent", userAgentString)
-             .addHeader("Accept-Encoding", "gzip")
-             .addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
-             .build();
-
-     try {
-         return client.execute(request, (rsp) -> responseHandler(rsp, domain));
-     } catch (Exception e) {
-         return new FetchFailure("Failed to fetch domain: " + e.getMessage());
-     }
- }
-
- private FetchResult responseHandler(ClassicHttpResponse rsp, String domain) {
-     if (rsp.getEntity() == null)
-         return new FetchFailure("No content returned from " + domain);
-
-     try {
-         int code = rsp.getCode();
-         byte[] content = rsp.getEntity().getContent().readAllBytes();
-
-         if (code >= 300) {
-             return new FetchFailure("Received HTTP " + code + " from " + domain);
-         }
-
-         ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
-         var html = DocumentBodyToString.getStringData(contentType, content);
-         return new FetchSuccess(Jsoup.parse(html));
-     }
-     catch (Exception e) {
-         EntityUtils.consumeQuietly(rsp.getEntity());
-         return new FetchFailure("Failed to read content from " + domain + ": " + e.getMessage());
-     }
- }
+ public boolean evaluateDomain(String domainName) {
+     var edgeDomain = new EdgeDomain(domainName);
+
+     // Grab a lock on the domain to prevent concurrent evaluations between processes
+     try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
+         var rootUrl = edgeDomain.toRootUrlHttps();
+
+         var request = ClassicRequestBuilder.get(rootUrl.asURI())
+                 .addHeader("User-Agent", userAgentString)
+                 .addHeader("Accept-Encoding", "gzip")
+                 .addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
+                 .build();
+
+         return client.execute(request, (rsp) -> {
+             if (rsp.getEntity() == null)
+                 return false;
+
+             try {
+                 // Check if the response code indicates a successful fetch
+                 if (200 != rsp.getCode()) {
+                     return false;
+                 }
+
+                 byte[] content;
+                 // Read the content from the response entity
+                 try (InputStream contentStream = rsp.getEntity().getContent()) {
+                     content = contentStream.readNBytes(8192);
+                 }
+
+                 // Parse the content (if it's valid)
+                 ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
+
+                 // Validate the content type
+                 if (!contentType.contentType().startsWith("text/html") && !contentType.contentType().startsWith("application/xhtml+xml"))
+                     return false;
+
+                 // Parse the document body to a Jsoup Document
+                 final Document document = Jsoup.parse(DocumentBodyToString.getStringData(contentType, content));
+                 final String text = document.body().text();
+
+                 if (text.length() < 100)
+                     return false;
+                 if (text.contains("404 Not Found") || text.contains("Page not found"))
+                     return false;
+                 if (hasMetaRefresh(document))
+                     return false; // This almost always indicates a parked domain
+                 if (!hasInternalLink(document, edgeDomain, rootUrl))
+                     return false; // No internal links means it's not worth indexing
+
+                 return true;
+             }
+             catch (Exception e) {
+                 return false;
+             }
+             finally {
+                 // May or may not be necessary, but let's ensure we clean up the response entity
+                 // to avoid resource leaks
+                 EntityUtils.consumeQuietly(rsp.getEntity());
+
+                 // Sleep for a while before yielding the lock, to avoid immediately hammering the domain
+                 // from another process
+                 sleepQuietly(Duration.ofSeconds(1));
+             }
+         });
+     }
+     catch (Exception ex) {
+         return false; // If we fail to fetch or parse the domain, we consider it invalid
+     }
+ }
+
+ private boolean hasInternalLink(Document document, EdgeDomain currentDomain, EdgeUrl rootUrl) {
+     for (Element atag : document.select("a")) {
+         Optional<EdgeDomain> destDomain = linkParser
+                 .parseLink(rootUrl, atag)
+                 .map(EdgeUrl::getDomain);
+
+         if (destDomain.isPresent() && Objects.equals(currentDomain, destDomain.get()))
+             return true;
+     }
+     return false;
+ }
+
+ private boolean hasMetaRefresh(Document document) {
+     for (Element metaTag : document.select("meta")) {
+         if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv")))
+             return true;
+     }
+     return false;
+ }
+
+ private void sleepQuietly(Duration duration) {
+     try {
+         TimeUnit.MILLISECONDS.sleep(duration.toMillis());
+     } catch (InterruptedException e) {
+         throw new RuntimeException(e);
+     }
+ }
  }
@@ -16,6 +16,9 @@ public class DomainNodeAllocator {

  private final NodeConfigurationService nodeConfigurationService;
  private final HikariDataSource dataSource;
+ private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
+
+ private volatile boolean initialized = false;

  private record NodeCount(int nodeId, int count)
          implements Comparable<NodeCount>
@@ -30,8 +33,6 @@ public class DomainNodeAllocator {
      }
  }

- private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
- volatile boolean initialized = false;

  @Inject
  public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
@@ -43,6 +44,43 @@ public class DomainNodeAllocator {
          .start(this::initialize);
  }

+ public synchronized int totalCount() {
+     ensureInitialized();
+     return countPerNode.stream().mapToInt(NodeCount::count).sum();
+ }
+
+ /** Returns the next node ID to assign a domain to.
+  * This method is synchronized to ensure thread safety when multiple threads are allocating domains.
+  * The node ID returned is guaranteed to be one of the viable nodes configured in the system.
+  */
+ public synchronized int nextNodeId() {
+     ensureInitialized();
+
+     // Synchronized is fine here as this is not a hot path
+     // (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
+
+     NodeCount allocation = countPerNode.remove();
+     countPerNode.add(allocation.incrementCount());
+     return allocation.nodeId();
+ }
+
+ private void ensureInitialized() {
+     if (initialized) return;
+
+     synchronized (this) {
+         while (!initialized) {
+             try {
+                 // Wait until the initialization is complete
+                 this.wait(1000);
+             } catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+                 throw new RuntimeException("DomainAllocator initialization interrupted", e);
+             }
+         }
+     }
+ }
+
  public void initialize() {
      if (initialized) return;
@@ -53,6 +91,9 @@ public class DomainNodeAllocator {
      for (var node : nodeConfigurationService.getAll()) {
          if (node.disabled())
              continue;
+         if (!node.autoAssignDomains())
+             continue;
+
          if (node.profile().permitBatchCrawl())
              viableNodes.add(node.node());
      }
@@ -89,39 +130,5 @@ public class DomainNodeAllocator {
      initialized = true;
  }

- private void ensureInitialized() {
-     if (initialized) return;
-
-     synchronized (this) {
-         while (!initialized) {
-             try {
-                 // Wait until the initialization is complete
-                 this.wait(1000);
-             } catch (InterruptedException e) {
-                 Thread.currentThread().interrupt();
-                 throw new RuntimeException("DomainAllocator initialization interrupted", e);
-             }
-         }
-     }
- }
-
- public synchronized int totalCount() {
-     ensureInitialized();
-     return countPerNode.stream().mapToInt(NodeCount::count).sum();
- }
-
- /** Returns the next node ID to assign a domain to.
-  * This method is synchronized to ensure thread safety when multiple threads are allocating domains.
-  * The node ID returned is guaranteed to be one of the viable nodes configured in the system.
-  */
- public synchronized int nextNodeId() {
-     ensureInitialized();
-
-     // Synchronized is fine here as this is not a hot path
-     // (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
-
-     NodeCount allocation = countPerNode.remove();
-     countPerNode.add(allocation.incrementCount());
-     return allocation.nodeId();
- }
  }
@@ -2,30 +2,41 @@ package nu.marginalia.ndp;

  import com.google.inject.Inject;
  import com.zaxxer.hikari.HikariDataSource;
+ import it.unimi.dsi.fastutil.ints.Int2IntMap;
+ import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+ import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
  import nu.marginalia.ndp.model.DomainToTest;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.sql.Connection;
+ import java.sql.ResultSet;
+ import java.time.Duration;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.ConcurrentHashMap;

  public class DomainTestingQueue {
- private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(1000);
+ private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
+
+ private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(2);
+
  // This will grow quite large, but should be manageable in memory, as theoretical maximum is around 100M domains,
  // order of 2 GB in memory.
  private final ConcurrentHashMap<String, Boolean> takenDomains = new ConcurrentHashMap<>();

  private final HikariDataSource dataSource;
+ private final AggregateLinkGraphClient linkGraphClient;

- private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);

  @Inject
- public DomainTestingQueue(HikariDataSource dataSource) {
+ public DomainTestingQueue(HikariDataSource dataSource,
+                           AggregateLinkGraphClient linkGraphClient
+ ) {
      this.dataSource = dataSource;
+     this.linkGraphClient = linkGraphClient;
+
      Thread.ofPlatform()
          .name("DomainTestingQueue::fetch()")
@@ -43,9 +54,10 @@ public class DomainTestingQueue {
          SET STATE='ACCEPTED'
          WHERE DOMAIN_ID=?
          """);
- var assigNodeStmt = conn.prepareStatement("""
+ var assignNodeStmt = conn.prepareStatement("""
          UPDATE EC_DOMAIN SET NODE_AFFINITY=?
          WHERE ID=?
+         AND EC_DOMAIN.NODE_AFFINITY < 0
          """)
  )
  {
@@ -53,9 +65,9 @@ public class DomainTestingQueue {
      flagOkStmt.setInt(1, domain.domainId());
      flagOkStmt.executeUpdate();

-     assigNodeStmt.setInt(1, nodeId);
-     assigNodeStmt.setInt(2, domain.domainId());
-     assigNodeStmt.executeUpdate();
+     assignNodeStmt.setInt(1, nodeId);
+     assignNodeStmt.setInt(2, domain.domainId());
+     assignNodeStmt.executeUpdate();
      conn.commit();
  } catch (Exception e) {
      throw new RuntimeException("Failed to accept domain in database", e);
@@ -105,9 +117,14 @@ public class DomainTestingQueue {
  }

  if (domains.isEmpty()) {
-     refreshQueue(conn);
+     if (!refreshQueue(conn)) {
+         throw new RuntimeException("No new domains found, aborting!");
      }
  }
+ }
+ catch (RuntimeException e) {
+     throw e; // Rethrow runtime exceptions to avoid wrapping them in another runtime exception
+ }
  catch (Exception e) {
      throw new RuntimeException("Failed to fetch domains from database", e);
  }
@@ -124,25 +141,100 @@ public class DomainTestingQueue {
  }
  }

- private void refreshQueue(Connection conn) {
-     logger.info("Refreshing domain queue in database");
-     try (var stmt = conn.createStatement()) {
-         conn.setAutoCommit(false);
-         logger.info("Revitalizing rejected domains");
-
-         // Revitalize rejected domains
-         stmt.executeUpdate("""
-                 UPDATE NDP_NEW_DOMAINS
-                 SET STATE='NEW'
-                 WHERE NDP_NEW_DOMAINS.STATE = 'REJECTED'
-                 AND DATE_ADD(TS_CHANGE, INTERVAL CHECK_COUNT DAY) > NOW()
-                 """);
-         conn.commit();
-         logger.info("Queue refreshed successfully");
-     } catch (Exception e) {
-         throw new RuntimeException("Failed to refresh queue in database", e);
-     }
- }
+ private boolean refreshQueue(Connection conn) {
+     logger.info("Refreshing domain queue in database");
+
+     Int2IntMap domainIdToCount = new Int2IntOpenHashMap();
+
+     // Load known domain IDs from the database to avoid inserting duplicates from NDP_NEW_DOMAINS
+     // or domains that are already assigned to a node
+     {
+         IntOpenHashSet knownIds = new IntOpenHashSet();
+
+         try (var stmt = conn.createStatement()) {
+             ResultSet rs = stmt.executeQuery("SELECT DOMAIN_ID FROM NDP_NEW_DOMAINS");
+             rs.setFetchSize(10_000);
+             while (rs.next()) {
+                 int domainId = rs.getInt("DOMAIN_ID");
+                 knownIds.add(domainId);
+             }
+
+             rs = stmt.executeQuery("SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0");
+             rs.setFetchSize(10_000);
+             while (rs.next()) {
+                 int domainId = rs.getInt("ID");
+                 knownIds.add(domainId);
+             }
+         } catch (Exception e) {
+             throw new RuntimeException("Failed to load known domain IDs from database", e);
+         }
+
+         // Ensure the link graph is ready before proceeding. This is mainly necessary in a cold reboot
+         // of the entire system.
+         try {
+             logger.info("Waiting for link graph client to be ready...");
+             linkGraphClient.waitReady(Duration.ofHours(1));
+             logger.info("Link graph client is ready, fetching domain links...");
+         } catch (InterruptedException e) {
+             throw new RuntimeException(e);
+         }
+
+         // Fetch all domain links from the link graph and count by how many sources each dest domain is linked from
+         var iter = linkGraphClient.getAllDomainLinks().iterator();
+         while (iter.advance()) {
+             int dest = iter.dest();
+             if (!knownIds.contains(dest)) {
+                 domainIdToCount.mergeInt(dest, 1, (i, j) -> i + j);
+             }
+         }
+     }
+
+     boolean didInsert = false;
+
+     /* Insert new domains into NDP_NEW_DOMAINS table */
+     try (var insertStmt = conn.prepareStatement("""
+             INSERT IGNORE INTO NDP_NEW_DOMAINS (DOMAIN_ID, PRIORITY) VALUES (?, ?)
+             """)) {
+         conn.setAutoCommit(false);
+
+         int cnt = 0;
+         for (var entry : domainIdToCount.int2IntEntrySet()) {
+             int domainId = entry.getIntKey();
+             int count = entry.getIntValue();
+
+             insertStmt.setInt(1, domainId);
+             insertStmt.setInt(2, count);
+             insertStmt.addBatch();
+
+             if (++cnt >= 1000) {
+                 cnt = 0;
+                 insertStmt.executeBatch(); // Execute in batches to avoid memory issues
+                 conn.commit();
+                 didInsert = true;
+             }
+         }
+         if (cnt != 0) {
+             insertStmt.executeBatch(); // Execute any remaining batch
+             conn.commit();
+             didInsert = true;
+         }
+
+         logger.info("Queue refreshed successfully");
+     } catch (Exception e) {
+         throw new RuntimeException("Failed to refresh queue in database", e);
+     }
+
+     // Clean up NDP_NEW_DOMAINS table to remove any domains that are already in EC_DOMAIN
+     // This acts not only to clean up domains that we've flagged as ACCEPTED, but also to
+     // repair inconsistent states where domains might have incorrectly been added to NDP_NEW_DOMAINS
+     try (var stmt = conn.createStatement()) {
+         stmt.executeUpdate("DELETE FROM NDP_NEW_DOMAINS WHERE DOMAIN_ID IN (SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0)");
+     }
+     catch (Exception e) {
+         throw new RuntimeException("Failed to clean up NDP_NEW_DOMAINS", e);
+     }
+
+     return didInsert;
+ }

  }
@@ -84,23 +84,10 @@ public class NdpMain extends ProcessMainClass {
      hb.progress("Discovery Process", cnt, toInsertCount);
  }

- var nextDomain = domainTestingQueue.next();
- threadPool.submit(() -> evaluateDomain(nextDomain));
- }
- }
-
- threadPool.shutDown();
- // Wait for all tasks to complete or give up after 1 hour
- threadPool.awaitTermination(1, TimeUnit.HOURS);
-
- logger.info("NDP process completed. Total domains processed: " + domainCount.get());
-
- }
-
- private void evaluateDomain(DomainToTest nextDomain) {
+ final DomainToTest nextDomain = domainTestingQueue.next();
+ threadPool.submit(() -> {
      try {
-         if (domainEvaluator.evaluateDomain(nextDomain)) {
+         if (domainEvaluator.evaluateDomain(nextDomain.domainName())) {
              logger.info("Accepting: {}", nextDomain.domainName());
              domainCount.incrementAndGet();
              domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
@@ -113,6 +100,16 @@ public class NdpMain extends ProcessMainClass {
              domainTestingQueue.reject(nextDomain);
              logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
      }
+ });
+ }
+ }
+
+ threadPool.shutDown();
+ // Wait for all tasks to complete or give up after 1 hour
+ threadPool.awaitTermination(1, TimeUnit.HOURS);
+
+ logger.info("NDP process completed. Total domains processed: " + domainCount.get());
+
  }

  public static void main(String[] args) throws Exception {
|
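The two hunks above are easier to follow read together: the first removes the old structure, where the evaluation lived in a separate `evaluateDomain(DomainToTest)` method and the pool shutdown sat immediately after the loop, and the second re-adds the shutdown and wait once the evaluation has been inlined into the submitted lambda. A self-contained sketch of the resulting submit-then-await pattern, using generic names rather than the project's actual classes:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitThenAwaitSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(4);
        AtomicInteger accepted = new AtomicInteger();

        // Producer loop: each candidate is evaluated inside the submitted task,
        // so a slow evaluation never blocks the loop that drains the queue.
        for (String domain : List.of("example.org", "example.net", "example.com")) {
            threadPool.submit(() -> {
                if (evaluate(domain)) {
                    accepted.incrementAndGet();
                }
            });
        }

        // Only after the loop: stop accepting work and wait for in-flight tasks,
        // giving up after a fixed timeout rather than hanging forever.
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.HOURS);

        System.out.println("Accepted: " + accepted.get());
    }

    private static boolean evaluate(String domain) {
        return domain.endsWith(".org"); // stand-in for the real evaluation
    }
}
```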
@@ -0,0 +1,29 @@
+package nu.marginalia.ndp;
+
+import nu.marginalia.coordination.LocalDomainCoordinator;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DomainEvaluatorTest {
+
+    @Tag("flaky") // Exclude from CI runs due to potential network issues
+    @Test
+    public void testSunnyDay() throws NoSuchAlgorithmException, KeyManagementException {
+        DomainEvaluator evaluator = new DomainEvaluator(new LocalDomainCoordinator());
+
+        // Should be a valid domain
+        assertTrue(evaluator.evaluateDomain("www.marginalia.nu"));
+
+        // Should be a redirect to www.marginalia.nu
+        assertFalse(evaluator.evaluateDomain("memex.marginalia.nu"));
+
+        // Should fail on Anubis
+        assertFalse(evaluator.evaluateDomain("marginalia-search.com"));
+    }
+}
@@ -28,6 +28,7 @@ the data generated by the loader.
 ## 5. Other Processes

 * Ping Process: The [ping-process](ping-process/) keeps track of the aliveness of websites, gathering fingerprint information about the security posture of the website, as well as DNS information.
+* New Domain Process (NDP): The [new-domain-process](new-domain-process/) evaluates new domains for inclusion in the search engine index.
 * Live-Crawling Process: The [live-crawling-process](live-crawling-process/) is a process that crawls websites in real-time based on RSS feeds, updating a smaller index with the latest content.

 ## Overview
@@ -20,6 +20,6 @@ public class StatusModule extends AbstractModule {
         bind(String.class)
                 .annotatedWith(Names.named("searchEngineTestQuery"))
                 .toInstance(System.getProperty("status-service.public-query",
-                        "https://marginalia-search.com/search?query=plato&ref=marginalia-automatic-metrics"));
+                        "https://old-search.marginalia.nu/search?query=plato&ref=marginalia-automatic-metrics"));
     }
 }
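For context, a String bound this way is consumed through constructor injection with the matching `@Named` annotation. A minimal sketch assuming standard Guice; the consuming `UptimeProbe` class below is hypothetical and not part of the repository:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.google.inject.name.Names;

class StatusBindingSketch {
    // Hypothetical consumer: receives the bound test-query URL by name.
    static class UptimeProbe {
        private final String testQuery;

        @Inject
        UptimeProbe(@Named("searchEngineTestQuery") String testQuery) {
            this.testQuery = testQuery;
        }

        String testQuery() { return testQuery; }
    }

    public static void main(String[] args) {
        var injector = Guice.createInjector(new AbstractModule() {
            @Override
            protected void configure() {
                bind(String.class)
                        .annotatedWith(Names.named("searchEngineTestQuery"))
                        .toInstance(System.getProperty("status-service.public-query",
                                "https://old-search.marginalia.nu/search?query=plato&ref=marginalia-automatic-metrics"));
            }
        });

        System.out.println(injector.getInstance(UptimeProbe.class).testQuery());
    }
}
```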
@@ -280,6 +280,7 @@ public class ControlNodeService {
                 "on".equalsIgnoreCase(request.queryParams("autoClean")),
                 "on".equalsIgnoreCase(request.queryParams("includeInPrecession")),
                 "on".equalsIgnoreCase(request.queryParams("keepWarcs")),
+                "on".equalsIgnoreCase(request.queryParams("autoAssignDomains")),
                 NodeProfile.valueOf(request.queryParams("profile")),
                 "on".equalsIgnoreCase(request.queryParams("disabled"))
         );
@@ -74,6 +74,8 @@ public class ControlSysActionsService {
             Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
             Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
             Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);
+
+            Spark.post("/actions/update-nsfw-filters", this::updateNsfwFilters, Redirects.redirectToOverview);
         }
         catch (Exception e) {
             throw new RuntimeException(e);
@@ -132,6 +134,14 @@ public class ControlSysActionsService {
         return "";
     }
+
+    public Object updateNsfwFilters(Request request, Response response) throws Exception {
+        eventLog.logEvent("USER-ACTION", "UPDATE-NSFW-FILTERS");
+
+        executorClient.updateNsfwFilters();
+
+        return "";
+    }
+
     public Object flushApiCaches(Request request, Response response) throws Exception {
         eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
         apiOutbox.sendNotice("FLUSH_CACHES", "");
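The new route registered in the previous hunk dispatches to this handler. A minimal, runnable sketch of the same Spark routing pattern, using the plain two-argument `Spark.post` overload; the project additionally passes a redirect `ResponseTransformer`, and the event log and executor client calls from the real handler are not reproduced here:

```java
import spark.Request;
import spark.Response;
import spark.Spark;

class ActionRouteSketch {
    public static void main(String[] args) {
        Spark.port(4567);

        // POST /actions/update-nsfw-filters -> handler method, mirroring the pattern above.
        Spark.post("/actions/update-nsfw-filters", ActionRouteSketch::updateNsfwFilters);
    }

    static Object updateNsfwFilters(Request request, Response response) {
        // The real service logs a USER-ACTION event and asks the executor to refresh
        // the filter definitions; this stub simply acknowledges the request.
        return "";
    }
}
```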
@@ -66,13 +66,23 @@
                     </div>
                 </div>

+                <div class="form-check form-switch">
+                    <input class="form-check-input" type="checkbox" role="switch" name="autoAssignDomains" {{#if config.autoAssignDomains}}checked{{/if}}>
+                    <label class="form-check-label" for="autoClean">Auto-Assign Domains</label>
+
+                    <div class="form-text">If true, the New Domain Process will assign new domains to this node and all other nodes with this setting enabled.
+                        This is the default behavior, but can be overridden if you want one node with a specific manual domain assignment.
+                    </div>
+                </div>
+
+                <!-- This is not currently used, but may be in the future
                 <div class="form-check form-switch">
                     <input class="form-check-input" type="checkbox" role="switch" name="includeInPrecession" {{#if config.includeInPrecession}}checked{{/if}}>
                     <label class="form-check-label" for="includeInPrecession">Include in crawling precession</label>

                     <div class="form-text">If true, this node will be included in the crawling precession.</div>
                 </div>
+                -->
                 <div class="form-check form-switch">
                     <input class="form-check-input" type="checkbox" role="switch" name="keepWarcs" {{#if config.keepWarcs}}checked{{/if}}>
                     <label class="form-check-label" for="includeInPrecession">Keep WARC files during crawling</label>
@@ -13,14 +13,23 @@
     {{#unless node.profile.realtime}}
     <li class="nav-item dropdown">
         <a class="nav-link dropdown-toggle {{#if tab.actions}}active{{/if}}" data-bs-toggle="dropdown" href="#" role="button" aria-expanded="false">Actions</a>
-        {{#if node.profile.permitBatchCrawl}}
         <ul class="dropdown-menu">
+            {{#if node.profile.permitBatchCrawl}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=new-crawl">New Crawl</a></li>
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=process">Process Crawl Data</a></li>
+            {{/if}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=repartition">Repartition Index</a></li>
             <li><hr class="dropdown-divider"></li>
+            {{#if node.profile.permitSideload}}
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
+            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
+            <li><hr class="dropdown-divider"></li>
+            {{/if}}
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=download-sample-data">Download Sample Crawl Data</a></li>
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=export-db-data">Export Database Data</a></li>
@@ -30,19 +39,6 @@
             <li><hr class="dropdown-divider"></li>
             <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
         </ul>
-        {{/if}}
-        {{#if node.profile.permitSideload}}
-        <ul class="dropdown-menu">
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-encyclopedia">Sideload Encyclopedia</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
-            <li><hr class="dropdown-divider"></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=load">Load Processed Data</a></li>
-            <li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=restore-backup">Restore Index Backup</a></li>
-        </ul>
-        {{/if}}
     </li>
     {{/unless}}
     <li class="nav-item">
@@ -53,6 +53,31 @@
             </div>
         </div>

+        <div class="accordion-item">
+            <h2 class="accordion-header">
+                <button class="accordion-button collapsed"
+                        type="button"
+                        data-bs-toggle="collapse"
+                        data-bs-target="#collapseNsfwFilters"
+                        aria-expanded="false"
+                        aria-controls="collapseNsfwFilters">
+                    Update NSFW Filters Definitions
+                </button>
+            </h2>
+            <div id="collapseNsfwFilters" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
+                <div class="mb-3">
+                    This will fetch NSFW filter definitions.
+                </div>
+                <form method="post" action="actions/update-nsfw-filters">
+                    <button
+                            class="btn btn-primary me-md-2"
+                            onclick="return confirm('Confirm update NSFW filters');"
+                            type="submit">
+                        Update NSFW Filter</button>
+                </form>
+            </div>
+        </div>
+
         <div class="accordion-item">
             <h2 class="accordion-header">
                 <button class="accordion-button collapsed"