1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

19 Commits

Author SHA1 Message Date
Viktor Lofgren
52582a6d7d (experiment) Also add clients to loom experiment 2025-07-16 18:08:00 +02:00
Viktor Lofgren
ec0e39ad32 (experiment) Also add clients to loom experiment 2025-07-16 17:28:57 +02:00
Viktor Lofgren
6a15aee4b0 (ping) Fix arithmetic errors in backoff strategy due to long overflow 2025-07-16 17:23:36 +02:00
Viktor Lofgren
bd5111e8a2 (experimental) Add flag for using loom/virtual threads in gRPC executor 2025-07-16 17:12:07 +02:00
Viktor Lofgren
1ecbeb0272 (doc) Update ROADMAP.md 2025-07-14 13:38:34 +02:00
Viktor Lofgren
390f053406 (api) Add query parameter 'dc' for specifying the max number of results per domain 2025-07-14 10:09:30 +02:00
Viktor Lofgren
b03c43224c (search) Fix redirects in new search UI 2025-07-11 23:44:45 +02:00
Viktor Lofgren
9b4ce9e9eb (search) Fix !w redirect 2025-07-11 23:28:09 +02:00
Viktor
81ac02a695 Merge pull request #209 from us3r1d/master
added converter.insertFoundDomains property
2025-07-11 21:34:04 +02:00
krystal
47f624fb3b changed converter.insertFoundDomains to loader.insertFoundDomains 2025-07-11 12:13:45 -07:00
krystal
c866f19cbb added converter.insertFoundDomains property 2025-07-10 15:36:59 -07:00
Viktor Lofgren
518278493b (converter) Increase the max byte length when parsing crawled documents to 500 kB from 200 kB. 2025-07-08 21:22:02 +02:00
Viktor Lofgren
1ac0bab0b8 (converter) Also exclude length checks when lenient processing is enabled 2025-07-08 20:37:53 +02:00
Viktor Lofgren
08b45ed10a (converter) Add system property converter.lenientProcessing to disable most disqualification checks 2025-07-08 19:44:51 +02:00
Viktor Lofgren
f2cfb91973 (converter) Add audit log of converter errors and rejections 2025-07-08 19:15:41 +02:00
Viktor Lofgren
2f79524eb3 (refac) Rename ProcessService to ProcessSpawnerService for clarity 2025-07-07 15:48:44 +02:00
Viktor Lofgren
3b00142c96 (search) Don't say unknown domains are in the crawler queue 2025-07-06 18:42:36 +02:00
Viktor Lofgren
294ab19177 (status) Use old-search for status service instead of marginalia-search.com 2025-07-06 15:40:53 +02:00
Viktor Lofgren
6f1659ecb2 (control) Add GUI for NSFW Filter Update trigger 2025-06-25 16:03:27 +02:00
53 changed files with 370 additions and 172 deletions

View File

@@ -48,10 +48,6 @@ filter for any API consumer.
I've talked to the stract dev and he does not think it's a good idea to mimic their optics language, which is quite ad-hoc, but instead to work together to find some new common description language for this. I've talked to the stract dev and he does not think it's a good idea to mimic their optics language, which is quite ad-hoc, but instead to work together to find some new common description language for this.
## Show favicons next to search results
This is expected from search engines. Basic proof of concept sketch of fetching this data has been done, but the feature is some way from being reality.
## Specialized crawler for github ## Specialized crawler for github
One of the search engine's biggest limitations right now is that it does not index github at all. A specialized crawler that fetches at least the readme.md would go a long way toward providing search capabilities in this domain. One of the search engine's biggest limitations right now is that it does not index github at all. A specialized crawler that fetches at least the readme.md would go a long way toward providing search capabilities in this domain.
@@ -66,6 +62,10 @@ The documents database probably should have some sort of flag indicating it's a
PDF parsing is known to be a bit of a security liability so some thought needs to be put in PDF parsing is known to be a bit of a security liability so some thought needs to be put in
that direction as well. that direction as well.
## Show favicons next to search results (COMPLETED 2025-03)
This is expected from search engines. Basic proof of concept sketch of fetching this data has been done, but the feature is some way from being reality.
## Web Design Overhaul (COMPLETED 2025-01) ## Web Design Overhaul (COMPLETED 2025-01)
The design is kinda clunky and hard to maintain, and needlessly outdated-looking. The design is kinda clunky and hard to maintain, and needlessly outdated-looking.

View File

@@ -13,6 +13,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.util.NamedExecutorFactory; import nu.marginalia.util.NamedExecutorFactory;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function; import java.util.function.Function;
@Singleton @Singleton
@@ -20,10 +21,15 @@ public class GrpcChannelPoolFactory {
private final NodeConfigurationWatcher nodeConfigurationWatcher; private final NodeConfigurationWatcher nodeConfigurationWatcher;
private final ServiceRegistryIf serviceRegistryIf; private final ServiceRegistryIf serviceRegistryIf;
private static final Executor executor = NamedExecutorFactory.createFixed("gRPC-Channel-Pool",
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32)); private static final boolean useLoom = Boolean.getBoolean("system.experimentalUseLoom");
private static final Executor offloadExecutor = NamedExecutorFactory.createFixed("gRPC-Offload-Pool",
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32)); private static final Executor executor = useLoom
? Executors.newVirtualThreadPerTaskExecutor()
: NamedExecutorFactory.createFixed("gRPC-Channel-Pool", Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
private static final Executor offloadExecutor = useLoom
? Executors.newVirtualThreadPerTaskExecutor()
: NamedExecutorFactory.createFixed("gRPC-Offload-Pool", Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
@Inject @Inject
public GrpcChannelPoolFactory(NodeConfigurationWatcher nodeConfigurationWatcher, public GrpcChannelPoolFactory(NodeConfigurationWatcher nodeConfigurationWatcher,

View File

@@ -13,9 +13,14 @@ import nu.marginalia.util.NamedExecutorFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GrpcServer { public class GrpcServer {
private final Server server; private final Server server;
private static final boolean useLoom = Boolean.getBoolean("system.experimentalUseLoom");
public GrpcServer(ServiceConfiguration config, public GrpcServer(ServiceConfiguration config,
ServiceRegistryIf serviceRegistry, ServiceRegistryIf serviceRegistry,
ServicePartition partition, ServicePartition partition,
@@ -26,8 +31,13 @@ public class GrpcServer {
int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16); int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16);
// Start the gRPC server // Start the gRPC server
ExecutorService workExecutor = useLoom ?
Executors.newVirtualThreadPerTaskExecutor() :
NamedExecutorFactory.createFixed("nettyExecutor", nThreads);
var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port)) var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port))
.executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads)) .executor(workExecutor)
.workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads))) .workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads)))
.bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads))) .bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads)))
.channelType(NioServerSocketChannel.class); .channelType(NioServerSocketChannel.class);

