mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
11 Commits
deploy-027
...
deploy-027
Author | SHA1 | Date | |
---|---|---|---|
|
390f053406 | ||
|
b03c43224c | ||
|
9b4ce9e9eb | ||
|
81ac02a695 | ||
|
47f624fb3b | ||
|
c866f19cbb | ||
|
518278493b | ||
|
1ac0bab0b8 | ||
|
08b45ed10a | ||
|
f2cfb91973 | ||
|
2f79524eb3 |
@@ -7,6 +7,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</Console>
|
</Console>
|
||||||
<Console name="ProcessConsole" target="SYSTEM_OUT">
|
<Console name="ProcessConsole" target="SYSTEM_OUT">
|
||||||
@@ -23,6 +24,7 @@
|
|||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
<SizeBasedTriggeringPolicy size="10MB" />
|
<SizeBasedTriggeringPolicy size="10MB" />
|
||||||
</RollingFile>
|
</RollingFile>
|
||||||
@@ -36,6 +38,16 @@
|
|||||||
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
|
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</RollingFile>
|
</RollingFile>
|
||||||
|
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
|
||||||
|
ignoreExceptions="false">
|
||||||
|
<PatternLayout>
|
||||||
|
<Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
|
||||||
|
</PatternLayout>
|
||||||
|
<SizeBasedTriggeringPolicy size="100MB" />
|
||||||
|
<Filters>
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
|
||||||
|
</Filters>
|
||||||
|
</RollingFile>
|
||||||
</Appenders>
|
</Appenders>
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Logger name="org.apache.zookeeper" level="WARN" />
|
<Logger name="org.apache.zookeeper" level="WARN" />
|
||||||
|
@@ -8,6 +8,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</Console>
|
</Console>
|
||||||
<Console name="ConsoleWarn" target="SYSTEM_OUT">
|
<Console name="ConsoleWarn" target="SYSTEM_OUT">
|
||||||
@@ -18,6 +19,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</Console>
|
</Console>
|
||||||
<Console name="ConsoleError" target="SYSTEM_OUT">
|
<Console name="ConsoleError" target="SYSTEM_OUT">
|
||||||
@@ -28,6 +30,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</Console>
|
</Console>
|
||||||
<Console name="ConsoleFatal" target="SYSTEM_OUT">
|
<Console name="ConsoleFatal" target="SYSTEM_OUT">
|
||||||
@@ -38,6 +41,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</Console>
|
</Console>
|
||||||
<Console name="ProcessConsole" target="SYSTEM_OUT">
|
<Console name="ProcessConsole" target="SYSTEM_OUT">
|
||||||
@@ -57,6 +61,7 @@
|
|||||||
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
<MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</RollingFile>
|
</RollingFile>
|
||||||
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
|
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
|
||||||
@@ -69,6 +74,16 @@
|
|||||||
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
|
<MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
|
||||||
</Filters>
|
</Filters>
|
||||||
</RollingFile>
|
</RollingFile>
|
||||||
|
<RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
|
||||||
|
ignoreExceptions="false">
|
||||||
|
<PatternLayout>
|
||||||
|
<Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
|
||||||
|
</PatternLayout>
|
||||||
|
<SizeBasedTriggeringPolicy size="100MB" />
|
||||||
|
<Filters>
|
||||||
|
<MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
|
||||||
|
</Filters>
|
||||||
|
</RollingFile>
|
||||||
</Appenders>
|
</Appenders>
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Logger name="org.apache.zookeeper" level="WARN" />
|
<Logger name="org.apache.zookeeper" level="WARN" />
|
||||||
|
@@ -2,10 +2,11 @@ package nu.marginalia.actor;
|
|||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.functions.execution.api.*;
|
import nu.marginalia.functions.execution.api.RpcFsmName;
|
||||||
|
import nu.marginalia.functions.execution.api.RpcProcessId;
|
||||||
import nu.marginalia.mq.MqMessageState;
|
import nu.marginalia.mq.MqMessageState;
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -14,18 +15,18 @@ import spark.Spark;
|
|||||||
@Singleton
|
@Singleton
|
||||||
public class ActorApi {
|
public class ActorApi {
|
||||||
private final ExecutorActorControlService actors;
|
private final ExecutorActorControlService actors;
|
||||||
private final ProcessService processService;
|
private final ProcessSpawnerService processSpawnerService;
|
||||||
private final MqPersistence mqPersistence;
|
private final MqPersistence mqPersistence;
|
||||||
private final ServiceConfiguration serviceConfiguration;
|
private final ServiceConfiguration serviceConfiguration;
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
@Inject
|
@Inject
|
||||||
public ActorApi(ExecutorActorControlService actors,
|
public ActorApi(ExecutorActorControlService actors,
|
||||||
ProcessService processService,
|
ProcessSpawnerService processSpawnerService,
|
||||||
MqPersistence mqPersistence,
|
MqPersistence mqPersistence,
|
||||||
ServiceConfiguration serviceConfiguration)
|
ServiceConfiguration serviceConfiguration)
|
||||||
{
|
{
|
||||||
this.actors = actors;
|
this.actors = actors;
|
||||||
this.processService = processService;
|
this.processSpawnerService = processSpawnerService;
|
||||||
this.mqPersistence = mqPersistence;
|
this.mqPersistence = mqPersistence;
|
||||||
this.serviceConfiguration = serviceConfiguration;
|
this.serviceConfiguration = serviceConfiguration;
|
||||||
}
|
}
|
||||||
@@ -43,7 +44,7 @@ public class ActorApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Object stopProcess(RpcProcessId processId) {
|
public Object stopProcess(RpcProcessId processId) {
|
||||||
ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId());
|
ProcessSpawnerService.ProcessId id = ProcessSpawnerService.translateExternalIdBase(processId.getProcessId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
|
String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
|
||||||
@@ -60,7 +61,7 @@ public class ActorApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
processService.kill(id);
|
processSpawnerService.kill(id);
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
logger.error("Failed to stop process {}", id, ex);
|
logger.error("Failed to stop process {}", id, ex);
|
||||||
|
@@ -4,11 +4,14 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
||||||
import nu.marginalia.actor.state.*;
|
import nu.marginalia.actor.state.ActorResumeBehavior;
|
||||||
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
|
import nu.marginalia.actor.state.ActorStep;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.actor.state.Resume;
|
||||||
|
import nu.marginalia.actor.state.Terminal;
|
||||||
import nu.marginalia.mq.MqMessageState;
|
import nu.marginalia.mq.MqMessageState;
|
||||||
|
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -24,13 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
||||||
|
|
||||||
private final MqPersistence persistence;
|
private final MqPersistence persistence;
|
||||||
private final ProcessService processService;
|
private final ProcessSpawnerService processSpawnerService;
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
|
|
||||||
public static final int MAX_ATTEMPTS = 3;
|
public static final int MAX_ATTEMPTS = 3;
|
||||||
private final String inboxName;
|
private final String inboxName;
|
||||||
private final ProcessService.ProcessId processId;
|
private final ProcessSpawnerService.ProcessId processId;
|
||||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||||
private final int node;
|
private final int node;
|
||||||
|
|
||||||
@@ -50,7 +53,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
var messages = persistence.eavesdrop(inboxName, 1);
|
var messages = persistence.eavesdrop(inboxName, 1);
|
||||||
|
|
||||||
if (messages.isEmpty() && !processService.isRunning(processId)) {
|
if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
|
||||||
synchronized (processId) {
|
synchronized (processId) {
|
||||||
processId.wait(5000);
|
processId.wait(5000);
|
||||||
}
|
}
|
||||||
@@ -92,7 +95,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
|||||||
catch (InterruptedException ex) {
|
catch (InterruptedException ex) {
|
||||||
// We get this exception when the process is cancelled by the user
|
// We get this exception when the process is cancelled by the user
|
||||||
|
|
||||||
processService.kill(processId);
|
processSpawnerService.kill(processId);
|
||||||
setCurrentMessageToDead();
|
setCurrentMessageToDead();
|
||||||
|
|
||||||
yield new Aborted();
|
yield new Aborted();
|
||||||
@@ -112,13 +115,13 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
|||||||
public AbstractProcessSpawnerActor(Gson gson,
|
public AbstractProcessSpawnerActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService,
|
ProcessSpawnerService processSpawnerService,
|
||||||
String inboxName,
|
String inboxName,
|
||||||
ProcessService.ProcessId processId) {
|
ProcessSpawnerService.ProcessId processId) {
|
||||||
super(gson);
|
super(gson);
|
||||||
this.node = configuration.node();
|
this.node = configuration.node();
|
||||||
this.persistence = persistence;
|
this.persistence = persistence;
|
||||||
this.processService = processService;
|
this.processSpawnerService = processSpawnerService;
|
||||||
this.inboxName = inboxName + ":" + node;
|
this.inboxName = inboxName + ":" + node;
|
||||||
this.processId = processId;
|
this.processId = processId;
|
||||||
}
|
}
|
||||||
@@ -149,7 +152,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
|
|||||||
// Run this call in a separate thread so that this thread can be interrupted waiting for it
|
// Run this call in a separate thread so that this thread can be interrupted waiting for it
|
||||||
executorService.submit(() -> {
|
executorService.submit(() -> {
|
||||||
try {
|
try {
|
||||||
processService.trigger(processId);
|
processSpawnerService.trigger(processId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("Error in triggering process", e);
|
logger.warn("Error in triggering process", e);
|
||||||
error.set(true);
|
error.set(true);
|
||||||
|
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.process.ProcessService;
|
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -17,13 +17,13 @@ public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public ConverterMonitorActor(Gson gson,
|
public ConverterMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.CONVERTER_INBOX,
|
ProcessInboxNames.CONVERTER_INBOX,
|
||||||
ProcessService.ProcessId.CONVERTER);
|
ProcessSpawnerService.ProcessId.CONVERTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.process.ProcessService;
|
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -16,13 +16,13 @@ public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public CrawlerMonitorActor(Gson gson,
|
public CrawlerMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.CRAWLER_INBOX,
|
ProcessInboxNames.CRAWLER_INBOX,
|
||||||
ProcessService.ProcessId.CRAWLER);
|
ProcessSpawnerService.ProcessId.CRAWLER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
|
|||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -16,13 +16,13 @@ public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public ExportTaskMonitorActor(Gson gson,
|
public ExportTaskMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.EXPORT_TASK_INBOX,
|
ProcessInboxNames.EXPORT_TASK_INBOX,
|
||||||
ProcessService.ProcessId.EXPORT_TASKS);
|
ProcessSpawnerService.ProcessId.EXPORT_TASKS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.process.ProcessService;
|
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -17,13 +17,13 @@ public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public IndexConstructorMonitorActor(Gson gson,
|
public IndexConstructorMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
|
ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
|
||||||
ProcessService.ProcessId.INDEX_CONSTRUCTOR);
|
ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
|
|||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -16,13 +16,13 @@ public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public LiveCrawlerMonitorActor(Gson gson,
|
public LiveCrawlerMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.LIVE_CRAWLER_INBOX,
|
ProcessInboxNames.LIVE_CRAWLER_INBOX,
|
||||||
ProcessService.ProcessId.LIVE_CRAWLER);
|
ProcessSpawnerService.ProcessId.LIVE_CRAWLER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.process.ProcessService;
|
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -17,13 +17,13 @@ public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public LoaderMonitorActor(Gson gson,
|
public LoaderMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
|
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence, processService,
|
persistence, processSpawnerService,
|
||||||
ProcessInboxNames.LOADER_INBOX,
|
ProcessInboxNames.LOADER_INBOX,
|
||||||
ProcessService.ProcessId.LOADER);
|
ProcessSpawnerService.ProcessId.LOADER);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
|
|||||||
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -16,13 +16,13 @@ public class NdpMonitorActor extends AbstractProcessSpawnerActor {
|
|||||||
public NdpMonitorActor(Gson gson,
|
public NdpMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
super(gson,
|
super(gson,
|
||||||
configuration,
|
configuration,
|
||||||
persistence,
|
persistence,
|
||||||
processService,
|
processSpawnerService,
|
||||||
ProcessInboxNames.NDP_INBOX,
|
ProcessInboxNames.NDP_INBOX,
|
||||||
ProcessService.ProcessId.NDP);
|
ProcessSpawnerService.ProcessId.NDP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -13,7 +13,7 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
|
|||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.mqapi.ping.PingRequest;
|
import nu.marginalia.mqapi.ping.PingRequest;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -33,13 +33,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
public class PingMonitorActor extends RecordActorPrototype {
|
public class PingMonitorActor extends RecordActorPrototype {
|
||||||
|
|
||||||
private final MqPersistence persistence;
|
private final MqPersistence persistence;
|
||||||
private final ProcessService processService;
|
private final ProcessSpawnerService processSpawnerService;
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
|
|
||||||
public static final int MAX_ATTEMPTS = 3;
|
public static final int MAX_ATTEMPTS = 3;
|
||||||
private final String inboxName;
|
private final String inboxName;
|
||||||
private final ProcessService.ProcessId processId;
|
private final ProcessSpawnerService.ProcessId processId;
|
||||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||||
private final int node;
|
private final int node;
|
||||||
private final Gson gson;
|
private final Gson gson;
|
||||||
@@ -68,7 +68,7 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
var messages = persistence.eavesdrop(inboxName, 1);
|
var messages = persistence.eavesdrop(inboxName, 1);
|
||||||
|
|
||||||
if (messages.isEmpty() && !processService.isRunning(processId)) {
|
if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
|
||||||
synchronized (processId) {
|
synchronized (processId) {
|
||||||
processId.wait(5000);
|
processId.wait(5000);
|
||||||
}
|
}
|
||||||
@@ -110,7 +110,7 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
catch (InterruptedException ex) {
|
catch (InterruptedException ex) {
|
||||||
// We get this exception when the process is cancelled by the user
|
// We get this exception when the process is cancelled by the user
|
||||||
|
|
||||||
processService.kill(processId);
|
processSpawnerService.kill(processId);
|
||||||
setCurrentMessageToDead();
|
setCurrentMessageToDead();
|
||||||
|
|
||||||
yield new Aborted();
|
yield new Aborted();
|
||||||
@@ -130,14 +130,14 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
public PingMonitorActor(Gson gson,
|
public PingMonitorActor(Gson gson,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) throws SQLException {
|
ProcessSpawnerService processSpawnerService) throws SQLException {
|
||||||
super(gson);
|
super(gson);
|
||||||
this.gson = gson;
|
this.gson = gson;
|
||||||
this.node = configuration.node();
|
this.node = configuration.node();
|
||||||
this.persistence = persistence;
|
this.persistence = persistence;
|
||||||
this.processService = processService;
|
this.processSpawnerService = processSpawnerService;
|
||||||
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
|
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
|
||||||
this.processId = ProcessService.ProcessId.PING;
|
this.processId = ProcessSpawnerService.ProcessId.PING;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Sets the message to dead in the database to avoid
|
/** Sets the message to dead in the database to avoid
|
||||||
@@ -166,7 +166,7 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
// Run this call in a separate thread so that this thread can be interrupted waiting for it
|
// Run this call in a separate thread so that this thread can be interrupted waiting for it
|
||||||
executorService.submit(() -> {
|
executorService.submit(() -> {
|
||||||
try {
|
try {
|
||||||
processService.trigger(processId);
|
processSpawnerService.trigger(processId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("Error in triggering process", e);
|
logger.warn("Error in triggering process", e);
|
||||||
error.set(true);
|
error.set(true);
|
||||||
|
@@ -8,7 +8,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
|
|||||||
import nu.marginalia.actor.state.ActorResumeBehavior;
|
import nu.marginalia.actor.state.ActorResumeBehavior;
|
||||||
import nu.marginalia.actor.state.ActorStep;
|
import nu.marginalia.actor.state.ActorStep;
|
||||||
import nu.marginalia.actor.state.Resume;
|
import nu.marginalia.actor.state.Resume;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.control.ServiceEventLog;
|
import nu.marginalia.service.control.ServiceEventLog;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class ProcessLivenessMonitorActor extends RecordActorPrototype {
|
public class ProcessLivenessMonitorActor extends RecordActorPrototype {
|
||||||
|
|
||||||
private final ServiceEventLog eventLogService;
|
private final ServiceEventLog eventLogService;
|
||||||
private final ProcessService processService;
|
private final ProcessSpawnerService processSpawnerService;
|
||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
|
|
||||||
private final int node;
|
private final int node;
|
||||||
@@ -49,7 +49,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
|
|||||||
var processId = heartbeat.getProcessId();
|
var processId = heartbeat.getProcessId();
|
||||||
if (null == processId) continue;
|
if (null == processId) continue;
|
||||||
|
|
||||||
if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
|
if (processSpawnerService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
flagProcessAsStopped(heartbeat);
|
flagProcessAsStopped(heartbeat);
|
||||||
@@ -72,12 +72,12 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
|
|||||||
public ProcessLivenessMonitorActor(Gson gson,
|
public ProcessLivenessMonitorActor(Gson gson,
|
||||||
ServiceEventLog eventLogService,
|
ServiceEventLog eventLogService,
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
ProcessService processService,
|
ProcessSpawnerService processSpawnerService,
|
||||||
HikariDataSource dataSource) {
|
HikariDataSource dataSource) {
|
||||||
super(gson);
|
super(gson);
|
||||||
this.node = configuration.node();
|
this.node = configuration.node();
|
||||||
this.eventLogService = eventLogService;
|
this.eventLogService = eventLogService;
|
||||||
this.processService = processService;
|
this.processSpawnerService = processSpawnerService;
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,8 +208,8 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
|
|||||||
public boolean isRunning() {
|
public boolean isRunning() {
|
||||||
return "RUNNING".equals(status);
|
return "RUNNING".equals(status);
|
||||||
}
|
}
|
||||||
public ProcessService.ProcessId getProcessId() {
|
public ProcessSpawnerService.ProcessId getProcessId() {
|
||||||
return ProcessService.translateExternalIdBase(processBase);
|
return ProcessSpawnerService.translateExternalIdBase(processBase);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -47,6 +47,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt");
|
private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt");
|
||||||
|
|
||||||
|
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
|
||||||
|
|
||||||
public record Initial() implements ActorStep {}
|
public record Initial() implements ActorStep {}
|
||||||
@Resume(behavior = ActorResumeBehavior.RETRY)
|
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||||
public record Wait(String ts) implements ActorStep {}
|
public record Wait(String ts) implements ActorStep {}
|
||||||
@@ -57,6 +59,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
|
|||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
return switch(self) {
|
return switch(self) {
|
||||||
case Initial() -> {
|
case Initial() -> {
|
||||||
|
if (!insertFoundDomains) yield new Error("Domain insertion prohibited, aborting");
|
||||||
|
|
||||||
if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
|
if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
|
||||||
yield new Error("Invalid node profile for RSS update");
|
yield new Error("Invalid node profile for RSS update");
|
||||||
}
|
}
|
||||||
|
@@ -3,11 +3,11 @@ package nu.marginalia.actor.task;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.state.ActorControlFlowException;
|
import nu.marginalia.actor.state.ActorControlFlowException;
|
||||||
import nu.marginalia.mq.MqMessageState;
|
|
||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
|
||||||
import nu.marginalia.process.ProcessService;
|
|
||||||
import nu.marginalia.mq.MqMessage;
|
import nu.marginalia.mq.MqMessage;
|
||||||
|
import nu.marginalia.mq.MqMessageState;
|
||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -20,13 +20,13 @@ public class ActorProcessWatcher {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);
|
private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);
|
||||||
private final MqPersistence persistence;
|
private final MqPersistence persistence;
|
||||||
private final ProcessService processService;
|
private final ProcessSpawnerService processSpawnerService;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ActorProcessWatcher(MqPersistence persistence,
|
public ActorProcessWatcher(MqPersistence persistence,
|
||||||
ProcessService processService) {
|
ProcessSpawnerService processSpawnerService) {
|
||||||
this.persistence = persistence;
|
this.persistence = persistence;
|
||||||
this.processService = processService;
|
this.processSpawnerService = processSpawnerService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Wait for a process to start, and then wait for a response from the process,
|
/** Wait for a process to start, and then wait for a response from the process,
|
||||||
@@ -36,7 +36,7 @@ public class ActorProcessWatcher {
|
|||||||
* <p>
|
* <p>
|
||||||
* When interrupted, the process is killed and the message is marked as dead.
|
* When interrupted, the process is killed and the message is marked as dead.
|
||||||
*/
|
*/
|
||||||
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId)
|
public MqMessage waitResponse(MqOutbox outbox, ProcessSpawnerService.ProcessId processId, long msgId)
|
||||||
throws ActorControlFlowException, InterruptedException, SQLException
|
throws ActorControlFlowException, InterruptedException, SQLException
|
||||||
{
|
{
|
||||||
// enums values only have a single instance,
|
// enums values only have a single instance,
|
||||||
@@ -65,7 +65,7 @@ public class ActorProcessWatcher {
|
|||||||
// This will prevent the monitor process from attempting to respawn the process as we kill it
|
// This will prevent the monitor process from attempting to respawn the process as we kill it
|
||||||
|
|
||||||
outbox.flagAsDead(msgId);
|
outbox.flagAsDead(msgId);
|
||||||
processService.kill(processId);
|
processSpawnerService.kill(processId);
|
||||||
|
|
||||||
logger.info("Process {} killed due to interrupt", processId);
|
logger.info("Process {} killed due to interrupt", processId);
|
||||||
}
|
}
|
||||||
@@ -94,12 +94,12 @@ public class ActorProcessWatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Wait the specified time for the specified process to start running (does not start the process) */
|
/** Wait the specified time for the specified process to start running (does not start the process) */
|
||||||
private boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
|
private boolean waitForProcess(ProcessSpawnerService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
|
||||||
|
|
||||||
// Wait for process to start
|
// Wait for process to start
|
||||||
long deadline = System.currentTimeMillis() + unit.toMillis(duration);
|
long deadline = System.currentTimeMillis() + unit.toMillis(duration);
|
||||||
while (System.currentTimeMillis() < deadline) {
|
while (System.currentTimeMillis() < deadline) {
|
||||||
if (processService.isRunning(processId))
|
if (processSpawnerService.isRunning(processId))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
TimeUnit.MILLISECONDS.sleep(100);
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
|
@@ -12,7 +12,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.converting.ConvertRequest;
|
import nu.marginalia.mqapi.converting.ConvertRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.sideload.RedditSideloadHelper;
|
import nu.marginalia.sideload.RedditSideloadHelper;
|
||||||
import nu.marginalia.sideload.SideloadHelper;
|
import nu.marginalia.sideload.SideloadHelper;
|
||||||
import nu.marginalia.sideload.StackExchangeSideloadHelper;
|
import nu.marginalia.sideload.StackExchangeSideloadHelper;
|
||||||
@@ -218,7 +218,7 @@ public class ConvertActor extends RecordActorPrototype {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
case ConvertWait(FileStorageId destFid, long msgId) -> {
|
case ConvertWait(FileStorageId destFid, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
|
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
yield new Error("Converter failed");
|
yield new Error("Converter failed");
|
||||||
|
@@ -18,7 +18,7 @@ import nu.marginalia.mqapi.index.IndexName;
|
|||||||
import nu.marginalia.mqapi.loading.LoadRequest;
|
import nu.marginalia.mqapi.loading.LoadRequest;
|
||||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
@@ -95,7 +95,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
|
|||||||
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
|
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
|
||||||
new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
|
new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
|
||||||
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
|
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
|
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK)
|
if (rsp.state() != MqMessageState.OK)
|
||||||
yield new Error("Converter failed");
|
yield new Error("Converter failed");
|
||||||
@@ -129,7 +129,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
|
|||||||
yield new Load(processedIds, id);
|
yield new Load(processedIds, id);
|
||||||
}
|
}
|
||||||
case Load(List<FileStorageId> processedIds, long msgId) -> {
|
case Load(List<FileStorageId> processedIds, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, msgId);
|
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessSpawnerService.ProcessId.LOADER, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
yield new Error("Loader failed");
|
yield new Error("Loader failed");
|
||||||
@@ -165,7 +165,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
|
|||||||
}
|
}
|
||||||
case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
|
case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
|
||||||
case ReindexFwd(long id) -> {
|
case ReindexFwd(long id) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
|
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK)
|
if (rsp.state() != MqMessageState.OK)
|
||||||
yield new Error("Forward index construction failed");
|
yield new Error("Forward index construction failed");
|
||||||
@@ -174,7 +174,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
|
|||||||
}
|
}
|
||||||
case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
|
case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
|
||||||
case ReindexFull(long id) -> {
|
case ReindexFull(long id) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
|
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK)
|
if (rsp.state() != MqMessageState.OK)
|
||||||
yield new Error("Full index construction failed");
|
yield new Error("Full index construction failed");
|
||||||
@@ -183,7 +183,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
|
|||||||
}
|
}
|
||||||
case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
|
case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
|
||||||
case ReindexPrio(long id) -> {
|
case ReindexPrio(long id) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
|
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK)
|
if (rsp.state() != MqMessageState.OK)
|
||||||
yield new Error("Prio index construction failed");
|
yield new Error("Prio index construction failed");
|
||||||
|
@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.crawling.CrawlRequest;
|
import nu.marginalia.mqapi.crawling.CrawlRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageType;
|
import nu.marginalia.storage.model.FileStorageType;
|
||||||
@@ -76,7 +76,7 @@ public class CrawlActor extends RecordActorPrototype {
|
|||||||
case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> {
|
case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> {
|
||||||
var rsp = processWatcher.waitResponse(
|
var rsp = processWatcher.waitResponse(
|
||||||
mqCrawlerOutbox,
|
mqCrawlerOutbox,
|
||||||
ProcessService.ProcessId.CRAWLER,
|
ProcessSpawnerService.ProcessId.CRAWLER,
|
||||||
msgId);
|
msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
|
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
|
|||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageState;
|
import nu.marginalia.storage.model.FileStorageState;
|
||||||
@@ -55,7 +55,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
|
|||||||
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
||||||
}
|
}
|
||||||
case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
|
case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
storageService.flagFileForDeletion(destId);
|
storageService.flagFileForDeletion(destId);
|
||||||
|
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
|
|||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageState;
|
import nu.marginalia.storage.model.FileStorageState;
|
||||||
@@ -54,7 +54,7 @@ public class ExportFeedsActor extends RecordActorPrototype {
|
|||||||
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
||||||
}
|
}
|
||||||
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
|
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
storageService.flagFileForDeletion(destId);
|
storageService.flagFileForDeletion(destId);
|
||||||
|
@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageState;
|
import nu.marginalia.storage.model.FileStorageState;
|
||||||
@@ -52,7 +52,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
|
|||||||
yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
|
yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
|
||||||
}
|
}
|
||||||
case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
|
case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
storageService.flagFileForDeletion(destId);
|
storageService.flagFileForDeletion(destId);
|
||||||
|
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
|
|||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageState;
|
import nu.marginalia.storage.model.FileStorageState;
|
||||||
@@ -52,7 +52,7 @@ public class ExportTermFreqActor extends RecordActorPrototype {
|
|||||||
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
yield new Run(responseMsgId, crawlId, destId, newMsgId);
|
||||||
}
|
}
|
||||||
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
|
case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
storageService.flagFileForDeletion(destId);
|
storageService.flagFileForDeletion(destId);
|
||||||
|
@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
|
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -74,7 +74,7 @@ public class LiveCrawlActor extends RecordActorPrototype {
|
|||||||
yield new LiveCrawl(feedsHash, id);
|
yield new LiveCrawl(feedsHash, id);
|
||||||
}
|
}
|
||||||
case LiveCrawl(String feedsHash, long msgId) -> {
|
case LiveCrawl(String feedsHash, long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
|
var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessSpawnerService.ProcessId.LIVE_CRAWLER, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
yield new Error("Crawler failed");
|
yield new Error("Crawler failed");
|
||||||
|
@@ -11,7 +11,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.crawling.CrawlRequest;
|
import nu.marginalia.mqapi.crawling.CrawlRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.FileStorageId;
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
import nu.marginalia.storage.model.FileStorageType;
|
import nu.marginalia.storage.model.FileStorageType;
|
||||||
@@ -51,7 +51,7 @@ public class RecrawlSingleDomainActor extends RecordActorPrototype {
|
|||||||
case Crawl (long msgId) -> {
|
case Crawl (long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(
|
var rsp = processWatcher.waitResponse(
|
||||||
mqCrawlerOutbox,
|
mqCrawlerOutbox,
|
||||||
ProcessService.ProcessId.CRAWLER,
|
ProcessSpawnerService.ProcessId.CRAWLER,
|
||||||
msgId);
|
msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
|
@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
|
|||||||
import nu.marginalia.mq.outbox.MqOutbox;
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
|
||||||
import nu.marginalia.process.ProcessOutboxes;
|
import nu.marginalia.process.ProcessOutboxes;
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessSpawnerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
|
|||||||
yield new Run(newMsgId);
|
yield new Run(newMsgId);
|
||||||
}
|
}
|
||||||
case Run(long msgId) -> {
|
case Run(long msgId) -> {
|
||||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
|
||||||
|
|
||||||
if (rsp.state() != MqMessageState.OK) {
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
yield new Error("Exporter failed");
|
yield new Error("Exporter failed");
|
||||||
|
@@ -29,7 +29,7 @@ import java.util.List;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class ProcessService {
|
public class ProcessSpawnerService {
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
|
private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
|
||||||
|
|
||||||
@@ -88,7 +88,7 @@ public class ProcessService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ProcessService(BaseServiceParams params) {
|
public ProcessSpawnerService(BaseServiceParams params) {
|
||||||
this.eventLog = params.eventLog;
|
this.eventLog = params.eventLog;
|
||||||
this.node = params.configuration.node();
|
this.node = params.configuration.node();
|
||||||
}
|
}
|
@@ -19,6 +19,8 @@ import nu.marginalia.model.crawldata.CrawlerDocumentStatus;
|
|||||||
import nu.marginalia.model.idx.WordFlags;
|
import nu.marginalia.model.idx.WordFlags;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.slf4j.Marker;
|
||||||
|
import org.slf4j.MarkerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
@@ -37,6 +39,7 @@ public class DocumentProcessor {
|
|||||||
"text/plain",
|
"text/plain",
|
||||||
"application/pdf");
|
"application/pdf");
|
||||||
|
|
||||||
|
private final Marker converterAuditMarker = MarkerFactory.getMarker("CONVERTER");
|
||||||
|
|
||||||
private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>();
|
private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>();
|
||||||
private final AnchorTextKeywords anchorTextKeywords;
|
private final AnchorTextKeywords anchorTextKeywords;
|
||||||
@@ -81,12 +84,13 @@ public class DocumentProcessor {
|
|||||||
catch (DisqualifiedException ex) {
|
catch (DisqualifiedException ex) {
|
||||||
ret.state = UrlIndexingState.DISQUALIFIED;
|
ret.state = UrlIndexingState.DISQUALIFIED;
|
||||||
ret.stateReason = ex.reason.toString();
|
ret.stateReason = ex.reason.toString();
|
||||||
logger.debug("Disqualified {}: {}", ret.url, ex.reason);
|
logger.info(converterAuditMarker, "Disqualified {}: {}", ret.url, ex.reason);
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
ret.state = UrlIndexingState.DISQUALIFIED;
|
ret.state = UrlIndexingState.DISQUALIFIED;
|
||||||
ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
|
ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
|
||||||
logger.info("Failed to convert " + crawledDocument.url, ex);
|
logger.info(converterAuditMarker, "Failed to convert {}: {}", crawledDocument.url, ex.getClass().getSimpleName());
|
||||||
|
logger.warn(converterAuditMarker, "Failed to convert " + crawledDocument.url, ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@@ -3,7 +3,6 @@ package nu.marginalia.converting.processor.logic;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.google.inject.name.Named;
|
import com.google.inject.name.Named;
|
||||||
import nu.marginalia.converting.model.DisqualifiedException;
|
|
||||||
import nu.marginalia.language.model.DocumentLanguageData;
|
import nu.marginalia.language.model.DocumentLanguageData;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -26,12 +25,9 @@ public class DocumentLengthLogic {
|
|||||||
return (int) Math.round((totalWords / (double) numSentences) / 4.);
|
return (int) Math.round((totalWords / (double) numSentences) / 4.);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void validateLength(DocumentLanguageData dld,
|
public boolean validateLength(DocumentLanguageData dld, double modifier)
|
||||||
double modifier) throws DisqualifiedException
|
|
||||||
{
|
{
|
||||||
if (modifier * dld.totalNumWords() < minDocumentLength) {
|
return modifier * dld.totalNumWords() >= minDocumentLength;
|
||||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -68,6 +68,7 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
private final HtmlProcessorSpecializations htmlProcessorSpecializations;
|
private final HtmlProcessorSpecializations htmlProcessorSpecializations;
|
||||||
|
|
||||||
private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000);
|
private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000);
|
||||||
|
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public HtmlDocumentProcessorPlugin(
|
public HtmlDocumentProcessorPlugin(
|
||||||
@@ -108,13 +109,13 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
DocumentClass documentClass)
|
DocumentClass documentClass)
|
||||||
throws DisqualifiedException, URISyntaxException, IOException {
|
throws DisqualifiedException, URISyntaxException, IOException {
|
||||||
|
|
||||||
if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
|
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
|
||||||
throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
|
throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
Document doc = crawledDocument.parseBody();
|
Document doc = crawledDocument.parseBody();
|
||||||
|
|
||||||
if (AcceptableAds.hasAcceptableAdsTag(doc)) {
|
if (!lenientProcessing && AcceptableAds.hasAcceptableAdsTag(doc)) {
|
||||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,25 +130,27 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
|
|
||||||
final var specialization = htmlProcessorSpecializations.select(generatorParts, url);
|
final var specialization = htmlProcessorSpecializations.select(generatorParts, url);
|
||||||
|
|
||||||
if (!specialization.shouldIndex(url)) {
|
if (!lenientProcessing && !specialization.shouldIndex(url)) {
|
||||||
throw new DisqualifiedException(DisqualificationReason.IRRELEVANT);
|
throw new DisqualifiedException(DisqualificationReason.IRRELEVANT);
|
||||||
}
|
}
|
||||||
|
|
||||||
var prunedDoc = specialization.prune(doc);
|
var prunedDoc = specialization.prune(doc);
|
||||||
|
|
||||||
|
|
||||||
final int length = getLength(doc);
|
final int length = getLength(doc);
|
||||||
final DocumentFormat format = getDocumentFormat(doc);
|
final DocumentFormat format = getDocumentFormat(doc);
|
||||||
final double quality = documentValuator.getQuality(crawledDocument, format, doc, length);
|
final double quality = documentValuator.getQuality(crawledDocument, format, doc, length);
|
||||||
|
|
||||||
if (isDisqualified(documentClass, url, quality, doc.title())) {
|
if (!lenientProcessing && isDisqualified(documentClass, url, quality, doc.title())) {
|
||||||
throw new DisqualifiedException(DisqualificationReason.QUALITY);
|
throw new DisqualifiedException(DisqualificationReason.QUALITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
|
DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
|
||||||
|
|
||||||
checkDocumentLanguage(dld);
|
checkDocumentLanguage(dld);
|
||||||
documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());
|
|
||||||
|
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier())) {
|
||||||
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
|
||||||
|
}
|
||||||
|
|
||||||
var ret = new ProcessedDocumentDetails();
|
var ret = new ProcessedDocumentDetails();
|
||||||
|
|
||||||
|
@@ -43,6 +43,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
private final DefaultSpecialization defaultSpecialization;
|
private final DefaultSpecialization defaultSpecialization;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class);
|
private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class);
|
||||||
|
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
|
public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
|
||||||
@@ -81,7 +82,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
|
|
||||||
String documentBody = crawledDocument.documentBody();
|
String documentBody = crawledDocument.documentBody();
|
||||||
|
|
||||||
if (languageFilter.isBlockedUnicodeRange(documentBody)) {
|
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
|
||||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,7 +101,9 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
|||||||
|
|
||||||
checkDocumentLanguage(dld);
|
checkDocumentLanguage(dld);
|
||||||
|
|
||||||
documentLengthLogic.validateLength(dld, 1.0);
|
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
|
||||||
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
|
||||||
|
}
|
||||||
|
|
||||||
var ret = new ProcessedDocumentDetails();
|
var ret = new ProcessedDocumentDetails();
|
||||||
|
|
||||||
|
@@ -37,6 +37,8 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
|
|||||||
private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
|
private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
|
||||||
private final DocumentLengthLogic documentLengthLogic;
|
private final DocumentLengthLogic documentLengthLogic;
|
||||||
|
|
||||||
|
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");
|
||||||
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
|
public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
|
||||||
@@ -73,7 +75,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
|
|||||||
|
|
||||||
String documentBody = crawledDocument.documentBody();
|
String documentBody = crawledDocument.documentBody();
|
||||||
|
|
||||||
if (languageFilter.isBlockedUnicodeRange(documentBody)) {
|
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
|
||||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,7 +85,9 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
|
|||||||
|
|
||||||
checkDocumentLanguage(dld);
|
checkDocumentLanguage(dld);
|
||||||
|
|
||||||
documentLengthLogic.validateLength(dld, 1.0);
|
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
|
||||||
|
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
|
||||||
|
}
|
||||||
|
|
||||||
var ret = new ProcessedDocumentDetails();
|
var ret = new ProcessedDocumentDetails();
|
||||||
|
|
||||||
|
@@ -28,6 +28,8 @@ public final class CrawledDocument implements SerializableCrawlData {
|
|||||||
@Nullable
|
@Nullable
|
||||||
public String headers;
|
public String headers;
|
||||||
|
|
||||||
|
private static int MAX_LENGTH_BYTES = 500_000;
|
||||||
|
|
||||||
public String documentBody() {
|
public String documentBody() {
|
||||||
return DocumentBodyToString.getStringData(
|
return DocumentBodyToString.getStringData(
|
||||||
ContentType.parse(contentType),
|
ContentType.parse(contentType),
|
||||||
@@ -65,7 +67,7 @@ public final class CrawledDocument implements SerializableCrawlData {
|
|||||||
return DocumentBodyToString.getParsedData(
|
return DocumentBodyToString.getParsedData(
|
||||||
ContentType.parse(contentType),
|
ContentType.parse(contentType),
|
||||||
documentBodyBytes,
|
documentBodyBytes,
|
||||||
200_000,
|
MAX_LENGTH_BYTES,
|
||||||
url);
|
url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -40,6 +40,8 @@ public class LoaderMain extends ProcessMainClass {
|
|||||||
private final KeywordLoaderService keywordLoaderService;
|
private final KeywordLoaderService keywordLoaderService;
|
||||||
private final DocumentLoaderService documentLoaderService;
|
private final DocumentLoaderService documentLoaderService;
|
||||||
|
|
||||||
|
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
|
||||||
|
|
||||||
public static void main(String... args) {
|
public static void main(String... args) {
|
||||||
try {
|
try {
|
||||||
new org.mariadb.jdbc.Driver();
|
new org.mariadb.jdbc.Driver();
|
||||||
@@ -99,14 +101,29 @@ public class LoaderMain extends ProcessMainClass {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
var results = ForkJoinPool.commonPool()
|
var results = ForkJoinPool.commonPool()
|
||||||
.invokeAll(
|
.invokeAll(List.of());
|
||||||
List.of(
|
|
||||||
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
|
if ( true == insertFoundDomains ) {
|
||||||
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
|
results = ForkJoinPool.commonPool()
|
||||||
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
|
.invokeAll(
|
||||||
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
|
List.of(
|
||||||
)
|
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
|
||||||
);
|
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
|
||||||
|
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
|
||||||
|
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
results = ForkJoinPool.commonPool()
|
||||||
|
.invokeAll(
|
||||||
|
List.of(
|
||||||
|
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
|
||||||
|
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
|
||||||
|
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
for (var result : results) {
|
for (var result : results) {
|
||||||
if (result.state() == Future.State.FAILED) {
|
if (result.state() == Future.State.FAILED) {
|
||||||
|
@@ -25,6 +25,8 @@ import java.util.Set;
|
|||||||
@Singleton
|
@Singleton
|
||||||
public class DomainLoaderService {
|
public class DomainLoaderService {
|
||||||
|
|
||||||
|
private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
|
||||||
|
|
||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
|
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
|
||||||
private final int nodeId;
|
private final int nodeId;
|
||||||
@@ -84,25 +86,34 @@ public class DomainLoaderService {
|
|||||||
|
|
||||||
// Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
|
// Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
|
||||||
// can be grabbed by any index node
|
// can be grabbed by any index node
|
||||||
try (var inserter = new DomainInserter(conn, -1);
|
if ( true == insertFoundDomains ) {
|
||||||
var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
|
logger.info("Adding found domains");
|
||||||
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
|
|
||||||
int pageIdx = 0;
|
|
||||||
|
|
||||||
for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
|
try (var inserter = new DomainInserter(conn, -1);
|
||||||
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
|
var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
|
||||||
|
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
|
||||||
|
int pageIdx = 0;
|
||||||
|
|
||||||
try (var reader = new SlopDomainLinkRecord.Reader(page)) {
|
for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
|
||||||
while (reader.hasMore()) {
|
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
|
||||||
SlopDomainLinkRecord record = reader.next();
|
|
||||||
String domainName = record.dest();
|
try (var reader = new SlopDomainLinkRecord.Reader(page)) {
|
||||||
if (domainNamesAll.add(domainName)) {
|
while (reader.hasMore()) {
|
||||||
inserter.accept(new EdgeDomain(domainName));
|
SlopDomainLinkRecord record = reader.next();
|
||||||
|
String domainName = record.dest();
|
||||||
|
if (domainNamesAll.add(domainName)) {
|
||||||
|
inserter.accept(new EdgeDomain(domainName));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
logger.info("Skipping found domains");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);
|
taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);
|
||||||
|
|
||||||
|
@@ -30,10 +30,11 @@ public class ApiSearchOperator {
|
|||||||
|
|
||||||
public ApiSearchResults query(String query,
|
public ApiSearchResults query(String query,
|
||||||
int count,
|
int count,
|
||||||
|
int domainCount,
|
||||||
int index,
|
int index,
|
||||||
NsfwFilterTier filterTier)
|
NsfwFilterTier filterTier)
|
||||||
{
|
{
|
||||||
var rsp = queryClient.search(createParams(query, count, index, filterTier));
|
var rsp = queryClient.search(createParams(query, count, domainCount, index, filterTier));
|
||||||
|
|
||||||
return new ApiSearchResults("RESTRICTED", query,
|
return new ApiSearchResults("RESTRICTED", query,
|
||||||
rsp.results()
|
rsp.results()
|
||||||
@@ -44,13 +45,13 @@ public class ApiSearchOperator {
|
|||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private QueryParams createParams(String query, int count, int index, NsfwFilterTier filterTirer) {
|
private QueryParams createParams(String query, int count, int domainCount, int index, NsfwFilterTier filterTirer) {
|
||||||
SearchSetIdentifier searchSet = selectSearchSet(index);
|
SearchSetIdentifier searchSet = selectSearchSet(index);
|
||||||
|
|
||||||
return new QueryParams(
|
return new QueryParams(
|
||||||
query,
|
query,
|
||||||
RpcQueryLimits.newBuilder()
|
RpcQueryLimits.newBuilder()
|
||||||
.setResultsByDomain(2)
|
.setResultsByDomain(Math.clamp(domainCount, 1, 100))
|
||||||
.setResultsTotal(Math.min(100, count))
|
.setResultsTotal(Math.min(100, count))
|
||||||
.setTimeoutMs(150)
|
.setTimeoutMs(150)
|
||||||
.setFetchSize(8192)
|
.setFetchSize(8192)
|
||||||
|
@@ -119,6 +119,7 @@ public class ApiService extends SparkService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int count = intParam(request, "count", 20);
|
int count = intParam(request, "count", 20);
|
||||||
|
int domainCount = intParam(request, "dc", 2);
|
||||||
int index = intParam(request, "index", 3);
|
int index = intParam(request, "index", 3);
|
||||||
int nsfw = intParam(request, "nsfw", 1);
|
int nsfw = intParam(request, "nsfw", 1);
|
||||||
|
|
||||||
@@ -137,7 +138,7 @@ public class ApiService extends SparkService {
|
|||||||
.labels(license.key)
|
.labels(license.key)
|
||||||
.time(() ->
|
.time(() ->
|
||||||
searchOperator
|
searchOperator
|
||||||
.query(query, count, index, nsfwFilterTier)
|
.query(query, count, domainCount, index, nsfwFilterTier)
|
||||||
.withLicense(license.getLicense())
|
.withLicense(license.getLicense())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
|
|||||||
{
|
{
|
||||||
bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
|
bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
|
||||||
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
|
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
|
||||||
bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
|
bangsToPattern.put("!w", "https://old-search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
|
|||||||
{
|
{
|
||||||
bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
|
bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
|
||||||
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
|
bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
|
||||||
bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
|
bangsToPattern.put("!w", "/search?query=%s+site:en.wikipedia.org");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -34,7 +34,7 @@ public class BangCommand implements SearchCommandInterface {
|
|||||||
|
|
||||||
if (match.isPresent()) {
|
if (match.isPresent()) {
|
||||||
var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8));
|
var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8));
|
||||||
new MapModelAndView("redirect.jte", Map.of("url", url));
|
return Optional.of(new MapModelAndView("redirect.jte", Map.of("url", url)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -0,0 +1,19 @@
|
|||||||
|
package nu.marginalia.search.command.commands;
|
||||||
|
|
||||||
|
import nu.marginalia.WebsiteUrl;
|
||||||
|
import nu.marginalia.search.command.SearchParameters;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
class BangCommandTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testWikipediaRedirect() {
|
||||||
|
BangCommand bc = new BangCommand();
|
||||||
|
|
||||||
|
assertTrue(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "!w plato", 1)).isPresent());
|
||||||
|
assertFalse(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "plato", 1)).isPresent());
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user