mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits


17 Commits

Author SHA1 Message Date
Viktor Lofgren
390f053406 (api) Add query parameter 'dc' for specifying the max number of results per domain 2025-07-14 10:09:30 +02:00
Viktor Lofgren
b03c43224c (search) Fix redirects in new search UI 2025-07-11 23:44:45 +02:00
Viktor Lofgren
9b4ce9e9eb (search) Fix !w redirect 2025-07-11 23:28:09 +02:00
Viktor
81ac02a695 Merge pull request #209 from us3r1d/master
added converter.insertFoundDomains property
2025-07-11 21:34:04 +02:00
krystal
47f624fb3b changed converter.insertFoundDomains to loader.insertFoundDomains 2025-07-11 12:13:45 -07:00
krystal
c866f19cbb added converter.insertFoundDomains property 2025-07-10 15:36:59 -07:00
Viktor Lofgren
518278493b (converter) Increase the max byte length when parsing crawled documents to 500 kB from 200 kB. 2025-07-08 21:22:02 +02:00
Viktor Lofgren
1ac0bab0b8 (converter) Also exclude length checks when lenient processing is enabled 2025-07-08 20:37:53 +02:00
Viktor Lofgren
08b45ed10a (converter) Add system property converter.lenientProcessing to disable most disqualification checks 2025-07-08 19:44:51 +02:00
Viktor Lofgren
f2cfb91973 (converter) Add audit log of converter errors and rejections 2025-07-08 19:15:41 +02:00
Viktor Lofgren
2f79524eb3 (refac) Rename ProcessService to ProcessSpawnerService for clarity 2025-07-07 15:48:44 +02:00
Viktor Lofgren
3b00142c96 (search) Don't say unknown domains are in the crawler queue 2025-07-06 18:42:36 +02:00
Viktor Lofgren
294ab19177 (status) Use old-search for status service instead of marginalia-search.com 2025-07-06 15:40:53 +02:00
Viktor Lofgren
6f1659ecb2 (control) Add GUI for NSFW Filter Update trigger 2025-06-25 16:03:27 +02:00
Viktor Lofgren
982dcb28f0 (live-crawler) Use Apache HttpClient + code cleanup 2025-06-24 13:04:19 +02:00
Viktor Lofgren
fc686d8b2e (live-crawler) Fix startup race condition
The fix makes sure we wait for the feeds API to be available before fetching from it, so that the process doesn't crash on a cold system reboot.
2025-06-24 11:42:41 +02:00
Viktor Lofgren
69ef0f334a (rss) Make feed fetcher use Apache's HttpClient 2025-06-23 18:49:55 +02:00
59 changed files with 873 additions and 347 deletions
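Two of the commits above lend themselves to a quick illustration. First, the 'dc' query parameter from 390f053406, which caps the number of results per domain. A minimal sketch of a query using it; the host and the other parameter name are assumptions for illustration, only 'dc' and its meaning come from the commit message:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class DcParameterExample {
    public static void main(String[] args) throws Exception {
        // dc=2: ask for at most two results per domain (per commit 390f053406).
        // The endpoint and the 'query' parameter name are illustrative only.
        URI uri = URI.create("https://search.example.com/api/search?query=garden+gnomes&dc=2");
        var rsp = HttpClient.newHttpClient()
                .send(HttpRequest.newBuilder(uri).GET().build(),
                      HttpResponse.BodyHandlers.ofString());
        System.out.println(rsp.statusCode());
    }
}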

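Second, the startup race fixed in fc686d8b2e: the stated approach is to block until the feeds API answers before the live crawler fetches from it. A minimal sketch of that idea, with hypothetical names (the actual fix is in the diff set below, not reproduced here):

class WaitForFeedsExample {
    // feedsClient.ping() stands in for a real availability check; the loop
    // keeps a cold system reboot from crashing the live crawler at startup.
    static void waitForFeedsApi(FeedsClient feedsClient) throws InterruptedException {
        for (;;) {
            try {
                feedsClient.ping();
                return; // feeds API is up; safe to start fetching
            }
            catch (Exception e) {
                Thread.sleep(1_000); // not reachable yet; retry
            }
        }
    }

    interface FeedsClient { void ping() throws Exception; }
}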
View File

@@ -7,6 +7,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </Console>
 <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -23,6 +24,7 @@
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 <SizeBasedTriggeringPolicy size="10MB" />
 </RollingFile>
@@ -36,6 +38,16 @@
 <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
 </Filters>
 </RollingFile>
+<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
+             ignoreExceptions="false">
+    <PatternLayout>
+        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
+    </PatternLayout>
+    <SizeBasedTriggeringPolicy size="100MB" />
+    <Filters>
+        <MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
+    </Filters>
+</RollingFile>
 </Appenders>
 <Loggers>
 <Logger name="org.apache.zookeeper" level="WARN" />

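For orientation, a minimal sketch (not code from this changeset) of how converter code could emit entries that the new appender captures: SLF4J messages tagged with a marker named "CONVERTER" pass the ALLOW filter on the audit file above and are suppressed on the consoles by the matching DENY filters. Class and method names here are illustrative assumptions.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

class ConverterAuditLogExample {
    private static final Logger logger = LoggerFactory.getLogger(ConverterAuditLogExample.class);
    private static final Marker CONVERTER = MarkerFactory.getMarker("CONVERTER");

    void logRejection(String url, String reason) {
        // Routed to converter-audit-${WMSA_SERVICE_NODE}.log by the
        // MarkerFilter(marker="CONVERTER", onMatch="ALLOW") appender.
        logger.info(CONVERTER, "rejected {} ({})", url, reason);
    }
}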
View File

@@ -8,6 +8,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </Console>
 <Console name="ConsoleWarn" target="SYSTEM_OUT">
@@ -18,6 +19,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </Console>
 <Console name="ConsoleError" target="SYSTEM_OUT">
@@ -28,6 +30,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </Console>
 <Console name="ConsoleFatal" target="SYSTEM_OUT">
@@ -38,6 +41,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </Console>
 <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -57,6 +61,7 @@
 <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
 <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
 </Filters>
 </RollingFile>
 <RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
@@ -69,6 +74,16 @@
 <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
 </Filters>
 </RollingFile>
+<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
+             ignoreExceptions="false">
+    <PatternLayout>
+        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
+    </PatternLayout>
+    <SizeBasedTriggeringPolicy size="100MB" />
+    <Filters>
+        <MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
+    </Filters>
+</RollingFile>
 </Appenders>
 <Loggers>
 <Logger name="org.apache.zookeeper" level="WARN" />

View File

@@ -9,6 +9,7 @@ import nu.marginalia.executor.storage.FileStorageFile;
 import nu.marginalia.executor.upload.UploadDirContents;
 import nu.marginalia.executor.upload.UploadDirItem;
 import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.service.ServiceId;
 import nu.marginalia.service.client.GrpcChannelPoolFactory;
 import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
@@ -25,27 +26,37 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;

 import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;

 @Singleton
 public class ExecutorClient {
+    private final MqPersistence persistence;
     private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
     private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
     private final ServiceRegistryIf registry;

     @Inject
     public ExecutorClient(ServiceRegistryIf registry,
+                          MqPersistence persistence,
                           GrpcChannelPoolFactory grpcChannelPoolFactory)
     {
         this.registry = registry;
+        this.persistence = persistence;
         this.channelPool = grpcChannelPoolFactory
                 .createMulti(
                         ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
                         ExecutorApiGrpc::newBlockingStub);
     }

+    private long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
+        return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl);
+    }
+
     public void startFsm(int node, String actorName) {
         channelPool.call(ExecutorApiBlockingStub::startFsm)
                 .forNode(node)
@@ -96,6 +107,16 @@ public class ExecutorClient {
                 .build());
     }

+    public long updateNsfwFilters() throws Exception {
+        long msgId = createTrackingTokenMsg("nsfw-filters", 1, Duration.ofHours(6));
+
+        channelPool.call(ExecutorApiBlockingStub::updateNsfwFilters)
+                .forNode(1)
+                .run(RpcUpdateNsfwFilters.newBuilder().setMsgId(msgId).build());
+
+        return msgId;
+    }
+
     public ActorRunStates getActorStates(int node) {
         try {
             var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates)

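A plausible call site for the new method; the control GUI added in 6f1659ecb2 presumably does something similar, but only updateNsfwFilters() and its returned message id come from the diff above:

class NsfwFilterTriggerExample {
    // The returned message id doubles as a tracking token in the
    // task-tracking[1] inbox, so a UI can poll MQ state for completion.
    static long trigger(ExecutorClient executorClient) throws Exception {
        long msgId = executorClient.updateNsfwFilters();
        System.out.println("NSFW filter update requested, msgId=" + msgId);
        return msgId;
    }
}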
View File

@@ -18,6 +18,8 @@ service ExecutorApi {
   rpc calculateAdjacencies(Empty) returns (Empty) {}
   rpc restoreBackup(RpcFileStorageId) returns (Empty) {}

+  rpc updateNsfwFilters(RpcUpdateNsfwFilters) returns (Empty) {}
+
   rpc restartExecutorService(Empty) returns (Empty) {}
 }
@@ -66,6 +68,9 @@ message RpcExportRequest {
   int64 fileStorageId = 1;
   int64 msgId = 2;
 }
+message RpcUpdateNsfwFilters {
+  int64 msgId = 1;
+}
 message RpcFileStorageIdWithDomainName {
   int64 fileStorageId = 1;
   string targetDomainName = 2;

View File

@@ -2,10 +2,11 @@ package nu.marginalia.actor;

 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.functions.execution.api.RpcFsmName;
+import nu.marginalia.functions.execution.api.RpcProcessId;
 import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.persistence.MqPersistence;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -14,18 +15,18 @@ import spark.Spark;

 @Singleton
 public class ActorApi {
     private final ExecutorActorControlService actors;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
     private final MqPersistence mqPersistence;
     private final ServiceConfiguration serviceConfiguration;
     private final Logger logger = LoggerFactory.getLogger(getClass());

     @Inject
     public ActorApi(ExecutorActorControlService actors,
-                    ProcessService processService,
+                    ProcessSpawnerService processSpawnerService,
                     MqPersistence mqPersistence,
                     ServiceConfiguration serviceConfiguration)
     {
         this.actors = actors;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.mqPersistence = mqPersistence;
         this.serviceConfiguration = serviceConfiguration;
     }
@@ -43,7 +44,7 @@ public class ActorApi {
     }

     public Object stopProcess(RpcProcessId processId) {
-        ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId());
+        ProcessSpawnerService.ProcessId id = ProcessSpawnerService.translateExternalIdBase(processId.getProcessId());

         try {
             String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
@@ -60,7 +61,7 @@ public class ActorApi {
                 }
             }
-            processService.kill(id);
+            processSpawnerService.kill(id);
         }
         catch (Exception ex) {
             logger.error("Failed to stop process {}", id, ex);

View File

@@ -6,7 +6,7 @@ import java.util.Set;

 public enum ExecutorActor {
     PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
-    SYNC_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
+    UPDATE_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD, NodeProfile.REALTIME),

     CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -113,7 +113,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.UPDATE_RSS, updateRssActor);
         register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
-        register(ExecutorActor.SYNC_NSFW_LISTS, updateNsfwFiltersActor);
+        register(ExecutorActor.UPDATE_NSFW_LISTS, updateNsfwFiltersActor);

         if (serviceConfiguration.node() == 1) {
             register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);

View File

@@ -4,11 +4,14 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
-import nu.marginalia.actor.state.*;
-import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.actor.state.ActorResumeBehavior;
+import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.actor.state.Resume;
+import nu.marginalia.actor.state.Terminal;
 import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
 import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,13 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class AbstractProcessSpawnerActor extends RecordActorPrototype {

     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;

     private final Logger logger = LoggerFactory.getLogger(getClass());

     public static final int MAX_ATTEMPTS = 3;

     private final String inboxName;
-    private final ProcessService.ProcessId processId;
+    private final ProcessSpawnerService.ProcessId processId;
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private final int node;
@@ -50,7 +53,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
                     for (;;) {
                         var messages = persistence.eavesdrop(inboxName, 1);

-                        if (messages.isEmpty() && !processService.isRunning(processId)) {
+                        if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
                             synchronized (processId) {
                                 processId.wait(5000);
                             }
@@ -92,7 +95,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
                 catch (InterruptedException ex) {
                     // We get this exception when the process is cancelled by the user
-                    processService.kill(processId);
+                    processSpawnerService.kill(processId);
                     setCurrentMessageToDead();

                     yield new Aborted();
@@ -112,13 +115,13 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
     public AbstractProcessSpawnerActor(Gson gson,
                                        ServiceConfiguration configuration,
                                        MqPersistence persistence,
-                                       ProcessService processService,
+                                       ProcessSpawnerService processSpawnerService,
                                        String inboxName,
-                                       ProcessService.ProcessId processId) {
+                                       ProcessSpawnerService.ProcessId processId) {
         super(gson);
         this.node = configuration.node();
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.inboxName = inboxName + ":" + node;
         this.processId = processId;
     }
@@ -149,7 +152,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
         // Run this call in a separate thread so that this thread can be interrupted waiting for it
         executorService.submit(() -> {
             try {
-                processService.trigger(processId);
+                processSpawnerService.trigger(processId);
             } catch (Exception e) {
                 logger.warn("Error in triggering process", e);
                 error.set(true);

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -17,13 +17,13 @@ public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
     public ConverterMonitorActor(Gson gson,
                                  ServiceConfiguration configuration,
                                  MqPersistence persistence,
-                                 ProcessService processService) {
+                                 ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.CONVERTER_INBOX,
-              ProcessService.ProcessId.CONVERTER);
+              ProcessSpawnerService.ProcessId.CONVERTER);
     }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -16,13 +16,13 @@ public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
     public CrawlerMonitorActor(Gson gson,
                                ServiceConfiguration configuration,
                                MqPersistence persistence,
-                               ProcessService processService) {
+                               ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.CRAWLER_INBOX,
-              ProcessService.ProcessId.CRAWLER);
+              ProcessSpawnerService.ProcessId.CRAWLER);
     }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -16,13 +16,13 @@ public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
     public ExportTaskMonitorActor(Gson gson,
                                   ServiceConfiguration configuration,
                                   MqPersistence persistence,
-                                  ProcessService processService) {
+                                  ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.EXPORT_TASK_INBOX,
-              ProcessService.ProcessId.EXPORT_TASKS);
+              ProcessSpawnerService.ProcessId.EXPORT_TASKS);
     }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -17,13 +17,13 @@ public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
     public IndexConstructorMonitorActor(Gson gson,
                                         ServiceConfiguration configuration,
                                         MqPersistence persistence,
-                                        ProcessService processService) {
+                                        ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
-              ProcessService.ProcessId.INDEX_CONSTRUCTOR);
+              ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR);
     }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -16,13 +16,13 @@ public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
     public LiveCrawlerMonitorActor(Gson gson,
                                    ServiceConfiguration configuration,
                                    MqPersistence persistence,
-                                   ProcessService processService) {
+                                   ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.LIVE_CRAWLER_INBOX,
-              ProcessService.ProcessId.LIVE_CRAWLER);
+              ProcessSpawnerService.ProcessId.LIVE_CRAWLER);
     }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -17,13 +17,13 @@ public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
     public LoaderMonitorActor(Gson gson,
                               ServiceConfiguration configuration,
                               MqPersistence persistence,
-                              ProcessService processService) {
+                              ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
-              persistence, processService,
+              persistence, processSpawnerService,
               ProcessInboxNames.LOADER_INBOX,
-              ProcessService.ProcessId.LOADER);
+              ProcessSpawnerService.ProcessId.LOADER);
     }
 }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;

 @Singleton
@@ -16,13 +16,13 @@ public class NdpMonitorActor extends AbstractProcessSpawnerActor {
     public NdpMonitorActor(Gson gson,
                            ServiceConfiguration configuration,
                            MqPersistence persistence,
-                           ProcessService processService) {
+                           ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.NDP_INBOX,
-              ProcessService.ProcessId.NDP);
+              ProcessSpawnerService.ProcessId.NDP);
     }

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
 import nu.marginalia.mqapi.ping.PingRequest;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,17 +25,21 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

+// Unlike other monitor actors, the ping monitor will not merely wait for a request
+// to be sent, but send one itself, hence we can't extend AbstractProcessSpawnerActor
+// but have to reimplement a lot of the same logic ourselves.
 @Singleton
 public class PingMonitorActor extends RecordActorPrototype {

     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;

     private final Logger logger = LoggerFactory.getLogger(getClass());

     public static final int MAX_ATTEMPTS = 3;

     private final String inboxName;
-    private final ProcessService.ProcessId processId;
+    private final ProcessSpawnerService.ProcessId processId;
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private final int node;
     private final Gson gson;
@@ -53,7 +57,6 @@ public class PingMonitorActor extends RecordActorPrototype {
         return switch (self) {
             case Initial i -> {
                 PingRequest request = new PingRequest();
-
                 persistence.sendNewMessage(inboxName, null, null,
                         "PingRequest",
                         gson.toJson(request),
@@ -65,7 +68,7 @@ public class PingMonitorActor extends RecordActorPrototype {
                 for (;;) {
                     var messages = persistence.eavesdrop(inboxName, 1);

-                    if (messages.isEmpty() && !processService.isRunning(processId)) {
+                    if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
                         synchronized (processId) {
                             processId.wait(5000);
                         }
@@ -107,7 +110,7 @@ public class PingMonitorActor extends RecordActorPrototype {
             catch (InterruptedException ex) {
                 // We get this exception when the process is cancelled by the user
-                processService.kill(processId);
+                processSpawnerService.kill(processId);
                 setCurrentMessageToDead();

                 yield new Aborted();
@@ -127,14 +130,14 @@ public class PingMonitorActor extends RecordActorPrototype {
     public PingMonitorActor(Gson gson,
                             ServiceConfiguration configuration,
                             MqPersistence persistence,
-                            ProcessService processService) throws SQLException {
+                            ProcessSpawnerService processSpawnerService) throws SQLException {
         super(gson);
         this.gson = gson;
         this.node = configuration.node();
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
-        this.processId = ProcessService.ProcessId.PING;
+        this.processId = ProcessSpawnerService.ProcessId.PING;
     }

     /** Sets the message to dead in the database to avoid
@@ -163,7 +166,7 @@ public class PingMonitorActor extends RecordActorPrototype {
         // Run this call in a separate thread so that this thread can be interrupted waiting for it
         executorService.submit(() -> {
             try {
-                processService.trigger(processId);
+                processSpawnerService.trigger(processId);
             } catch (Exception e) {
                 logger.warn("Error in triggering process", e);
                 error.set(true);

View File

@@ -8,7 +8,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorResumeBehavior;
 import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.actor.state.Resume;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.control.ServiceEventLog;
 import nu.marginalia.service.module.ServiceConfiguration;
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
 public class ProcessLivenessMonitorActor extends RecordActorPrototype {

     private final ServiceEventLog eventLogService;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
     private final HikariDataSource dataSource;

     private final int node;
@@ -49,7 +49,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
             var processId = heartbeat.getProcessId();
             if (null == processId) continue;

-            if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
+            if (processSpawnerService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
                 continue;

             flagProcessAsStopped(heartbeat);
@@ -72,12 +72,12 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
     public ProcessLivenessMonitorActor(Gson gson,
                                        ServiceEventLog eventLogService,
                                        ServiceConfiguration configuration,
-                                       ProcessService processService,
+                                       ProcessSpawnerService processSpawnerService,
                                        HikariDataSource dataSource) {
         super(gson);
         this.node = configuration.node();
         this.eventLogService = eventLogService;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.dataSource = dataSource;
     }
@@ -208,8 +208,8 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
         public boolean isRunning() {
             return "RUNNING".equals(status);
         }
-        public ProcessService.ProcessId getProcessId() {
-            return ProcessService.translateExternalIdBase(processBase);
+        public ProcessSpawnerService.ProcessId getProcessId() {
+            return ProcessSpawnerService.translateExternalIdBase(processBase);
         }
     }

View File

@@ -47,6 +47,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {

     private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt");

+    private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
+
     public record Initial() implements ActorStep {}
     @Resume(behavior = ActorResumeBehavior.RETRY)
     public record Wait(String ts) implements ActorStep {}
@@ -57,6 +59,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
     public ActorStep transition(ActorStep self) throws Exception {
         return switch(self) {
             case Initial() -> {
+                if (!insertFoundDomains) yield new Error("Domain insertion prohibited, aborting");
+
                 if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
                     yield new Error("Invalid node profile for RSS update");
                 }

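Since the flag above is read once via Boolean.getBoolean, it must be supplied as a JVM system property; a minimal sketch of how it would presumably be enabled:

class InsertFoundDomainsFlagExample {
    // Enabled at launch with:  java -Dloader.insertFoundDomains=true ...
    // Boolean.getBoolean returns true only if the named system property exists
    // and equals "true"; left unset, ScrapeFeedsActor's Initial step yields
    // Error("Domain insertion prohibited, aborting") as shown in the diff.
    static final boolean ENABLED = Boolean.getBoolean("loader.insertFoundDomains");
}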
View File

@@ -3,11 +3,11 @@ package nu.marginalia.actor.task;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.state.ActorControlFlowException;
-import nu.marginalia.mq.MqMessageState;
-import nu.marginalia.mq.persistence.MqPersistence;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.process.ProcessSpawnerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,13 +20,13 @@ public class ActorProcessWatcher {
     private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);

     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;

     @Inject
     public ActorProcessWatcher(MqPersistence persistence,
-                               ProcessService processService) {
+                               ProcessSpawnerService processSpawnerService) {
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
     }

     /** Wait for a process to start, and then wait for a response from the process,
@@ -36,7 +36,7 @@ public class ActorProcessWatcher {
      * <p>
      * When interrupted, the process is killed and the message is marked as dead.
      */
-    public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId)
+    public MqMessage waitResponse(MqOutbox outbox, ProcessSpawnerService.ProcessId processId, long msgId)
             throws ActorControlFlowException, InterruptedException, SQLException
     {
         // enums values only have a single instance,
@@ -65,7 +65,7 @@ public class ActorProcessWatcher {
             // This will prevent the monitor process from attempting to respawn the process as we kill it

             outbox.flagAsDead(msgId);
-            processService.kill(processId);
+            processSpawnerService.kill(processId);

             logger.info("Process {} killed due to interrupt", processId);
         }
@@ -94,12 +94,12 @@ public class ActorProcessWatcher {
     }

     /** Wait the specified time for the specified process to start running (does not start the process) */
-    private boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
+    private boolean waitForProcess(ProcessSpawnerService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
         // Wait for process to start
         long deadline = System.currentTimeMillis() + unit.toMillis(duration);
         while (System.currentTimeMillis() < deadline) {
-            if (processService.isRunning(processId))
+            if (processSpawnerService.isRunning(processId))
                 return true;

             TimeUnit.MILLISECONDS.sleep(100);

View File

@@ -12,7 +12,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.sideload.RedditSideloadHelper;
 import nu.marginalia.sideload.SideloadHelper;
 import nu.marginalia.sideload.StackExchangeSideloadHelper;
@@ -218,7 +218,7 @@ public class ConvertActor extends RecordActorPrototype {
                 );
             }
             case ConvertWait(FileStorageId destFid, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
+                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Converter failed");

View File

@@ -18,7 +18,7 @@ import nu.marginalia.mqapi.index.IndexName;
 import nu.marginalia.mqapi.loading.LoadRequest;
 import nu.marginalia.nodecfg.NodeConfigurationService;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
@@ -95,7 +95,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
                 new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
             case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
+                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);

                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Converter failed");
@@ -129,7 +129,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
                 yield new Load(processedIds, id);
             }
             case Load(List<FileStorageId> processedIds, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, msgId);
+                var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessSpawnerService.ProcessId.LOADER, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Loader failed");
@@ -165,7 +165,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
             case ReindexFwd(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);

                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Forward index construction failed");
@@ -174,7 +174,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
             case ReindexFull(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);

                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Full index construction failed");
@@ -183,7 +183,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
             case ReindexPrio(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);

                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Prio index construction failed");

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.CrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageType;
@@ -76,7 +76,7 @@ public class CrawlActor extends RecordActorPrototype {
             case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> {
                 var rsp = processWatcher.waitResponse(
                         mqCrawlerOutbox,
-                        ProcessService.ProcessId.CRAWLER,
+                        ProcessSpawnerService.ProcessId.CRAWLER,
                         msgId);

                 if (rsp.state() != MqMessageState.OK) {

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -55,7 +55,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -54,7 +54,7 @@ public class ExportFeedsActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);

View File

@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
                 yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
             }
             case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportTermFreqActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);

                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -44,7 +44,6 @@ public class LiveCrawlActor extends RecordActorPrototype {
     @Override
     public ActorStep transition(ActorStep self) throws Exception {
-        logger.info("{}", self);
         return switch (self) {
             case Initial() -> {
                 yield new Monitor("-");
@@ -75,7 +74,7 @@ public class LiveCrawlActor extends RecordActorPrototype {
                 yield new LiveCrawl(feedsHash, id);
             }
             case LiveCrawl(String feedsHash, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
+                var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessSpawnerService.ProcessId.LIVE_CRAWLER, msgId);
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Crawler failed");

View File

@@ -11,7 +11,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.CrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageType;
@@ -51,7 +51,7 @@ public class RecrawlSingleDomainActor extends RecordActorPrototype {
             case Crawl (long msgId) -> {
                 var rsp = processWatcher.waitResponse(
                         mqCrawlerOutbox,
-                        ProcessService.ProcessId.CRAWLER,
+                        ProcessSpawnerService.ProcessId.CRAWLER,
                         msgId);
                 if (rsp.state() != MqMessageState.OK) {

View File

@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@ public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
                 yield new Run(newMsgId);
             }
             case Run(long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Exporter failed");

View File

@@ -5,6 +5,8 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.nsfw.NsfwDomainFilter;
 import nu.marginalia.service.module.ServiceConfiguration;
@@ -12,23 +14,26 @@ import nu.marginalia.service.module.ServiceConfiguration;
 public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     private final ServiceConfiguration serviceConfiguration;
     private final NsfwDomainFilter nsfwDomainFilter;
+    private final MqPersistence persistence;
-    public record Initial() implements ActorStep {}
-    public record Run() implements ActorStep {}
+    public record Initial(long respondMsgId) implements ActorStep {}
+    public record Run(long respondMsgId) implements ActorStep {}
     @Override
     public ActorStep transition(ActorStep self) throws Exception {
         return switch(self) {
-            case Initial() -> {
+            case Initial(long respondMsgId) -> {
                 if (serviceConfiguration.node() != 1) {
+                    persistence.updateMessageState(respondMsgId, MqMessageState.ERR);
                     yield new Error("This actor can only run on node 1");
                 }
                 else {
-                    yield new Run();
+                    yield new Run(respondMsgId);
                }
            }
-            case Run() -> {
+            case Run(long respondMsgId) -> {
                 nsfwDomainFilter.fetchLists();
+                persistence.updateMessageState(respondMsgId, MqMessageState.OK);
                 yield new End();
            }
            default -> new Error();
@@ -43,11 +48,13 @@ public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     @Inject
     public UpdateNsfwFiltersActor(Gson gson,
                                   ServiceConfiguration serviceConfiguration,
-                                  NsfwDomainFilter nsfwDomainFilter)
+                                  NsfwDomainFilter nsfwDomainFilter,
+                                  MqPersistence persistence)
     {
         super(gson);
         this.serviceConfiguration = serviceConfiguration;
         this.nsfwDomainFilter = nsfwDomainFilter;
+        this.persistence = persistence;
     }
 }

View File

@@ -10,6 +10,7 @@ import nu.marginalia.actor.state.ActorStateInstance;
 import nu.marginalia.actor.task.DownloadSampleActor;
 import nu.marginalia.actor.task.RestoreBackupActor;
 import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
+import nu.marginalia.actor.task.UpdateNsfwFiltersActor;
 import nu.marginalia.functions.execution.api.*;
 import nu.marginalia.service.module.ServiceConfiguration;
 import nu.marginalia.service.server.DiscoverableService;
@@ -263,4 +264,19 @@ public class ExecutorGrpcService
         System.exit(0);
     }
+    @Override
+    public void updateNsfwFilters(RpcUpdateNsfwFilters request, StreamObserver<Empty> responseObserver) {
+        logger.info("Got request {}", request);
+        try {
+            actorControlService.startFrom(ExecutorActor.UPDATE_NSFW_LISTS,
+                    new UpdateNsfwFiltersActor.Initial(request.getMsgId()));
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+        }
+        catch (Exception e) {
+            logger.error("Failed to update nsfw filters", e);
+            responseObserver.onError(e);
+        }
+    }
 }

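For reference, a minimal caller-side sketch of the new RPC. The blocking-stub and builder names below follow standard gRPC/protobuf codegen conventions for the types visible in the handler above; ExecutorApiGrpc and the msg_id field name are assumptions, not confirmed by this diff:

    // Hypothetical client-side invocation of the new endpoint
    var stub = ExecutorApiGrpc.newBlockingStub(channel);   // channel to the executor service
    stub.updateNsfwFilters(RpcUpdateNsfwFilters.newBuilder()
            .setMsgId(msgId)   // message-queue id the actor acknowledges with OK/ERR
            .build());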
View File

@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 @Singleton
-public class ProcessService {
+public class ProcessSpawnerService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
@@ -88,7 +88,7 @@ public class ProcessService {
     }
     @Inject
-    public ProcessService(BaseServiceParams params) {
+    public ProcessSpawnerService(BaseServiceParams params) {
         this.eventLog = params.eventLog;
         this.node = params.configuration.node();
     }

View File

@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.module.ServiceConfiguration;
 import javax.annotation.CheckReturnValue;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +60,11 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }
+    public boolean waitReady(Duration duration) throws InterruptedException {
+        return channelPool.awaitChannel(duration);
+    }
     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)

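The new waitReady method gives consumers a way to block until the feeds service is discoverable rather than failing outright on a cold start. A minimal sketch of the intended call pattern, mirroring the live crawler change later in this compare:

    // Wait up to an hour for the feeds gRPC channel before fetching
    if (!feedsClient.waitReady(Duration.ofHours(1))) {
        throw new RuntimeException("Feeds service never became ready");
    }
    feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);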
View File

@@ -35,6 +35,7 @@ dependencies {
 implementation libs.bundles.slf4j
 implementation libs.commons.lang3
 implementation libs.commons.io
+implementation libs.httpclient
 implementation libs.wiremock
 implementation libs.prometheus

View File

@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
 import nu.marginalia.storage.model.FileStorageType;
 import nu.marginalia.util.SimpleBlockingThreadPool;
+import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.cookie.StandardCookieSpec;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HeaderElement;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
+import org.apache.hc.core5.http.message.MessageSupport;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
 import java.sql.SQLException;
-import java.time.*;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +72,8 @@ public class FeedFetcherService {
     private final DomainCoordinator domainCoordinator;
+    private final HttpClient httpClient;
     private volatile boolean updating;
     @Inject
@@ -71,6 +90,83 @@ public class FeedFetcherService {
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
         this.domainCoordinator = domainCoordinator;
+        final ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                .setSocketTimeout(15, TimeUnit.SECONDS)
+                .setConnectTimeout(15, TimeUnit.SECONDS)
+                .setValidateAfterInactivity(TimeValue.ofSeconds(5))
+                .build();
+        var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
+                .setMaxConnPerRoute(2)
+                .setMaxConnTotal(50)
+                .setDefaultConnectionConfig(connectionConfig)
+                .build();
+        connectionManager.setDefaultSocketConfig(SocketConfig.custom()
+                .setSoLinger(TimeValue.ofSeconds(-1))
+                .setSoTimeout(Timeout.ofSeconds(10))
+                .build()
+        );
+        Thread.ofPlatform().daemon(true).start(() -> {
+            try {
+                for (;;) {
+                    TimeUnit.SECONDS.sleep(15);
+                    logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+        final RequestConfig defaultRequestConfig = RequestConfig.custom()
+                .setCookieSpec(StandardCookieSpec.IGNORE)
+                .setResponseTimeout(10, TimeUnit.SECONDS)
+                .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
+                .build();
+        httpClient = HttpClients.custom()
+                .setDefaultRequestConfig(defaultRequestConfig)
+                .setConnectionManager(connectionManager)
+                .setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
+                .setConnectionManager(connectionManager)
+                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
+                    // Default keep-alive duration is 3 minutes, but this is too long for us,
+                    // as we are either going to re-use it fairly quickly or close it for a long time.
+                    //
+                    // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
+                    private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
+                    @Override
+                    public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
+                        final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
+                        while (it.hasNext()) {
+                            final HeaderElement he = it.next();
+                            final String param = he.getName();
+                            final String value = he.getValue();
+                            if (value == null)
+                                continue;
+                            if (!"timeout".equalsIgnoreCase(param))
+                                continue;
+                            try {
+                                long timeout = Long.parseLong(value);
+                                timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
+                                return TimeValue.ofSeconds(timeout);
+                            } catch (final NumberFormatException ignore) {
+                                break;
+                            }
+                        }
+                        return defaultValue;
+                    }
+                })
+                .build();
     }
     public enum UpdateMode {
@@ -86,13 +182,7 @@ public class FeedFetcherService {
         try (FeedDbWriter writer = feedDb.createWriter();
-             HttpClient client = HttpClient.newBuilder()
-                     .connectTimeout(Duration.ofSeconds(15))
-                     .executor(Executors.newCachedThreadPool())
-                     .followRedirects(HttpClient.Redirect.NORMAL)
-                     .version(HttpClient.Version.HTTP_2)
-                     .build();
-             ExecutorService fetchExecutor = Executors.newCachedThreadPool();
+             ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
              FeedJournal feedJournal = FeedJournal.create();
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
@@ -137,7 +227,8 @@ public class FeedFetcherService {
                 FetchResult feedData;
                 try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
-                    feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                    feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
+                    TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
                 } catch (Exception ex) {
                     feedData = new FetchResult.TransientError();
                 }
@@ -216,7 +307,6 @@ public class FeedFetcherService {
     }
     private FetchResult fetchFeedData(FeedDefinition feed,
-                                      HttpClient client,
                                       ExecutorService executorService,
                                       @Nullable String ifModifiedSinceDate,
                                       @Nullable String ifNoneMatchTag)
@@ -224,59 +314,63 @@ public class FeedFetcherService {
         try {
             URI uri = new URI(feed.feedUrl());
-            HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
-                    .GET()
-                    .uri(uri)
-                    .header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
-                    .header("Accept-Encoding", "gzip")
-                    .header("Accept", "text/*, */*;q=0.9")
-                    .timeout(Duration.ofSeconds(15))
-                    ;
+            var requestBuilder = ClassicRequestBuilder.get(uri)
+                    .setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
+                    .setHeader("Accept-Encoding", "gzip")
+                    .setHeader("Accept", "text/*, */*;q=0.9");
             // Set the If-Modified-Since or If-None-Match headers if we have them
             // though since there are certain idiosyncrasies in server implementations,
             // we avoid setting both at the same time as that may turn a 304 into a 200.
             if (ifNoneMatchTag != null) {
-                requestBuilder.header("If-None-Match", ifNoneMatchTag);
+                requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
             } else if (ifModifiedSinceDate != null) {
-                requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
+                requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
             }
-            HttpRequest getRequest = requestBuilder.build();
-            for (int i = 0; i < 3; i++) {
-                /* Note we need to use an executor to time-limit the send() method in HttpClient, as
-                 * its support for timeouts only applies to the time until response starts to be received,
-                 * and does not catch the case when the server starts to send data but then hangs.
-                 */
-                HttpResponse<byte[]> rs = executorService.submit(
-                                () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
-                        .get(15, TimeUnit.SECONDS);
-                if (rs.statusCode() == 429) { // Too Many Requests
-                    int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
-                    Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
-                    continue;
-                }
-                String newEtagValue = rs.headers().firstValue("ETag").orElse("");
-                return switch (rs.statusCode()) {
-                    case 200 -> {
-                        byte[] responseData = getResponseData(rs);
-                        String contentType = rs.headers().firstValue("Content-Type").orElse("");
-                        String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
-                        yield new FetchResult.Success(bodyText, newEtagValue);
-                    }
-                    case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
-                    case 404 -> new FetchResult.PermanentError(); // never try again
-                    default -> new FetchResult.TransientError(); // we try again later
-                };
-            }
+            return httpClient.execute(requestBuilder.build(), rsp -> {
+                try {
+                    logger.info("Code: {}, URL: {}", rsp.getCode(), uri);
+                    switch (rsp.getCode()) {
+                        case 200 -> {
+                            if (rsp.getEntity() == null) {
+                                return new FetchResult.TransientError(); // No content to read, treat as transient error
+                            }
+                            byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());
+                            // Decode the response body based on the Content-Type header
+                            Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
+                            if (contentTypeHeader == null) {
+                                return new FetchResult.TransientError();
+                            }
+                            String contentType = contentTypeHeader.getValue();
+                            String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);
+                            // Grab the ETag header if it exists
+                            Header etagHeader = rsp.getFirstHeader("ETag");
+                            String newEtagValue = etagHeader == null ? null : etagHeader.getValue();
+                            return new FetchResult.Success(bodyText, newEtagValue);
+                        }
+                        case 304 -> {
+                            return new FetchResult.NotModified(); // via If-Modified-Since semantics
+                        }
+                        case 404 -> {
+                            return new FetchResult.PermanentError(); // never try again
+                        }
+                        default -> {
+                            return new FetchResult.TransientError(); // we try again later
+                        }
+                    }
+                }
+                catch (Exception ex) {
+                    return new FetchResult.PermanentError(); // treat as permanent error
+                }
+                finally {
+                    EntityUtils.consumeQuietly(rsp.getEntity());
+                }
+            });
         }
         catch (Exception ex) {
             logger.debug("Error fetching feed", ex);
@@ -285,19 +379,6 @@ public class FeedFetcherService {
             return new FetchResult.TransientError();
         }
-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
     public sealed interface FetchResult {
         record Success(String value, String etag) implements FetchResult {}
         record NotModified() implements FetchResult {}

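The sealed FetchResult hierarchy pairs naturally with pattern-matching switches on the caller side. A sketch of how a result might be branched on; the handler methods are illustrative placeholders, not part of this diff:

    switch (feedData) {
        case FetchResult.Success(String body, String etag) -> storeFeed(body, etag); // parse and persist
        case FetchResult.NotModified() -> keepCachedFeed();      // 304, reuse what we have
        case FetchResult.PermanentError() -> dropFeed();         // e.g. 404, never retry
        case FetchResult.TransientError() -> retryOnNextPass();  // try again next update run
    }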
View File

@@ -5,6 +5,8 @@ import com.google.inject.Guice;
 import com.google.inject.name.Names;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.coordination.DomainCoordinator;
+import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.rss.db.FeedDb;
 import nu.marginalia.rss.model.FeedItems;
@@ -82,6 +84,7 @@ class FeedFetcherServiceTest extends AbstractModule {
     }
     public void configure() {
+        bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
         bind(HikariDataSource.class).toInstance(dataSource);
         bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
         bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));

View File

@@ -19,6 +19,8 @@ import nu.marginalia.model.crawldata.CrawlerDocumentStatus;
 import nu.marginalia.model.idx.WordFlags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -37,6 +39,7 @@ public class DocumentProcessor {
             "text/plain",
             "application/pdf");
+    private final Marker converterAuditMarker = MarkerFactory.getMarker("CONVERTER");
     private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>();
     private final AnchorTextKeywords anchorTextKeywords;
@@ -81,12 +84,13 @@ public class DocumentProcessor {
         catch (DisqualifiedException ex) {
             ret.state = UrlIndexingState.DISQUALIFIED;
             ret.stateReason = ex.reason.toString();
-            logger.debug("Disqualified {}: {}", ret.url, ex.reason);
+            logger.info(converterAuditMarker, "Disqualified {}: {}", ret.url, ex.reason);
         }
         catch (Exception ex) {
             ret.state = UrlIndexingState.DISQUALIFIED;
             ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
-            logger.info("Failed to convert " + crawledDocument.url, ex);
+            logger.info(converterAuditMarker, "Failed to convert {}: {}", crawledDocument.url, ex.getClass().getSimpleName());
+            logger.warn(converterAuditMarker, "Failed to convert " + crawledDocument.url, ex);
         }
         return ret;

View File

@@ -3,7 +3,6 @@ package nu.marginalia.converting.processor.logic;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
-import nu.marginalia.converting.model.DisqualifiedException;
 import nu.marginalia.language.model.DocumentLanguageData;
 @Singleton
@@ -26,12 +25,9 @@ public class DocumentLengthLogic {
         return (int) Math.round((totalWords / (double) numSentences) / 4.);
     }
-    public void validateLength(DocumentLanguageData dld,
-                               double modifier) throws DisqualifiedException
+    public boolean validateLength(DocumentLanguageData dld, double modifier)
     {
-        if (modifier * dld.totalNumWords() < minDocumentLength) {
-            throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
-        }
+        return modifier * dld.totalNumWords() >= minDocumentLength;
     }
 }

View File

@@ -68,6 +68,7 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
     private final HtmlProcessorSpecializations htmlProcessorSpecializations;
     private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000);
+    private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
     @Inject
     public HtmlDocumentProcessorPlugin(
@@ -108,13 +109,13 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
                                  DocumentClass documentClass)
             throws DisqualifiedException, URISyntaxException, IOException {
-        if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
+        if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
             throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
         }
         Document doc = crawledDocument.parseBody();
-        if (AcceptableAds.hasAcceptableAdsTag(doc)) {
+        if (!lenientProcessing && AcceptableAds.hasAcceptableAdsTag(doc)) {
             throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
         }
@@ -129,25 +130,27 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
         final var specialization = htmlProcessorSpecializations.select(generatorParts, url);
-        if (!specialization.shouldIndex(url)) {
+        if (!lenientProcessing && !specialization.shouldIndex(url)) {
             throw new DisqualifiedException(DisqualificationReason.IRRELEVANT);
         }
         var prunedDoc = specialization.prune(doc);
         final int length = getLength(doc);
         final DocumentFormat format = getDocumentFormat(doc);
         final double quality = documentValuator.getQuality(crawledDocument, format, doc, length);
-        if (isDisqualified(documentClass, url, quality, doc.title())) {
+        if (!lenientProcessing && isDisqualified(documentClass, url, quality, doc.title())) {
             throw new DisqualifiedException(DisqualificationReason.QUALITY);
         }
         DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
         checkDocumentLanguage(dld);
-        documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());
+        if (!lenientProcessing && !documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier())) {
+            throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
+        }
         var ret = new ProcessedDocumentDetails();

View File

@@ -43,6 +43,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
     private final DefaultSpecialization defaultSpecialization;
     private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class);
+    private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
     @Inject
     public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -81,7 +82,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
         String documentBody = crawledDocument.documentBody();
-        if (languageFilter.isBlockedUnicodeRange(documentBody)) {
+        if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
             throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
         }
@@ -100,7 +101,9 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
         checkDocumentLanguage(dld);
-        documentLengthLogic.validateLength(dld, 1.0);
+        if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
+            throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
+        }
         var ret = new ProcessedDocumentDetails();

View File

@@ -37,6 +37,8 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
     private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
     private final DocumentLengthLogic documentLengthLogic;
+    private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
     @Inject
     public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -73,7 +75,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
         String documentBody = crawledDocument.documentBody();
-        if (languageFilter.isBlockedUnicodeRange(documentBody)) {
+        if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
             throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
        }
@@ -83,7 +85,9 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
         checkDocumentLanguage(dld);
-        documentLengthLogic.validateLength(dld, 1.0);
+        if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
+            throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
+        }
         var ret = new ProcessedDocumentDetails();

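All three plugins read the flag through Boolean.getBoolean, i.e. a plain JVM system property sampled when the class initializes. A sketch of how it is switched on; the launch command is illustrative, only the property name comes from this diff:

    // At JVM launch: java -Dconverter.lenientProcessing=true ...
    // or programmatically, before the plugin classes are first loaded:
    System.setProperty("converter.lenientProcessing", "true");
    assert Boolean.getBoolean("converter.lenientProcessing");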
View File

@@ -28,6 +28,8 @@ public final class CrawledDocument implements SerializableCrawlData {
     @Nullable
     public String headers;
+    private static int MAX_LENGTH_BYTES = 500_000;
     public String documentBody() {
         return DocumentBodyToString.getStringData(
                 ContentType.parse(contentType),
@@ -65,7 +67,7 @@ public final class CrawledDocument implements SerializableCrawlData {
         return DocumentBodyToString.getParsedData(
                 ContentType.parse(contentType),
                 documentBodyBytes,
-                200_000,
+                MAX_LENGTH_BYTES,
                 url);
     }

View File

@@ -50,6 +50,7 @@ dependencies {
 implementation libs.notnull
 implementation libs.guava
+implementation libs.httpclient
 implementation dependencies.create(libs.guice.get()) {
     exclude group: 'com.google.guava'
 }

View File

@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.loading.LoaderInputData;
 import nu.marginalia.loading.documents.DocumentLoaderService;
 import nu.marginalia.loading.documents.KeywordLoaderService;
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageBaseType;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.Security;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
                            DomainProcessor domainProcessor,
                            FileStorageService fileStorageService,
                            KeywordLoaderService keywordLoaderService,
-                           DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
+                           DocumentLoaderService documentLoaderService,
+                           DomainCoordinator domainCoordinator,
+                           HikariDataSource dataSource)
             throws Exception
     {
         super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
     }
     private void run() throws Exception {
-        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
+        Path basePath = fileStorageService
+                .getStorageBase(FileStorageBaseType.STORAGE)
+                .asPath()
+                .resolve("live-crawl-data");
         if (!Files.isDirectory(basePath)) {
             Files.createDirectories(basePath);
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
         {
             final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);
+            /* ------------------------------------------------ */
+            /* Fetch the latest domains from the feeds database */
+            /* ------------------------------------------------ */
             processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);
             Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
+            if (!feedsClient.waitReady(Duration.ofHours(1))) {
+                throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
+            }
             feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);
             logger.info("Fetched data for {} domains", urlsPerDomain.size());
+            /* ------------------------------------- */
+            /* Prune the database from old entries   */
+            /* ------------------------------------- */
             processHeartbeat.progress(LiveCrawlState.PRUNE_DB);
-            // Remove data that is too old
             dataSet.prune(cutoff);
+            /* ------------------------------------- */
+            /* Fetch the links for each domain       */
+            /* ------------------------------------- */
             processHeartbeat.progress(LiveCrawlState.CRAWLING);
-            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
+            CloseableHttpClient client = HttpClientProvider.createClient();
+            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
                  var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
             {
                 for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
                     fetcher.scheduleRetrieval(domain, urls);
                 }
             }
+            finally {
+                client.close(CloseMode.GRACEFUL);
+            }
             Path tempPath = dataSet.createWorkDir();
             try {
+                /* ------------------------------------- */
+                /* Process the fetched links             */
+                /* ------------------------------------- */
                 processHeartbeat.progress(LiveCrawlState.PROCESSING);
                 try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
                      var writer = new ConverterBatchWriter(tempPath, 0)
                 ) {
-                    // Offset the documents' ordinals toward the upper range, to avoid an ID collisions with the
-                    // main indexes (the maximum permissible for doc ordinal is value is 67_108_863, so this
-                    // leaves us with a lot of headroom still)
+                    // We need unique document ids that do not collide with the document id from the main index,
+                    // so we offset the documents' ordinals toward the upper range.
+                    //
+                    // The maximum permissible for doc ordinal is value is 67_108_863,
+                    // so this leaves us with a lot of headroom still!
+                    // Expected document count here is order of 10 :^)
                     writer.setOrdinalOffset(67_000_000);
                     for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
                 }
             }
+            /* ---------------------------------------------- */
+            /* Load the processed data into the link database */
+            /* and construct an index journal for the docs    */
+            /* ---------------------------------------------- */
             processHeartbeat.progress(LiveCrawlState.LOADING);
             LoaderInputData lid = new LoaderInputData(tempPath, 1);
             DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);
             keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
                 FileUtils.deleteDirectory(tempPath.toFile());
             }
-            // Construct the index
+            /* ------------------------------------- */
+            /* Finish up                             */
+            /* ------------------------------------- */
             processHeartbeat.progress(LiveCrawlState.DONE);
+            // After we return from here, the LiveCrawlActor will trigger an index construction
+            // job. Unlike all the stuff we did in this process, it's identical to the real job
+            // so we don't need to do anything special from this process
         }
     }

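The headroom arithmetic in the comment above checks out: 67,108,863 is 2^26 - 1, so reading the ceiling as a 26-bit ordinal field (an inference from the value, not stated in the diff) leaves 108,863 ids above the chosen offset:

    long ceiling  = (1L << 26) - 1;    // 67_108_863, the stated ordinal maximum
    long offset   = 67_000_000L;       // writer.setOrdinalOffset(...)
    long headroom = ceiling - offset;  // 108_863 ids available for live-crawled docs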
View File

@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator; import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock; import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.util.SimpleBlockingThreadPool; import nu.marginalia.util.SimpleBlockingThreadPool;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
/** A simple link scraper that fetches URLs and stores them in a database, /** A simple link scraper that fetches URLs and stores them in a database,
* with no concept of a crawl frontier, WARC output, or other advanced features * with no concept of a crawl frontier, WARC output, or other advanced features
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
private final LiveCrawlDataSet dataSet; private final LiveCrawlDataSet dataSet;
private final DbDomainQueries domainQueries; private final DbDomainQueries domainQueries;
private final DomainBlacklist domainBlacklist; private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainCoordinator domainCoordinator; private final DomainCoordinator domainCoordinator;
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024); private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
private final HttpClient httpClient;
public SimpleLinkScraper(LiveCrawlDataSet dataSet, public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator, DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries, DbDomainQueries domainQueries,
HttpClient httpClient,
DomainBlacklist domainBlacklist) { DomainBlacklist domainBlacklist) {
this.dataSet = dataSet; this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator; this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries; this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist; this.domainBlacklist = domainBlacklist;
this.httpClient = httpClient;
} }
public void scheduleRetrieval(EdgeDomain domain, List<String> urls) { public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {
EdgeUrl rootUrl = domain.toRootUrlHttps(); EdgeUrl rootUrl = domain.toRootUrlHttps();
List<EdgeUrl> relevantUrls = new ArrayList<>(); List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));
// Resolve absolute URLs
for (var url : urls) { for (var url : urls) {
Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url); Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
if (optParsedUrl.isEmpty()) {
if (optParsedUrl.isEmpty())
continue; continue;
}
if (dataSet.hasUrl(optParsedUrl.get())) { EdgeUrl absoluteUrl = optParsedUrl.get();
continue;
} if (!dataSet.hasUrl(absoluteUrl))
relevantUrls.add(optParsedUrl.get()); relevantUrls.add(absoluteUrl);
} }
if (relevantUrls.isEmpty()) { if (relevantUrls.isEmpty()) {
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {
int fetched = 0; int fetched = 0;
try (HttpClient client = HttpClient try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
.newBuilder()
.connectTimeout(connectTimeout)
.followRedirects(HttpClient.Redirect.NEVER)
.version(HttpClient.Version.HTTP_2)
.build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLock lock = domainCoordinator.lockDomain(domain) DomainLock lock = domainCoordinator.lockDomain(domain)
) { ) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client); SimpleRobotRules rules = fetchRobotsRules(rootUrl);
if (rules == null) { // I/O error fetching robots.txt if (rules == null) { // I/O error fetching robots.txt
// If we can't fetch the robots.txt, // If we can't fetch the robots.txt,
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay()); CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());
for (var parsedUrl : relevantUrls) { for (var parsedUrl : relevantUrls) {
if (!rules.isAllowed(parsedUrl.toString())) { if (!rules.isAllowed(parsedUrl.toString())) {
maybeFlagAsBad(parsedUrl); maybeFlagAsBad(parsedUrl);
continue; continue;
} }
switch (fetchUrl(domainId, parsedUrl, timer, client)) { switch (fetchUrl(domainId, parsedUrl, timer)) {
case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> { case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
dataSet.saveDocument(id, docUrl, body, headers, ""); dataSet.saveDocument(id, docUrl, body, headers, "");
fetched++; fetched++;
} }
case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl); case FetchResult.Error(EdgeUrl docUrl) -> {
maybeFlagAsBad(docUrl);
}
} }
} }
} }
@@ -150,111 +144,107 @@ public class SimpleLinkScraper implements AutoCloseable {
} }
@Nullable @Nullable
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException { private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI()) ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
.GET() .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
.header("User-Agent", WmsaHome.getUserAgent().uaString()) .setHeader("Accept-Encoding", "gzip")
.header("Accept-Encoding","gzip") .build();
.timeout(readTimeout);
// Fetch the robots.txt
try { try {
SimpleRobotRulesParser parser = new SimpleRobotRulesParser(); return httpClient.execute(request, rsp -> {
HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray()); if (rsp.getEntity() == null) {
return null;
if (robotsTxt.statusCode() == 200) { }
return parser.parseContent(rootUrl.toString(), try {
getResponseData(robotsTxt), if (rsp.getCode() == 200) {
robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"), var contentTypeHeader = rsp.getFirstHeader("Content-Type");
WmsaHome.getUserAgent().uaIdentifier()); if (contentTypeHeader == null) {
return null; // No content type header, can't parse
}
return new SimpleRobotRulesParser().parseContent(
rootUrl.toString(),
EntityUtils.toByteArray(rsp.getEntity()),
contentTypeHeader.getValue(),
WmsaHome.getUserAgent().uaIdentifier()
);
} else if (rsp.getCode() == 404) {
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
}
} finally {
EntityUtils.consumeQuietly(rsp.getEntity());
}
return null;
});
}
catch (IOException e) {
logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
return null; // I/O error fetching robots.txt
}
finally {
try {
TimeUnit.SECONDS.sleep(1);
} }
else if (robotsTxt.statusCode() == 404) { catch (InterruptedException e) {
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL); Thread.currentThread().interrupt();
throw new RuntimeException(e);
} }
} }
catch (IOException ex) {
logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
}
return null;
} }
/** Fetch a URL and store it in the database /** Fetch a URL and store it in the database
*/ */
-    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
-        timer.waitFetchDelay();
-
-        HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
-                .GET()
-                .header("User-Agent", WmsaHome.getUserAgent().uaString())
-                .header("Accept", "text/html")
-                .header("Accept-Encoding", "gzip")
-                .timeout(readTimeout)
+    private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {
+        ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
+                .setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
+                .setHeader("Accept", "text/html")
+                .setHeader("Accept-Encoding", "gzip")
                 .build();

         try {
-            HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-
-            // Handle rate limiting by waiting and retrying once
-            if (response.statusCode() == 429) {
-                timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
-                        response.headers().firstValue("Retry-After").orElse("5")
-                ));
-                response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
-            }
-
-            String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
-
-            if (response.statusCode() == 200) {
-                if (!contentType.toLowerCase().startsWith("text/html")) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                byte[] body = getResponseData(response);
-                if (body.length > MAX_SIZE) {
-                    return new FetchResult.Error(parsedUrl);
-                }
-
-                String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
-
-                return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
-            }
+            return httpClient.execute(request, rsp -> {
+                try {
+                    if (rsp.getCode() == 200) {
+                        String contentType = rsp.getFirstHeader("Content-Type").getValue();
+                        if (!contentType.toLowerCase().startsWith("text/html")) {
+                            return new FetchResult.Error(parsedUrl);
+                        }
+
+                        byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);
+                        String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);
+
+                        StringBuilder headersStr = new StringBuilder();
+                        for (var header : rsp.getHeaders()) {
+                            headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
+                        }
+
+                        return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
+                    }
+                } finally {
+                    if (rsp.getEntity() != null) {
+                        EntityUtils.consumeQuietly(rsp.getEntity());
+                    }
+                }
+                return new FetchResult.Error(parsedUrl);
+            });
         }
-        catch (IOException ex) {
-            // We don't want a full stack trace on every error, as it's quite common and very noisy
-            logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
+        catch (IOException e) {
+            logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
+            // If we can't fetch the URL, we return an error result
+            // so that the caller can decide what to do with it.
+        }
+        finally {
+            timer.waitFetchDelay();
         }

         return new FetchResult.Error(parsedUrl);
     }

-    private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
-        String encoding = response.headers().firstValue("Content-Encoding").orElse("");
-
-        if ("gzip".equals(encoding)) {
-            try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
-                return stream.readAllBytes();
-            }
-        }
-        else {
-            return response.body();
-        }
-    }
-
     sealed interface FetchResult {
         record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
         record Error(EdgeUrl url) implements FetchResult {}
     }

-    private String headersToString(HttpHeaders headers) {
-        StringBuilder headersStr = new StringBuilder();
-        headers.map().forEach((k, v) -> {
-            headersStr.append(k).append(": ").append(v).append("\n");
-        });
-        return headersStr.toString();
-    }
-
     @Override
     public void close() throws Exception {
         pool.shutDown();
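The rewritten fetchUrl above leans on Apache HttpClient 5's execute-with-handler idiom: the handler runs while the connection is still leased, its return value becomes execute()'s return value, and the entity is consumed in the finally block so the pooled connection can be reused. The classic client also decompresses gzip responses transparently unless that is explicitly disabled, which is why the old getResponseData() helper is deleted. A minimal, self-contained sketch of the same pattern (illustrative class name and values, not code from this diff):

import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;

import java.nio.charset.StandardCharsets;

class FetchSketch {
    public static void main(String[] args) throws Exception {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            ClassicHttpRequest req = ClassicRequestBuilder.get("https://www.marginalia.nu/").build();

            // The handler runs while the connection is leased; its return value becomes
            // execute()'s return value, and the client releases the connection afterwards,
            // even when we bail out early on a non-200 status.
            String body = client.execute(req, rsp -> {
                if (rsp.getCode() != 200) {
                    return "";
                }
                // Same bounded read as the scraper code above, capped at 4096 bytes here.
                return new String(EntityUtils.toByteArray(rsp.getEntity(), 4096), StandardCharsets.UTF_8);
            });

            System.out.println(body.length());
        }
    }
}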


@@ -0,0 +1,126 @@
package nu.marginalia.livecrawler.io;

import com.google.inject.Provider;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class HttpClientProvider implements Provider<HttpClient> {
    private static final HttpClient client;
    private static PoolingHttpClientConnectionManager connectionManager;
    private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);

    static {
        try {
            client = createClient();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
        final ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setSocketTimeout(15, TimeUnit.SECONDS)
                .setConnectTimeout(15, TimeUnit.SECONDS)
                .setValidateAfterInactivity(TimeValue.ofSeconds(5))
                .build();

        connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
                .setMaxConnPerRoute(2)
                .setMaxConnTotal(50)
                .setDefaultConnectionConfig(connectionConfig)
                .build();

        connectionManager.setDefaultSocketConfig(SocketConfig.custom()
                .setSoLinger(TimeValue.ofSeconds(-1))
                .setSoTimeout(Timeout.ofSeconds(10))
                .build()
        );

        Thread.ofPlatform().daemon(true).start(() -> {
            try {
                for (;;) {
                    TimeUnit.SECONDS.sleep(15);
                    logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        final RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setCookieSpec(StandardCookieSpec.IGNORE)
                .setResponseTimeout(10, TimeUnit.SECONDS)
                .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
                .build();

        return HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setRetryStrategy(new RetryStrategy())
                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                    // Default keep-alive duration is 3 minutes, but this is too long for us,
                    // as we are either going to re-use it fairly quickly or close it for a long time.
                    //
                    // So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
                    private static final TimeValue defaultValue = TimeValue.ofSeconds(30);

                    @Override
                    public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
                        final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
                        while (it.hasNext()) {
                            final HeaderElement he = it.next();
                            final String param = he.getName();
                            final String value = he.getValue();
                            if (value == null)
                                continue;
                            if (!"timeout".equalsIgnoreCase(param))
                                continue;
                            try {
                                long timeout = Long.parseLong(value);
                                timeout = Math.clamp(timeout, 10, defaultValue.toSeconds());
                                return TimeValue.ofSeconds(timeout);
                            } catch (final NumberFormatException ignore) {
                                break;
                            }
                        }
                        return defaultValue;
                    }
                })
                .disableRedirectHandling()
                .setDefaultRequestConfig(defaultRequestConfig)
                .build();
    }

    @Override
    public HttpClient get() {
        return client;
    }
}
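Since the class implements Guice's Provider interface and caches a single static client, one plausible wiring (assumed here, not shown in this diff) is the standard module binding below; every injection point then shares the same pooled client. The module and demo class names are hypothetical:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.livecrawler.io.HttpClientProvider;
import org.apache.hc.client5.http.classic.HttpClient;

// Hypothetical module name; the binding itself is the standard Guice idiom
// for a Provider implementation like the one above.
class LiveCrawlHttpModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(HttpClient.class).toProvider(HttpClientProvider.class);
    }
}

class WiringDemo {
    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new LiveCrawlHttpModule());
        // Both lookups see the same pooled client, since the provider caches a static instance.
        HttpClient a = injector.getInstance(HttpClient.class);
        HttpClient b = injector.getInstance(HttpClient.class);
        System.out.println(a == b); // true
    }
}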


@@ -0,0 +1,79 @@
package nu.marginalia.livecrawler.io;

import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

public class RetryStrategy implements HttpRequestRetryStrategy {
    private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);

    @Override
    public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return switch (exception) {
            case SocketTimeoutException ste -> false;
            case SSLException ssle -> false;
            case UnknownHostException uhe -> false;
            case HttpHostConnectException ex -> executionCount < 2;
            case SocketException ex -> executionCount < 2;
            default -> executionCount <= 3;
        };
    }

    @Override
    public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
        return switch (response.getCode()) {
            case 500, 503 -> executionCount <= 2;
            case 429 -> executionCount <= 3;
            default -> false;
        };
    }

    @Override
    public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return TimeValue.ofSeconds(1);
    }

    @Override
    public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
        int statusCode = response.getCode();

        // Give 503 a bit more time
        if (statusCode == 503) return TimeValue.ofSeconds(5);

        if (statusCode == 429) {
            // get the Retry-After header
            var retryAfterHeader = response.getFirstHeader("Retry-After");
            if (retryAfterHeader == null) {
                return TimeValue.ofSeconds(3);
            }

            String retryAfter = retryAfterHeader.getValue();
            if (retryAfter == null) {
                return TimeValue.ofSeconds(2);
            }

            try {
                int retryAfterTime = Integer.parseInt(retryAfter);
                retryAfterTime = Math.clamp(retryAfterTime, 1, 5);
                return TimeValue.ofSeconds(retryAfterTime);
            } catch (NumberFormatException e) {
                logger.warn("Invalid Retry-After header: {}", retryAfter);
            }
        }

        return TimeValue.ofSeconds(2);
    }
}
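A quick way to sanity-check the 429 branch is to feed the strategy a synthetic response and observe the clamping. This is an illustrative snippet rather than a test from this diff; BasicHttpResponse is httpcore5's plain message implementation, and the strategy ignores the HttpContext argument:

import nu.marginalia.livecrawler.io.RetryStrategy;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.util.TimeValue;

class RetryStrategyDemo {
    public static void main(String[] args) {
        var rsp = new BasicHttpResponse(429);
        rsp.addHeader("Retry-After", "120"); // server asks for two minutes

        // The 429 branch parses the header and clamps it into the 1..5 second range,
        // so the wait comes out as 5 seconds rather than 120.
        TimeValue delay = new RetryStrategy().getRetryInterval(rsp, 1, null);
        System.out.println(delay);
    }
}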


@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
 import nu.marginalia.coordination.LocalDomainCoordinator;
 import nu.marginalia.db.DomainBlacklistImpl;
 import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.livecrawler.io.HttpClientProvider;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.io.CloseMode;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
 import java.sql.SQLException;
 import java.util.List;

 class SimpleLinkScraperTest {
     private Path tempDir;
     private LiveCrawlDataSet dataSet;
+    private CloseableHttpClient httpClient;

     @BeforeEach
-    public void setUp() throws IOException, SQLException {
+    public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
         tempDir = Files.createTempDirectory(getClass().getSimpleName());
         dataSet = new LiveCrawlDataSet(tempDir);
+        httpClient = HttpClientProvider.createClient();
     }

     @AfterEach
     public void tearDown() throws Exception {
         dataSet.close();
+        httpClient.close(CloseMode.IMMEDIATE);
         FileUtils.deleteDirectory(tempDir.toFile());
     }

     @Test
     public void testRetrieveNow() throws Exception {
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
         Assertions.assertEquals(1, fetched);
@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
     @Test
     public void testRetrieveNow_Redundant() throws Exception {
         dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
-        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
+        var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

         // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
         int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));


@@ -40,6 +40,8 @@ public class LoaderMain extends ProcessMainClass {
     private final KeywordLoaderService keywordLoaderService;
     private final DocumentLoaderService documentLoaderService;

+    private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
+
     public static void main(String... args) {
         try {
             new org.mariadb.jdbc.Driver();
@@ -99,14 +101,29 @@
         try {
             var results = ForkJoinPool.commonPool()
-                    .invokeAll(
-                            List.of(
-                                    () -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
-                                    () -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
-                                    () -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
-                                    () -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
-                            )
-                    );
+                    .invokeAll(List.of());
+
+            if ( true == insertFoundDomains ) {
+                results = ForkJoinPool.commonPool()
+                        .invokeAll(
+                                List.of(
+                                        () -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
+                                        () -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
+                                        () -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
+                                        () -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
+                                )
+                        );
+            }
+            else {
+                results = ForkJoinPool.commonPool()
+                        .invokeAll(
+                                List.of(
+                                        () -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
+                                        () -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
+                                        () -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
+                                )
+                        );
+            }

             for (var result : results) {
                 if (result.state() == Future.State.FAILED) {
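This hunk and the DomainLoaderService hunk below are gated on the same JVM system property, read via Boolean.getBoolean, so it defaults to false and only the exact string "true" enables it. An illustrative way to switch it on when launching the loader process (the jar name is a placeholder, not taken from this repo):

java -Dloader.insertFoundDomains=true -jar loader.jar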


@@ -25,6 +25,8 @@ import java.util.Set;
 @Singleton
 public class DomainLoaderService {

+    private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
+
     private final HikariDataSource dataSource;
     private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
     private final int nodeId;
@@ -84,25 +86,34 @@
             // Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
             // can be grabbed by any index node
-            try (var inserter = new DomainInserter(conn, -1);
-                 var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
-                // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
-                int pageIdx = 0;
-
-                for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
-                    processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
-
-                    try (var reader = new SlopDomainLinkRecord.Reader(page)) {
-                        while (reader.hasMore()) {
-                            SlopDomainLinkRecord record = reader.next();
-                            String domainName = record.dest();
-                            if (domainNamesAll.add(domainName)) {
-                                inserter.accept(new EdgeDomain(domainName));
-                            }
-                        }
-                    }
-                }
-            }
+            if ( true == insertFoundDomains ) {
+                logger.info("Adding found domains");
+
+                try (var inserter = new DomainInserter(conn, -1);
+                     var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
+                    // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
+                    int pageIdx = 0;
+
+                    for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
+                        processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
+
+                        try (var reader = new SlopDomainLinkRecord.Reader(page)) {
+                            while (reader.hasMore()) {
+                                SlopDomainLinkRecord record = reader.next();
+                                String domainName = record.dest();
+                                if (domainNamesAll.add(domainName)) {
+                                    inserter.accept(new EdgeDomain(domainName));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            else {
+                logger.info("Skipping found domains");
+            }

             taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);


@@ -30,10 +30,11 @@ public class ApiSearchOperator {
     public ApiSearchResults query(String query,
                                   int count,
+                                  int domainCount,
                                   int index,
                                   NsfwFilterTier filterTier)
     {
-        var rsp = queryClient.search(createParams(query, count, index, filterTier));
+        var rsp = queryClient.search(createParams(query, count, domainCount, index, filterTier));

         return new ApiSearchResults("RESTRICTED", query,
                 rsp.results()
@@ -44,13 +45,13 @@
                 .collect(Collectors.toList()));
     }

-    private QueryParams createParams(String query, int count, int index, NsfwFilterTier filterTirer) {
+    private QueryParams createParams(String query, int count, int domainCount, int index, NsfwFilterTier filterTirer) {
         SearchSetIdentifier searchSet = selectSearchSet(index);

         return new QueryParams(
                 query,
                 RpcQueryLimits.newBuilder()
-                        .setResultsByDomain(2)
+                        .setResultsByDomain(Math.clamp(domainCount, 1, 100))
                         .setResultsTotal(Math.min(100, count))
                         .setTimeoutMs(150)
                         .setFetchSize(8192)


@@ -119,6 +119,7 @@ public class ApiService extends SparkService {
         }

         int count = intParam(request, "count", 20);
+        int domainCount = intParam(request, "dc", 2);
         int index = intParam(request, "index", 3);
         int nsfw = intParam(request, "nsfw", 1);
@@ -137,7 +138,7 @@
                 .labels(license.key)
                 .time(() ->
                         searchOperator
-                                .query(query, count, index, nsfwFilterTier)
+                                .query(query, count, domainCount, index, nsfwFilterTier)
                                 .withLicense(license.getLicense())
                 );
     }
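Together with the ApiSearchOperator change above, this exposes the per-domain result cap as the dc query parameter: default 2, clamped server-side to the 1..100 range. Assuming the public API's usual URL shape of https://api.marginalia.nu/{key}/search/{query} with the demo key "public" (the routing itself is not part of this diff), a request asking for at most five results per domain might look like:

https://api.marginalia.nu/public/search/linear+algebra?count=20&dc=5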


@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
     {
         bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
         bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
-        bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
+        bangsToPattern.put("!w", "https://old-search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
     }

     @Override


@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
     {
         bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
         bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
-        bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
+        bangsToPattern.put("!w", "/search?query=%s+site:en.wikipedia.org");
     }

     @Override
@@ -34,7 +34,7 @@ public class BangCommand implements SearchCommandInterface {
         if (match.isPresent()) {
             var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8));

-            new MapModelAndView("redirect.jte", Map.of("url", url));
+            return Optional.of(new MapModelAndView("redirect.jte", Map.of("url", url)));
         }
     }


@@ -33,19 +33,19 @@
title="This domain is blacklisted and will not be crawled or indexed"> title="This domain is blacklisted and will not be crawled or indexed">
Blacklisted Blacklisted
</span> </span>
@elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
<span
class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
title="This domain will be crawled by the search engine">
In Crawler Queue
</span>
@elseif (siteInfo.domainInformation().isUnknownDomain()) @elseif (siteInfo.domainInformation().isUnknownDomain())
<span <span
class="bg-purple-50 text-purple-900 border-purple-200 dark:bg-black dark:text-purple-100 border p-1 font-sm rounded" class="bg-purple-50 text-purple-900 border-purple-200 dark:bg-black dark:text-purple-100 border p-1 font-sm rounded"
title="The search engine is not aware of this domain name"> title="The search engine is not aware of this domain name">
Unknown Unknown
</span> </span>
@elseif (siteInfo.domainInformation().isUnknownDomain()) @elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
<span
class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
title="This domain will be crawled by the search engine">
In Crawler Queue
</span>
@elseif (!siteInfo.domainInformation().isUnknownDomain())
<span <span
class="bg-yellow-50 text-yellow-900 border-yellow-200 dark:bg-black dark:text-yellow-100 border p-1 font-sm rounded" class="bg-yellow-50 text-yellow-900 border-yellow-200 dark:bg-black dark:text-yellow-100 border p-1 font-sm rounded"
title="The search engine is aware of this domain, but it's not slated for crawling"> title="The search engine is aware of this domain, but it's not slated for crawling">


@@ -0,0 +1,19 @@
package nu.marginalia.search.command.commands;

import nu.marginalia.WebsiteUrl;
import nu.marginalia.search.command.SearchParameters;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class BangCommandTest {

    @Test
    void testWikipediaRedirect() {
        BangCommand bc = new BangCommand();

        assertTrue(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "!w plato", 1)).isPresent());
        assertFalse(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "plato", 1)).isPresent());
    }
}


@@ -20,6 +20,6 @@ public class StatusModule extends AbstractModule {
         bind(String.class)
                 .annotatedWith(Names.named("searchEngineTestQuery"))
                 .toInstance(System.getProperty("status-service.public-query",
-                        "https://marginalia-search.com/search?query=plato&ref=marginalia-automatic-metrics"));
+                        "https://old-search.marginalia.nu/search?query=plato&ref=marginalia-automatic-metrics"));
     }
 }
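The property name is unchanged, so a deployment can still point the status check at a different query without a rebuild, e.g. (illustrative command line, not from this repo):

java -Dstatus-service.public-query="https://old-search.marginalia.nu/search?query=debian" ...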


@@ -74,6 +74,8 @@ public class ControlSysActionsService {
Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview); Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview); Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview); Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);
Spark.post("/actions/update-nsfw-filters", this::updateNsfwFilters, Redirects.redirectToOverview);
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -132,6 +134,14 @@ public class ControlSysActionsService {
return ""; return "";
} }
public Object updateNsfwFilters(Request request, Response response) throws Exception {
eventLog.logEvent("USER-ACTION", "UPDATE-NSFW-FILTERS");
executorClient.updateNsfwFilters();
return "";
}
public Object flushApiCaches(Request request, Response response) throws Exception { public Object flushApiCaches(Request request, Response response) throws Exception {
eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES"); eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
apiOutbox.sendNotice("FLUSH_CACHES", ""); apiOutbox.sendNotice("FLUSH_CACHES", "");


@@ -53,6 +53,31 @@
                 </div>
             </div>

+            <div class="accordion-item">
+                <h2 class="accordion-header">
+                    <button class="accordion-button collapsed"
+                            type="button"
+                            data-bs-toggle="collapse"
+                            data-bs-target="#collapseNsfwFilters"
+                            aria-expanded="false"
+                            aria-controls="collapseNsfwFilters">
+                        Update NSFW Filters Definitions
+                    </button>
+                </h2>
+                <div id="collapseNsfwFilters" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
+                    <div class="mb-3">
+                        This will fetch NSFW filter definitions.
+                    </div>
+                    <form method="post" action="actions/update-nsfw-filters">
+                        <button
+                                class="btn btn-primary me-md-2"
+                                onclick="return confirm('Confirm update NSFW filters');"
+                                type="submit">
+                            Update NSFW Filter</button>
+                    </form>
+                </div>
+            </div>
+
             <div class="accordion-item">
                 <h2 class="accordion-header">
                     <button class="accordion-button collapsed"