View File

@@ -7,6 +7,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</Console> </Console>
<Console name="ProcessConsole" target="SYSTEM_OUT"> <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -23,6 +24,7 @@
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
<SizeBasedTriggeringPolicy size="10MB" /> <SizeBasedTriggeringPolicy size="10MB" />
</RollingFile> </RollingFile>
@@ -36,6 +38,16 @@
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" /> <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
</Filters> </Filters>
</RollingFile> </RollingFile>
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
ignoreExceptions="false">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
</PatternLayout>
<SizeBasedTriggeringPolicy size="100MB" />
<Filters>
<MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
</Filters>
</RollingFile>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Logger name="org.apache.zookeeper" level="WARN" /> <Logger name="org.apache.zookeeper" level="WARN" />

View File

@@ -8,6 +8,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</Console> </Console>
<Console name="ConsoleWarn" target="SYSTEM_OUT"> <Console name="ConsoleWarn" target="SYSTEM_OUT">
@@ -18,6 +19,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</Console> </Console>
<Console name="ConsoleError" target="SYSTEM_OUT"> <Console name="ConsoleError" target="SYSTEM_OUT">
@@ -28,6 +30,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</Console> </Console>
<Console name="ConsoleFatal" target="SYSTEM_OUT"> <Console name="ConsoleFatal" target="SYSTEM_OUT">
@@ -38,6 +41,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</Console> </Console>
<Console name="ProcessConsole" target="SYSTEM_OUT"> <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -57,6 +61,7 @@
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" /> <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters> </Filters>
</RollingFile> </RollingFile>
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz" <RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
@@ -69,6 +74,16 @@
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" /> <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
</Filters> </Filters>
</RollingFile> </RollingFile>
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
ignoreExceptions="false">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
</PatternLayout>
<SizeBasedTriggeringPolicy size="100MB" />
<Filters>
<MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
</Filters>
</RollingFile>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Logger name="org.apache.zookeeper" level="WARN" /> <Logger name="org.apache.zookeeper" level="WARN" />

View File

@@ -9,6 +9,7 @@ import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.upload.UploadDirContents; import nu.marginalia.executor.upload.UploadDirContents;
import nu.marginalia.executor.upload.UploadDirItem; import nu.marginalia.executor.upload.UploadDirItem;
import nu.marginalia.functions.execution.api.*; import nu.marginalia.functions.execution.api.*;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.ServiceId; import nu.marginalia.service.ServiceId;
import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool; import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
@@ -25,27 +26,37 @@ import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List; import java.util.List;
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub; import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
@Singleton @Singleton
public class ExecutorClient { public class ExecutorClient {
private final MqPersistence persistence;
private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool; private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class); private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
private final ServiceRegistryIf registry; private final ServiceRegistryIf registry;
@Inject @Inject
public ExecutorClient(ServiceRegistryIf registry, public ExecutorClient(ServiceRegistryIf registry,
MqPersistence persistence,
GrpcChannelPoolFactory grpcChannelPoolFactory) GrpcChannelPoolFactory grpcChannelPoolFactory)
{ {
this.registry = registry; this.registry = registry;
this.persistence = persistence;
this.channelPool = grpcChannelPoolFactory this.channelPool = grpcChannelPoolFactory
.createMulti( .createMulti(
ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()), ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
ExecutorApiGrpc::newBlockingStub); ExecutorApiGrpc::newBlockingStub);
} }
private long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl);
}
public void startFsm(int node, String actorName) { public void startFsm(int node, String actorName) {
channelPool.call(ExecutorApiBlockingStub::startFsm) channelPool.call(ExecutorApiBlockingStub::startFsm)
.forNode(node) .forNode(node)
@@ -96,6 +107,16 @@ public class ExecutorClient {
.build()); .build());
} }
public long updateNsfwFilters() throws Exception {
long msgId = createTrackingTokenMsg("nsfw-filters", 1, Duration.ofHours(6));
channelPool.call(ExecutorApiBlockingStub::updateNsfwFilters)
.forNode(1)
.run(RpcUpdateNsfwFilters.newBuilder().setMsgId(msgId).build());
return msgId;
}
public ActorRunStates getActorStates(int node) { public ActorRunStates getActorStates(int node) {
try { try {
var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates) var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates)

View File

@@ -18,6 +18,8 @@ service ExecutorApi {
rpc calculateAdjacencies(Empty) returns (Empty) {} rpc calculateAdjacencies(Empty) returns (Empty) {}
rpc restoreBackup(RpcFileStorageId) returns (Empty) {} rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
rpc updateNsfwFilters(RpcUpdateNsfwFilters) returns (Empty) {}
rpc restartExecutorService(Empty) returns (Empty) {} rpc restartExecutorService(Empty) returns (Empty) {}
} }
@@ -66,6 +68,9 @@ message RpcExportRequest {
int64 fileStorageId = 1; int64 fileStorageId = 1;
int64 msgId = 2; int64 msgId = 2;
} }
message RpcUpdateNsfwFilters {
int64 msgId = 1;
}
message RpcFileStorageIdWithDomainName { message RpcFileStorageIdWithDomainName {
int64 fileStorageId = 1; int64 fileStorageId = 1;
string targetDomainName = 2; string targetDomainName = 2;

View File

@@ -2,10 +2,11 @@ package nu.marginalia.actor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.functions.execution.api.*; import nu.marginalia.functions.execution.api.RpcFsmName;
import nu.marginalia.functions.execution.api.RpcProcessId;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -14,18 +15,18 @@ import spark.Spark;
@Singleton @Singleton
public class ActorApi { public class ActorApi {
private final ExecutorActorControlService actors; private final ExecutorActorControlService actors;
private final ProcessService processService; private final ProcessSpawnerService processSpawnerService;
private final MqPersistence mqPersistence; private final MqPersistence mqPersistence;
private final ServiceConfiguration serviceConfiguration; private final ServiceConfiguration serviceConfiguration;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject @Inject
public ActorApi(ExecutorActorControlService actors, public ActorApi(ExecutorActorControlService actors,
ProcessService processService, ProcessSpawnerService processSpawnerService,
MqPersistence mqPersistence, MqPersistence mqPersistence,
ServiceConfiguration serviceConfiguration) ServiceConfiguration serviceConfiguration)
{ {
this.actors = actors; this.actors = actors;
this.processService = processService; this.processSpawnerService = processSpawnerService;
this.mqPersistence = mqPersistence; this.mqPersistence = mqPersistence;
this.serviceConfiguration = serviceConfiguration; this.serviceConfiguration = serviceConfiguration;
} }
@@ -43,7 +44,7 @@ public class ActorApi {
} }
public Object stopProcess(RpcProcessId processId) { public Object stopProcess(RpcProcessId processId) {
ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId()); ProcessSpawnerService.ProcessId id = ProcessSpawnerService.translateExternalIdBase(processId.getProcessId());
try { try {
String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node(); String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
@@ -60,7 +61,7 @@ public class ActorApi {
} }
} }
processService.kill(id); processSpawnerService.kill(id);
} }
catch (Exception ex) { catch (Exception ex) {
logger.error("Failed to stop process {}", id, ex); logger.error("Failed to stop process {}", id, ex);

View File

@@ -6,7 +6,7 @@ import java.util.Set;
public enum ExecutorActor { public enum ExecutorActor {
PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
SYNC_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), UPDATE_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD, NodeProfile.REALTIME),
CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -113,7 +113,7 @@ public class ExecutorActorControlService {
register(ExecutorActor.UPDATE_RSS, updateRssActor); register(ExecutorActor.UPDATE_RSS, updateRssActor);
register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor); register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
register(ExecutorActor.SYNC_NSFW_LISTS, updateNsfwFiltersActor); register(ExecutorActor.UPDATE_NSFW_LISTS, updateNsfwFiltersActor);
if (serviceConfiguration.node() == 1) { if (serviceConfiguration.node() == 1) {
register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor); register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);

View File

@@ -4,11 +4,14 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.*; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.process.ProcessService; import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -24,13 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AbstractProcessSpawnerActor extends RecordActorPrototype { public class AbstractProcessSpawnerActor extends RecordActorPrototype {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessSpawnerService processSpawnerService;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
public static final int MAX_ATTEMPTS = 3; public static final int MAX_ATTEMPTS = 3;
private final String inboxName; private final String inboxName;
private final ProcessService.ProcessId processId; private final ProcessSpawnerService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node; private final int node;
@@ -50,7 +53,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
for (;;) { for (;;) {
var messages = persistence.eavesdrop(inboxName, 1); var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) { if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
synchronized (processId) { synchronized (processId) {
processId.wait(5000); processId.wait(5000);
} }
@@ -92,7 +95,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user // We get this exception when the process is cancelled by the user
processService.kill(processId); processSpawnerService.kill(processId);
setCurrentMessageToDead(); setCurrentMessageToDead();
yield new Aborted(); yield new Aborted();
@@ -112,13 +115,13 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
public AbstractProcessSpawnerActor(Gson gson, public AbstractProcessSpawnerActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService, ProcessSpawnerService processSpawnerService,
String inboxName, String inboxName,
ProcessService.ProcessId processId) { ProcessSpawnerService.ProcessId processId) {
super(gson); super(gson);
this.node = configuration.node(); this.node = configuration.node();
this.persistence = persistence; this.persistence = persistence;
this.processService = processService; this.processSpawnerService = processSpawnerService;
this.inboxName = inboxName + ":" + node; this.inboxName = inboxName + ":" + node;
this.processId = processId; this.processId = processId;
} }
@@ -149,7 +152,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
// Run this call in a separate thread so that this thread can be interrupted waiting for it // Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> { executorService.submit(() -> {
try { try {
processService.trigger(processId); processSpawnerService.trigger(processId);
} catch (Exception e) { } catch (Exception e) {
logger.warn("Error in triggering process", e); logger.warn("Error in triggering process", e);
error.set(true); error.set(true);

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -17,13 +17,13 @@ public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
public ConverterMonitorActor(Gson gson, public ConverterMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.CONVERTER_INBOX, ProcessInboxNames.CONVERTER_INBOX,
ProcessService.ProcessId.CONVERTER); ProcessSpawnerService.ProcessId.CONVERTER);
} }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -16,13 +16,13 @@ public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
public CrawlerMonitorActor(Gson gson, public CrawlerMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.CRAWLER_INBOX, ProcessInboxNames.CRAWLER_INBOX,
ProcessService.ProcessId.CRAWLER); ProcessSpawnerService.ProcessId.CRAWLER);
} }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -16,13 +16,13 @@ public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
public ExportTaskMonitorActor(Gson gson, public ExportTaskMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.EXPORT_TASK_INBOX, ProcessInboxNames.EXPORT_TASK_INBOX,
ProcessService.ProcessId.EXPORT_TASKS); ProcessSpawnerService.ProcessId.EXPORT_TASKS);
} }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -17,13 +17,13 @@ public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
public IndexConstructorMonitorActor(Gson gson, public IndexConstructorMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX, ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
ProcessService.ProcessId.INDEX_CONSTRUCTOR); ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR);
} }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -16,13 +16,13 @@ public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
public LiveCrawlerMonitorActor(Gson gson, public LiveCrawlerMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.LIVE_CRAWLER_INBOX, ProcessInboxNames.LIVE_CRAWLER_INBOX,
ProcessService.ProcessId.LIVE_CRAWLER); ProcessSpawnerService.ProcessId.LIVE_CRAWLER);
} }

View File

@@ -4,9 +4,9 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -17,13 +17,13 @@ public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
public LoaderMonitorActor(Gson gson, public LoaderMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, processService, persistence, processSpawnerService,
ProcessInboxNames.LOADER_INBOX, ProcessInboxNames.LOADER_INBOX,
ProcessService.ProcessId.LOADER); ProcessSpawnerService.ProcessId.LOADER);
} }
} }

View File

@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@Singleton @Singleton
@@ -16,13 +16,13 @@ public class NdpMonitorActor extends AbstractProcessSpawnerActor {
public NdpMonitorActor(Gson gson, public NdpMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
super(gson, super(gson,
configuration, configuration,
persistence, persistence,
processService, processSpawnerService,
ProcessInboxNames.NDP_INBOX, ProcessInboxNames.NDP_INBOX,
ProcessService.ProcessId.NDP); ProcessSpawnerService.ProcessId.NDP);
} }

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest; import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -25,17 +25,21 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
// Unlike other monitor actors, the ping monitor will not merely wait for a request
// to be sent, but send one itself, hence we can't extend AbstractProcessSpawnerActor
// but have to reimplement a lot of the same logic ourselves.
@Singleton @Singleton
public class PingMonitorActor extends RecordActorPrototype { public class PingMonitorActor extends RecordActorPrototype {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessSpawnerService processSpawnerService;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
public static final int MAX_ATTEMPTS = 3; public static final int MAX_ATTEMPTS = 3;
private final String inboxName; private final String inboxName;
private final ProcessService.ProcessId processId; private final ProcessSpawnerService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node; private final int node;
private final Gson gson; private final Gson gson;
@@ -53,7 +57,6 @@ public class PingMonitorActor extends RecordActorPrototype {
return switch (self) { return switch (self) {
case Initial i -> { case Initial i -> {
PingRequest request = new PingRequest(); PingRequest request = new PingRequest();
persistence.sendNewMessage(inboxName, null, null, persistence.sendNewMessage(inboxName, null, null,
"PingRequest", "PingRequest",
gson.toJson(request), gson.toJson(request),
@@ -65,7 +68,7 @@ public class PingMonitorActor extends RecordActorPrototype {
for (;;) { for (;;) {
var messages = persistence.eavesdrop(inboxName, 1); var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) { if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
synchronized (processId) { synchronized (processId) {
processId.wait(5000); processId.wait(5000);
} }
@@ -107,7 +110,7 @@ public class PingMonitorActor extends RecordActorPrototype {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user // We get this exception when the process is cancelled by the user
processService.kill(processId); processSpawnerService.kill(processId);
setCurrentMessageToDead(); setCurrentMessageToDead();
yield new Aborted(); yield new Aborted();
@@ -127,14 +130,14 @@ public class PingMonitorActor extends RecordActorPrototype {
public PingMonitorActor(Gson gson, public PingMonitorActor(Gson gson,
ServiceConfiguration configuration, ServiceConfiguration configuration,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) throws SQLException { ProcessSpawnerService processSpawnerService) throws SQLException {
super(gson); super(gson);
this.gson = gson; this.gson = gson;
this.node = configuration.node(); this.node = configuration.node();
this.persistence = persistence; this.persistence = persistence;
this.processService = processService; this.processSpawnerService = processSpawnerService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node; this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING; this.processId = ProcessSpawnerService.ProcessId.PING;
} }
/** Sets the message to dead in the database to avoid /** Sets the message to dead in the database to avoid
@@ -163,7 +166,7 @@ public class PingMonitorActor extends RecordActorPrototype {
// Run this call in a separate thread so that this thread can be interrupted waiting for it // Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> { executorService.submit(() -> {
try { try {
processService.trigger(processId); processSpawnerService.trigger(processId);
} catch (Exception e) { } catch (Exception e) {
logger.warn("Error in triggering process", e); logger.warn("Error in triggering process", e);
error.set(true); error.set(true);

View File

@@ -8,7 +8,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume; import nu.marginalia.actor.state.Resume;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
public class ProcessLivenessMonitorActor extends RecordActorPrototype { public class ProcessLivenessMonitorActor extends RecordActorPrototype {
private final ServiceEventLog eventLogService; private final ServiceEventLog eventLogService;
private final ProcessService processService; private final ProcessSpawnerService processSpawnerService;
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
private final int node; private final int node;
@@ -49,7 +49,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
var processId = heartbeat.getProcessId(); var processId = heartbeat.getProcessId();
if (null == processId) continue; if (null == processId) continue;
if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000) if (processSpawnerService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
continue; continue;
flagProcessAsStopped(heartbeat); flagProcessAsStopped(heartbeat);
@@ -72,12 +72,12 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
public ProcessLivenessMonitorActor(Gson gson, public ProcessLivenessMonitorActor(Gson gson,
ServiceEventLog eventLogService, ServiceEventLog eventLogService,
ServiceConfiguration configuration, ServiceConfiguration configuration,
ProcessService processService, ProcessSpawnerService processSpawnerService,
HikariDataSource dataSource) { HikariDataSource dataSource) {
super(gson); super(gson);
this.node = configuration.node(); this.node = configuration.node();
this.eventLogService = eventLogService; this.eventLogService = eventLogService;
this.processService = processService; this.processSpawnerService = processSpawnerService;
this.dataSource = dataSource; this.dataSource = dataSource;
} }
@@ -208,8 +208,8 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
public boolean isRunning() { public boolean isRunning() {
return "RUNNING".equals(status); return "RUNNING".equals(status);
} }
public ProcessService.ProcessId getProcessId() { public ProcessSpawnerService.ProcessId getProcessId() {
return ProcessService.translateExternalIdBase(processBase); return ProcessSpawnerService.translateExternalIdBase(processBase);
} }
} }

View File

@@ -47,6 +47,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt"); private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt");
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
public record Initial() implements ActorStep {} public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY) @Resume(behavior = ActorResumeBehavior.RETRY)
public record Wait(String ts) implements ActorStep {} public record Wait(String ts) implements ActorStep {}
@@ -57,6 +59,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
return switch(self) { return switch(self) {
case Initial() -> { case Initial() -> {
if (!insertFoundDomains) yield new Error("Domain insertion prohibited, aborting");
if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) { if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
yield new Error("Invalid node profile for RSS update"); yield new Error("Invalid node profile for RSS update");
} }

View File

@@ -3,11 +3,11 @@ package nu.marginalia.actor.task;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.state.ActorControlFlowException; import nu.marginalia.actor.state.ActorControlFlowException;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessSpawnerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -20,13 +20,13 @@ public class ActorProcessWatcher {
private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class); private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessSpawnerService processSpawnerService;
@Inject @Inject
public ActorProcessWatcher(MqPersistence persistence, public ActorProcessWatcher(MqPersistence persistence,
ProcessService processService) { ProcessSpawnerService processSpawnerService) {
this.persistence = persistence; this.persistence = persistence;
this.processService = processService; this.processSpawnerService = processSpawnerService;
} }
/** Wait for a process to start, and then wait for a response from the process, /** Wait for a process to start, and then wait for a response from the process,
@@ -36,7 +36,7 @@ public class ActorProcessWatcher {
* <p> * <p>
* When interrupted, the process is killed and the message is marked as dead. * When interrupted, the process is killed and the message is marked as dead.
*/ */
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId) public MqMessage waitResponse(MqOutbox outbox, ProcessSpawnerService.ProcessId processId, long msgId)
throws ActorControlFlowException, InterruptedException, SQLException throws ActorControlFlowException, InterruptedException, SQLException
{ {
// enums values only have a single instance, // enums values only have a single instance,
@@ -65,7 +65,7 @@ public class ActorProcessWatcher {
// This will prevent the monitor process from attempting to respawn the process as we kill it // This will prevent the monitor process from attempting to respawn the process as we kill it
outbox.flagAsDead(msgId); outbox.flagAsDead(msgId);
processService.kill(processId); processSpawnerService.kill(processId);
logger.info("Process {} killed due to interrupt", processId); logger.info("Process {} killed due to interrupt", processId);
} }
@@ -94,12 +94,12 @@ public class ActorProcessWatcher {
} }
/** Wait the specified time for the specified process to start running (does not start the process) */ /** Wait the specified time for the specified process to start running (does not start the process) */
private boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException { private boolean waitForProcess(ProcessSpawnerService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
// Wait for process to start // Wait for process to start
long deadline = System.currentTimeMillis() + unit.toMillis(duration); long deadline = System.currentTimeMillis() + unit.toMillis(duration);
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
if (processService.isRunning(processId)) if (processSpawnerService.isRunning(processId))
return true; return true;
TimeUnit.MILLISECONDS.sleep(100); TimeUnit.MILLISECONDS.sleep(100);

View File

@@ -12,7 +12,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.converting.ConvertRequest; import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.sideload.RedditSideloadHelper; import nu.marginalia.sideload.RedditSideloadHelper;
import nu.marginalia.sideload.SideloadHelper; import nu.marginalia.sideload.SideloadHelper;
import nu.marginalia.sideload.StackExchangeSideloadHelper; import nu.marginalia.sideload.StackExchangeSideloadHelper;
@@ -218,7 +218,7 @@ public class ConvertActor extends RecordActorPrototype {
); );
} }
case ConvertWait(FileStorageId destFid, long msgId) -> { case ConvertWait(FileStorageId destFid, long msgId) -> {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId); var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
yield new Error("Converter failed"); yield new Error("Converter failed");

View File

@@ -18,7 +18,7 @@ import nu.marginalia.mqapi.index.IndexName;
import nu.marginalia.mqapi.loading.LoadRequest; import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
@@ -95,7 +95,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 -> case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId))); new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> { case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId); var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
if (rsp.state() != MqMessageState.OK) if (rsp.state() != MqMessageState.OK)
yield new Error("Converter failed"); yield new Error("Converter failed");
@@ -129,7 +129,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
yield new Load(processedIds, id); yield new Load(processedIds, id);
} }
case Load(List<FileStorageId> processedIds, long msgId) -> { case Load(List<FileStorageId> processedIds, long msgId) -> {
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, msgId); var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessSpawnerService.ProcessId.LOADER, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
yield new Error("Loader failed"); yield new Error("Loader failed");
@@ -165,7 +165,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
} }
case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD)); case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
case ReindexFwd(long id) -> { case ReindexFwd(long id) -> {
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id); var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK) if (rsp.state() != MqMessageState.OK)
yield new Error("Forward index construction failed"); yield new Error("Forward index construction failed");
@@ -174,7 +174,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
} }
case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL)); case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
case ReindexFull(long id) -> { case ReindexFull(long id) -> {
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id); var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK) if (rsp.state() != MqMessageState.OK)
yield new Error("Full index construction failed"); yield new Error("Full index construction failed");
@@ -183,7 +183,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
} }
case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO)); case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
case ReindexPrio(long id) -> { case ReindexPrio(long id) -> {
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id); var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK) if (rsp.state() != MqMessageState.OK)
yield new Error("Prio index construction failed"); yield new Error("Prio index construction failed");

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType; import nu.marginalia.storage.model.FileStorageType;
@@ -76,7 +76,7 @@ public class CrawlActor extends RecordActorPrototype {
case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> { case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> {
var rsp = processWatcher.waitResponse( var rsp = processWatcher.waitResponse(
mqCrawlerOutbox, mqCrawlerOutbox,
ProcessService.ProcessId.CRAWLER, ProcessSpawnerService.ProcessId.CRAWLER,
msgId); msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageState;
@@ -55,7 +55,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
yield new Run(responseMsgId, crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageState;
@@ -54,7 +54,7 @@ public class ExportFeedsActor extends RecordActorPrototype {
yield new Run(responseMsgId, crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);

View File

@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
yield new Run(crawlId, destId, size, ctFilter, name, newMsgId); yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
} }
case Run(_, FileStorageId destId, _, _, _, long msgId) -> { case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);

View File

@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportTermFreqActor extends RecordActorPrototype {
yield new Run(responseMsgId, crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);

View File

@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest; import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -44,7 +44,6 @@ public class LiveCrawlActor extends RecordActorPrototype {
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
logger.info("{}", self);
return switch (self) { return switch (self) {
case Initial() -> { case Initial() -> {
yield new Monitor("-"); yield new Monitor("-");
@@ -75,7 +74,7 @@ public class LiveCrawlActor extends RecordActorPrototype {
yield new LiveCrawl(feedsHash, id); yield new LiveCrawl(feedsHash, id);
} }
case LiveCrawl(String feedsHash, long msgId) -> { case LiveCrawl(String feedsHash, long msgId) -> {
var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId); var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessSpawnerService.ProcessId.LIVE_CRAWLER, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
yield new Error("Crawler failed"); yield new Error("Crawler failed");

View File

@@ -11,7 +11,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType; import nu.marginalia.storage.model.FileStorageType;
@@ -51,7 +51,7 @@ public class RecrawlSingleDomainActor extends RecordActorPrototype {
case Crawl (long msgId) -> { case Crawl (long msgId) -> {
var rsp = processWatcher.waitResponse( var rsp = processWatcher.waitResponse(
mqCrawlerOutbox, mqCrawlerOutbox,
ProcessService.ProcessId.CRAWLER, ProcessSpawnerService.ProcessId.CRAWLER,
msgId); msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {

View File

@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessSpawnerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@ public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
yield new Run(newMsgId); yield new Run(newMsgId);
} }
case Run(long msgId) -> { case Run(long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
yield new Error("Exporter failed"); yield new Error("Exporter failed");

View File

@@ -5,6 +5,8 @@ import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nsfw.NsfwDomainFilter; import nu.marginalia.nsfw.NsfwDomainFilter;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@@ -12,23 +14,26 @@ import nu.marginalia.service.module.ServiceConfiguration;
public class UpdateNsfwFiltersActor extends RecordActorPrototype { public class UpdateNsfwFiltersActor extends RecordActorPrototype {
private final ServiceConfiguration serviceConfiguration; private final ServiceConfiguration serviceConfiguration;
private final NsfwDomainFilter nsfwDomainFilter; private final NsfwDomainFilter nsfwDomainFilter;
private final MqPersistence persistence;
public record Initial() implements ActorStep {} public record Initial(long respondMsgId) implements ActorStep {}
public record Run() implements ActorStep {} public record Run(long respondMsgId) implements ActorStep {}
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
return switch(self) { return switch(self) {
case Initial() -> { case Initial(long respondMsgId) -> {
if (serviceConfiguration.node() != 1) { if (serviceConfiguration.node() != 1) {
persistence.updateMessageState(respondMsgId, MqMessageState.ERR);
yield new Error("This actor can only run on node 1"); yield new Error("This actor can only run on node 1");
} }
else { else {
yield new Run(); yield new Run(respondMsgId);
} }
} }
case Run() -> { case Run(long respondMsgId) -> {
nsfwDomainFilter.fetchLists(); nsfwDomainFilter.fetchLists();
persistence.updateMessageState(respondMsgId, MqMessageState.OK);
yield new End(); yield new End();
} }
default -> new Error(); default -> new Error();
@@ -43,11 +48,13 @@ public class UpdateNsfwFiltersActor extends RecordActorPrototype {
@Inject @Inject
public UpdateNsfwFiltersActor(Gson gson, public UpdateNsfwFiltersActor(Gson gson,
ServiceConfiguration serviceConfiguration, ServiceConfiguration serviceConfiguration,
NsfwDomainFilter nsfwDomainFilter) NsfwDomainFilter nsfwDomainFilter,
MqPersistence persistence)
{ {
super(gson); super(gson);
this.serviceConfiguration = serviceConfiguration; this.serviceConfiguration = serviceConfiguration;
this.nsfwDomainFilter = nsfwDomainFilter; this.nsfwDomainFilter = nsfwDomainFilter;
this.persistence = persistence;
} }
} }

View File

@@ -10,6 +10,7 @@ import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.task.DownloadSampleActor; import nu.marginalia.actor.task.DownloadSampleActor;
import nu.marginalia.actor.task.RestoreBackupActor; import nu.marginalia.actor.task.RestoreBackupActor;
import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor; import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
import nu.marginalia.actor.task.UpdateNsfwFiltersActor;
import nu.marginalia.functions.execution.api.*; import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.server.DiscoverableService; import nu.marginalia.service.server.DiscoverableService;
@@ -263,4 +264,19 @@ public class ExecutorGrpcService
System.exit(0); System.exit(0);
} }
@Override
public void updateNsfwFilters(RpcUpdateNsfwFilters request, StreamObserver<Empty> responseObserver) {
logger.info("Got request {}", request);
try {
actorControlService.startFrom(ExecutorActor.UPDATE_NSFW_LISTS,
new UpdateNsfwFiltersActor.Initial(request.getMsgId()));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
logger.error("Failed to update nsfw filters", e);
responseObserver.onError(e);
}
}
} }

View File

@@ -29,7 +29,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Singleton @Singleton
public class ProcessService { public class ProcessSpawnerService {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final Marker processMarker = MarkerFactory.getMarker("PROCESS"); private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
@@ -88,7 +88,7 @@ public class ProcessService {
} }
@Inject @Inject
public ProcessService(BaseServiceParams params) { public ProcessSpawnerService(BaseServiceParams params) {
this.eventLog = params.eventLog; this.eventLog = params.eventLog;
this.node = params.configuration.node(); this.node = params.configuration.node();
} }

View File

@@ -19,6 +19,8 @@ import nu.marginalia.model.crawldata.CrawlerDocumentStatus;
import nu.marginalia.model.idx.WordFlags; import nu.marginalia.model.idx.WordFlags;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@@ -37,6 +39,7 @@ public class DocumentProcessor {
"text/plain", "text/plain",
"application/pdf"); "application/pdf");
private final Marker converterAuditMarker = MarkerFactory.getMarker("CONVERTER");
private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>(); private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>();
private final AnchorTextKeywords anchorTextKeywords; private final AnchorTextKeywords anchorTextKeywords;
@@ -81,12 +84,13 @@ public class DocumentProcessor {
catch (DisqualifiedException ex) { catch (DisqualifiedException ex) {
ret.state = UrlIndexingState.DISQUALIFIED; ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = ex.reason.toString(); ret.stateReason = ex.reason.toString();
logger.debug("Disqualified {}: {}", ret.url, ex.reason); logger.info(converterAuditMarker, "Disqualified {}: {}", ret.url, ex.reason);
} }
catch (Exception ex) { catch (Exception ex) {
ret.state = UrlIndexingState.DISQUALIFIED; ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString(); ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
logger.info("Failed to convert " + crawledDocument.url, ex); logger.info(converterAuditMarker, "Failed to convert {}: {}", crawledDocument.url, ex.getClass().getSimpleName());
logger.warn(converterAuditMarker, "Failed to convert " + crawledDocument.url, ex);
} }
return ret; return ret;

View File

@@ -3,7 +3,6 @@ package nu.marginalia.converting.processor.logic;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import nu.marginalia.converting.model.DisqualifiedException;
import nu.marginalia.language.model.DocumentLanguageData; import nu.marginalia.language.model.DocumentLanguageData;
@Singleton @Singleton
@@ -26,12 +25,9 @@ public class DocumentLengthLogic {
return (int) Math.round((totalWords / (double) numSentences) / 4.); return (int) Math.round((totalWords / (double) numSentences) / 4.);
} }
public void validateLength(DocumentLanguageData dld, public boolean validateLength(DocumentLanguageData dld, double modifier)
double modifier) throws DisqualifiedException
{ {
if (modifier * dld.totalNumWords() < minDocumentLength) { return modifier * dld.totalNumWords() >= minDocumentLength;
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}
} }
} }

View File

@@ -68,6 +68,7 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
private final HtmlProcessorSpecializations htmlProcessorSpecializations; private final HtmlProcessorSpecializations htmlProcessorSpecializations;
private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000); private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000);
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
@Inject @Inject
public HtmlDocumentProcessorPlugin( public HtmlDocumentProcessorPlugin(
@@ -108,13 +109,13 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
DocumentClass documentClass) DocumentClass documentClass)
throws DisqualifiedException, URISyntaxException, IOException { throws DisqualifiedException, URISyntaxException, IOException {
if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) { if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
throw new DisqualifiedException(DisqualificationReason.LANGUAGE); throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
} }
Document doc = crawledDocument.parseBody(); Document doc = crawledDocument.parseBody();
if (AcceptableAds.hasAcceptableAdsTag(doc)) { if (!lenientProcessing && AcceptableAds.hasAcceptableAdsTag(doc)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS); throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
} }
@@ -129,25 +130,27 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
final var specialization = htmlProcessorSpecializations.select(generatorParts, url); final var specialization = htmlProcessorSpecializations.select(generatorParts, url);
if (!specialization.shouldIndex(url)) { if (!lenientProcessing && !specialization.shouldIndex(url)) {
throw new DisqualifiedException(DisqualificationReason.IRRELEVANT); throw new DisqualifiedException(DisqualificationReason.IRRELEVANT);
} }
var prunedDoc = specialization.prune(doc); var prunedDoc = specialization.prune(doc);
final int length = getLength(doc); final int length = getLength(doc);
final DocumentFormat format = getDocumentFormat(doc); final DocumentFormat format = getDocumentFormat(doc);
final double quality = documentValuator.getQuality(crawledDocument, format, doc, length); final double quality = documentValuator.getQuality(crawledDocument, format, doc, length);
if (isDisqualified(documentClass, url, quality, doc.title())) { if (!lenientProcessing && isDisqualified(documentClass, url, quality, doc.title())) {
throw new DisqualifiedException(DisqualificationReason.QUALITY); throw new DisqualifiedException(DisqualificationReason.QUALITY);
} }
DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc); DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
checkDocumentLanguage(dld); checkDocumentLanguage(dld);
documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier())) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}
var ret = new ProcessedDocumentDetails(); var ret = new ProcessedDocumentDetails();

View File

@@ -43,6 +43,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
private final DefaultSpecialization defaultSpecialization; private final DefaultSpecialization defaultSpecialization;
private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class); private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class);
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
@Inject @Inject
public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength, public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -81,7 +82,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
String documentBody = crawledDocument.documentBody(); String documentBody = crawledDocument.documentBody();
if (languageFilter.isBlockedUnicodeRange(documentBody)) { if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE); throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
} }
@@ -100,7 +101,9 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
checkDocumentLanguage(dld); checkDocumentLanguage(dld);
documentLengthLogic.validateLength(dld, 1.0); if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}
var ret = new ProcessedDocumentDetails(); var ret = new ProcessedDocumentDetails();

View File

@@ -37,6 +37,8 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider; private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
private final DocumentLengthLogic documentLengthLogic; private final DocumentLengthLogic documentLengthLogic;
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
@Inject @Inject
public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength, public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -73,7 +75,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
String documentBody = crawledDocument.documentBody(); String documentBody = crawledDocument.documentBody();
if (languageFilter.isBlockedUnicodeRange(documentBody)) { if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE); throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
} }
@@ -83,7 +85,9 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
checkDocumentLanguage(dld); checkDocumentLanguage(dld);
documentLengthLogic.validateLength(dld, 1.0); if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}
var ret = new ProcessedDocumentDetails(); var ret = new ProcessedDocumentDetails();

View File

@@ -28,6 +28,8 @@ public final class CrawledDocument implements SerializableCrawlData {
@Nullable @Nullable
public String headers; public String headers;
private static int MAX_LENGTH_BYTES = 500_000;
public String documentBody() { public String documentBody() {
return DocumentBodyToString.getStringData( return DocumentBodyToString.getStringData(
ContentType.parse(contentType), ContentType.parse(contentType),
@@ -65,7 +67,7 @@ public final class CrawledDocument implements SerializableCrawlData {
return DocumentBodyToString.getParsedData( return DocumentBodyToString.getParsedData(
ContentType.parse(contentType), ContentType.parse(contentType),
documentBodyBytes, documentBodyBytes,
200_000, MAX_LENGTH_BYTES,
url); url);
} }

View File

@@ -40,6 +40,8 @@ public class LoaderMain extends ProcessMainClass {
private final KeywordLoaderService keywordLoaderService; private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService; private final DocumentLoaderService documentLoaderService;
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
public static void main(String... args) { public static void main(String... args) {
try { try {
new org.mariadb.jdbc.Driver(); new org.mariadb.jdbc.Driver();
@@ -99,14 +101,29 @@ public class LoaderMain extends ProcessMainClass {
try { try {
var results = ForkJoinPool.commonPool() var results = ForkJoinPool.commonPool()
.invokeAll( .invokeAll(List.of());
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData), if ( true == insertFoundDomains ) {
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData), results = ForkJoinPool.commonPool()
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData), .invokeAll(
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData) List.of(
) () -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
); () -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
}
else {
results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
}
for (var result : results) { for (var result : results) {
if (result.state() == Future.State.FAILED) { if (result.state() == Future.State.FAILED) {

View File

@@ -25,6 +25,8 @@ import java.util.Set;
@Singleton @Singleton
public class DomainLoaderService { public class DomainLoaderService {
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class); private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
private final int nodeId; private final int nodeId;
@@ -84,25 +86,34 @@ public class DomainLoaderService {
// Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they // Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
// can be grabbed by any index node // can be grabbed by any index node
try (var inserter = new DomainInserter(conn, -1); if ( true == insertFoundDomains ) {
var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) { logger.info("Adding found domains");
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
int pageIdx = 0;
for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) { try (var inserter = new DomainInserter(conn, -1);
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size()); var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
int pageIdx = 0;
try (var reader = new SlopDomainLinkRecord.Reader(page)) { for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
while (reader.hasMore()) { processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
SlopDomainLinkRecord record = reader.next();
String domainName = record.dest(); try (var reader = new SlopDomainLinkRecord.Reader(page)) {
if (domainNamesAll.add(domainName)) { while (reader.hasMore()) {
inserter.accept(new EdgeDomain(domainName)); SlopDomainLinkRecord record = reader.next();
String domainName = record.dest();
if (domainNamesAll.add(domainName)) {
inserter.accept(new EdgeDomain(domainName));
}
} }
} }
} }
} }
} }
else {
logger.info("Skipping found domains");
}
taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP); taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);

View File

@@ -61,7 +61,7 @@ public class BackoffStrategy {
}; };
double backoffMinutes = baseInterval.toMinutes() double backoffMinutes = baseInterval.toMinutes()
* Math.pow(multiplier, backoffConsecutiveFailures - 1); * Math.pow(multiplier, Math.clamp(backoffConsecutiveFailures, 1, 10));
Duration newDuration = Duration.ofMinutes(Math.round(0.5+backoffMinutes)); Duration newDuration = Duration.ofMinutes(Math.round(0.5+backoffMinutes));
if (newDuration.compareTo(maxInterval) > 0) { if (newDuration.compareTo(maxInterval) > 0) {

View File

@@ -30,10 +30,11 @@ public class ApiSearchOperator {
public ApiSearchResults query(String query, public ApiSearchResults query(String query,
int count, int count,
int domainCount,
int index, int index,
NsfwFilterTier filterTier) NsfwFilterTier filterTier)
{ {
var rsp = queryClient.search(createParams(query, count, index, filterTier)); var rsp = queryClient.search(createParams(query, count, domainCount, index, filterTier));
return new ApiSearchResults("RESTRICTED", query, return new ApiSearchResults("RESTRICTED", query,
rsp.results() rsp.results()
@@ -44,13 +45,13 @@ public class ApiSearchOperator {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
private QueryParams createParams(String query, int count, int index, NsfwFilterTier filterTirer) { private QueryParams createParams(String query, int count, int domainCount, int index, NsfwFilterTier filterTirer) {
SearchSetIdentifier searchSet = selectSearchSet(index); SearchSetIdentifier searchSet = selectSearchSet(index);
return new QueryParams( return new QueryParams(
query, query,
RpcQueryLimits.newBuilder() RpcQueryLimits.newBuilder()
.setResultsByDomain(2) .setResultsByDomain(Math.clamp(domainCount, 1, 100))
.setResultsTotal(Math.min(100, count)) .setResultsTotal(Math.min(100, count))
.setTimeoutMs(150) .setTimeoutMs(150)
.setFetchSize(8192) .setFetchSize(8192)

View File

@@ -119,6 +119,7 @@ public class ApiService extends SparkService {
} }
int count = intParam(request, "count", 20); int count = intParam(request, "count", 20);
int domainCount = intParam(request, "dc", 2);
int index = intParam(request, "index", 3); int index = intParam(request, "index", 3);
int nsfw = intParam(request, "nsfw", 1); int nsfw = intParam(request, "nsfw", 1);
@@ -137,7 +138,7 @@ public class ApiService extends SparkService {
.labels(license.key) .labels(license.key)
.time(() -> .time(() ->
searchOperator searchOperator
.query(query, count, index, nsfwFilterTier) .query(query, count, domainCount, index, nsfwFilterTier)
.withLicense(license.getLicense()) .withLicense(license.getLicense())
); );
} }

View File

@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
{ {
bangsToPattern.put("!g", "https://www.google.com/search?q=%s"); bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s"); bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki"); bangsToPattern.put("!w", "https://old-search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
} }
@Override @Override

View File

@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
{ {
bangsToPattern.put("!g", "https://www.google.com/search?q=%s"); bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s"); bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki"); bangsToPattern.put("!w", "/search?query=%s+site:en.wikipedia.org");
} }
@Override @Override
@@ -34,7 +34,7 @@ public class BangCommand implements SearchCommandInterface {
if (match.isPresent()) { if (match.isPresent()) {
var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8)); var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8));
new MapModelAndView("redirect.jte", Map.of("url", url)); return Optional.of(new MapModelAndView("redirect.jte", Map.of("url", url)));
} }
} }

View File

@@ -33,19 +33,19 @@
title="This domain is blacklisted and will not be crawled or indexed"> title="This domain is blacklisted and will not be crawled or indexed">
Blacklisted Blacklisted
</span> </span>
@elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
<span
class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
title="This domain will be crawled by the search engine">
In Crawler Queue
</span>
@elseif (siteInfo.domainInformation().isUnknownDomain()) @elseif (siteInfo.domainInformation().isUnknownDomain())
<span <span
class="bg-purple-50 text-purple-900 border-purple-200 dark:bg-black dark:text-purple-100 border p-1 font-sm rounded" class="bg-purple-50 text-purple-900 border-purple-200 dark:bg-black dark:text-purple-100 border p-1 font-sm rounded"
title="The search engine is not aware of this domain name"> title="The search engine is not aware of this domain name">
Unknown Unknown
</span> </span>
@elseif (siteInfo.domainInformation().isUnknownDomain()) @elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
<span
class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
title="This domain will be crawled by the search engine">
In Crawler Queue
</span>
@elseif (!siteInfo.domainInformation().isUnknownDomain())
<span <span
class="bg-yellow-50 text-yellow-900 border-yellow-200 dark:bg-black dark:text-yellow-100 border p-1 font-sm rounded" class="bg-yellow-50 text-yellow-900 border-yellow-200 dark:bg-black dark:text-yellow-100 border p-1 font-sm rounded"
title="The search engine is aware of this domain, but it's not slated for crawling"> title="The search engine is aware of this domain, but it's not slated for crawling">

View File

@@ -0,0 +1,19 @@
package nu.marginalia.search.command.commands;
import nu.marginalia.WebsiteUrl;
import nu.marginalia.search.command.SearchParameters;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class BangCommandTest {
@Test
void testWikipediaRedirect() {
BangCommand bc = new BangCommand();
assertTrue(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "!w plato", 1)).isPresent());
assertFalse(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "plato", 1)).isPresent());
}
}

View File

@@ -20,6 +20,6 @@ public class StatusModule extends AbstractModule {
bind(String.class) bind(String.class)
.annotatedWith(Names.named("searchEngineTestQuery")) .annotatedWith(Names.named("searchEngineTestQuery"))
.toInstance(System.getProperty("status-service.public-query", .toInstance(System.getProperty("status-service.public-query",
"https://marginalia-search.com/search?query=plato&ref=marginalia-automatic-metrics")); "https://old-search.marginalia.nu/search?query=plato&ref=marginalia-automatic-metrics"));
} }
} }

View File

@@ -74,6 +74,8 @@ public class ControlSysActionsService {
Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview); Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview); Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview); Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);
Spark.post("/actions/update-nsfw-filters", this::updateNsfwFilters, Redirects.redirectToOverview);
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -132,6 +134,14 @@ public class ControlSysActionsService {
return ""; return "";
} }
public Object updateNsfwFilters(Request request, Response response) throws Exception {
eventLog.logEvent("USER-ACTION", "UPDATE-NSFW-FILTERS");
executorClient.updateNsfwFilters();
return "";
}
public Object flushApiCaches(Request request, Response response) throws Exception { public Object flushApiCaches(Request request, Response response) throws Exception {
eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES"); eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
apiOutbox.sendNotice("FLUSH_CACHES", ""); apiOutbox.sendNotice("FLUSH_CACHES", "");

View File

@@ -53,6 +53,31 @@
</div> </div>
</div> </div>
<div class="accordion-item">
<h2 class="accordion-header">
<button class="accordion-button collapsed"
type="button"
data-bs-toggle="collapse"
data-bs-target="#collapseNsfwFilters"
aria-expanded="false"
aria-controls="collapseNsfwFilters">
Update NSFW Filters Definitions
</button>
</h2>
<div id="collapseNsfwFilters" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
<div class="mb-3">
This will fetch NSFW filter definitions.
</div>
<form method="post" action="actions/update-nsfw-filters">
<button
class="btn btn-primary me-md-2"
onclick="return confirm('Confirm update NSFW filters');"
type="submit">
Update NSFW Filter</button>
</form>
</div>
</div>
<div class="accordion-item"> <div class="accordion-item">
<h2 class="accordion-header"> <h2 class="accordion-header">
<button class="accordion-button collapsed" <button class="accordion-button collapsed"