Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git (synced 2025-10-06 07:32:38 +02:00)

Compare commits: deploy-026 ... deploy-027 (22 commits)
Commits (SHA1):

52582a6d7d
ec0e39ad32
6a15aee4b0
bd5111e8a2
1ecbeb0272
390f053406
b03c43224c
9b4ce9e9eb
81ac02a695
47f624fb3b
c866f19cbb
518278493b
1ac0bab0b8
08b45ed10a
f2cfb91973
2f79524eb3
3b00142c96
294ab19177
6f1659ecb2
982dcb28f0
fc686d8b2e
69ef0f334a
@@ -48,10 +48,6 @@ filter for any API consumer.
 
 I've talked to the stract dev and he does not think it's a good idea to mimic their optics language, which is quite ad-hoc, but instead to work together to find some new common description language for this.
 
-## Show favicons next to search results
-
-This is expected from search engines. Basic proof of concept sketch of fetching this data has been done, but the feature is some way from being reality.
-
 ## Specialized crawler for github
 
 One of the search engine's biggest limitations right now is that it does not index github at all. A specialized crawler that fetches at least the readme.md would go a long way toward providing search capabilities in this domain.
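An aside on the readme.md item above: a minimal, hypothetical sketch of such a fetch, not part of this diff. The raw.githubusercontent.com URL layout and the hard-coded branch name are assumptions, and a real crawler would also need branch fallback, rate limiting, and robots handling.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Hypothetical sketch: fetch one repository's README from the raw content host.
    class ReadmeFetchSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://raw.githubusercontent.com/MarginaliaSearch/MarginaliaSearch/master/README.md"))
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 200) {
                // hand response.body() to the converter pipeline
                System.out.println(response.body().length() + " bytes of README");
            }
        }
    }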
@@ -66,6 +62,10 @@ The documents database probably should have some sort of flag indicating it's a
 
 PDF parsing is known to be a bit of a security liability so some thought needs to be put in
 that direction as well.
 
+## Show favicons next to search results (COMPLETED 2025-03)
+
+This is expected from search engines. Basic proof of concept sketch of fetching this data has been done, but the feature is some way from being reality.
+
 ## Web Design Overhaul (COMPLETED 2025-01)
 
 The design is kinda clunky and hard to maintain, and needlessly outdated-looking.
@@ -13,6 +13,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.util.NamedExecutorFactory;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 
 @Singleton
@@ -20,10 +21,15 @@ public class GrpcChannelPoolFactory {
 
     private final NodeConfigurationWatcher nodeConfigurationWatcher;
     private final ServiceRegistryIf serviceRegistryIf;
-    private static final Executor executor = NamedExecutorFactory.createFixed("gRPC-Channel-Pool",
-            Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
-    private static final Executor offloadExecutor = NamedExecutorFactory.createFixed("gRPC-Offload-Pool",
-            Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
+
+    private static final boolean useLoom = Boolean.getBoolean("system.experimentalUseLoom");
+
+    private static final Executor executor = useLoom
+            ? Executors.newVirtualThreadPerTaskExecutor()
+            : NamedExecutorFactory.createFixed("gRPC-Channel-Pool", Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
+    private static final Executor offloadExecutor = useLoom
+            ? Executors.newVirtualThreadPerTaskExecutor()
+            : NamedExecutorFactory.createFixed("gRPC-Offload-Pool", Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32));
 
     @Inject
     public GrpcChannelPoolFactory(NodeConfigurationWatcher nodeConfigurationWatcher,
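The pattern in this hunk: a JVM system property selects between Loom virtual threads and a bounded platform-thread pool. A self-contained sketch of the same selection, substituting the JDK's plain newFixedThreadPool for Marginalia's internal NamedExecutorFactory; run with -Dsystem.experimentalUseLoom=true to take the virtual-thread path (Boolean.getBoolean returns false when the property is absent).

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    // Sketch of the flag-gated executor selection above.
    class ExecutorSelectionSketch {
        private static final boolean useLoom = Boolean.getBoolean("system.experimentalUseLoom");

        // Pool size clamped to [2, 32] regardless of core count, as in the diff.
        private static final int poolSize =
                Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 32);

        static Executor channelPoolExecutor() {
            return useLoom
                    ? Executors.newVirtualThreadPerTaskExecutor()
                    : Executors.newFixedThreadPool(poolSize);
        }
    }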
@@ -13,9 +13,14 @@ import nu.marginalia.util.NamedExecutorFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class GrpcServer {
     private final Server server;
 
+    private static final boolean useLoom = Boolean.getBoolean("system.experimentalUseLoom");
+
     public GrpcServer(ServiceConfiguration config,
                       ServiceRegistryIf serviceRegistry,
                       ServicePartition partition,
@@ -26,8 +31,13 @@ public class GrpcServer {
         int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16);
 
         // Start the gRPC server
+
+        ExecutorService workExecutor = useLoom ?
+                Executors.newVirtualThreadPerTaskExecutor() :
+                NamedExecutorFactory.createFixed("nettyExecutor", nThreads);
+
         var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port))
-                .executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads))
+                .executor(workExecutor)
                .workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads)))
                .bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads)))
                .channelType(NioServerSocketChannel.class);
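Note the asymmetry in the server-side hunk: when useLoom is set, only the request-work executor moves to virtual threads, while the Netty boss and worker NioEventLoopGroups keep their fixed platform-thread pools; event-loop threads are long-lived and presumably gain nothing from Loom.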
@@ -7,6 +7,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </Console>
     <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -23,6 +24,7 @@
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="PROCESS" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
         <SizeBasedTriggeringPolicy size="10MB" />
     </RollingFile>
@@ -36,6 +38,16 @@
             <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
         </Filters>
     </RollingFile>
+    <RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
+                 ignoreExceptions="false">
+        <PatternLayout>
+            <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
+        </PatternLayout>
+        <SizeBasedTriggeringPolicy size="100MB" />
+        <Filters>
+            <MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
+        </Filters>
+    </RollingFile>
 </Appenders>
 <Loggers>
     <Logger name="org.apache.zookeeper" level="WARN" />
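For orientation, a sketch of the producer side of these filters, assuming the converter logs through slf4j markers the way ProcessSpawnerService further down in this diff does with its "PROCESS" marker: events tagged CONVERTER are denied on every console appender and collected only in the new converter-audit file.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Marker;
    import org.slf4j.MarkerFactory;

    // Sketch: anything logged with the CONVERTER marker bypasses the consoles
    // (DENY) and lands in the dedicated audit appender (ALLOW).
    class ConverterAuditSketch {
        private static final Logger logger = LoggerFactory.getLogger(ConverterAuditSketch.class);
        private static final Marker converterMarker = MarkerFactory.getMarker("CONVERTER");

        void recordConversion(String url) {
            logger.info(converterMarker, "Converted {}", url);
        }
    }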
@@ -8,6 +8,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </Console>
     <Console name="ConsoleWarn" target="SYSTEM_OUT">
@@ -18,6 +19,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </Console>
     <Console name="ConsoleError" target="SYSTEM_OUT">
@@ -28,6 +30,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </Console>
     <Console name="ConsoleFatal" target="SYSTEM_OUT">
@@ -38,6 +41,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </Console>
     <Console name="ProcessConsole" target="SYSTEM_OUT">
@@ -57,6 +61,7 @@
             <MarkerFilter marker="QUERY" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="HTTP" onMatch="DENY" onMismatch="NEUTRAL" />
             <MarkerFilter marker="CRAWLER" onMatch="DENY" onMismatch="NEUTRAL" />
+            <MarkerFilter marker="CONVERTER" onMatch="DENY" onMismatch="NEUTRAL" />
         </Filters>
     </RollingFile>
     <RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/crawler-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/crawler-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
@@ -69,6 +74,16 @@
             <MarkerFilter marker="CRAWLER" onMatch="ALLOW" onMismatch="DENY" />
         </Filters>
     </RollingFile>
+    <RollingFile name="LogToFile" fileName="${env:WMSA_LOG_DIR:-/var/log/wmsa}/converter-audit-${env:WMSA_SERVICE_NODE:-0}.log" filePattern="/var/log/wmsa/converter-audit-${env:WMSA_SERVICE_NODE:-0}-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz"
+                 ignoreExceptions="false">
+        <PatternLayout>
+            <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS}: %msg{nolookups}%n</Pattern>
+        </PatternLayout>
+        <SizeBasedTriggeringPolicy size="100MB" />
+        <Filters>
+            <MarkerFilter marker="CONVERTER" onMatch="ALLOW" onMismatch="DENY" />
+        </Filters>
+    </RollingFile>
 </Appenders>
 <Loggers>
     <Logger name="org.apache.zookeeper" level="WARN" />
@@ -9,6 +9,7 @@ import nu.marginalia.executor.storage.FileStorageFile;
 import nu.marginalia.executor.upload.UploadDirContents;
 import nu.marginalia.executor.upload.UploadDirItem;
 import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.service.ServiceId;
 import nu.marginalia.service.client.GrpcChannelPoolFactory;
 import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
@@ -25,27 +26,37 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;
 
 import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
 
 @Singleton
 public class ExecutorClient {
+    private final MqPersistence persistence;
     private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
     private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
     private final ServiceRegistryIf registry;
 
     @Inject
     public ExecutorClient(ServiceRegistryIf registry,
+                          MqPersistence persistence,
                           GrpcChannelPoolFactory grpcChannelPoolFactory)
     {
         this.registry = registry;
+        this.persistence = persistence;
         this.channelPool = grpcChannelPoolFactory
                 .createMulti(
                         ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
                         ExecutorApiGrpc::newBlockingStub);
     }
 
+    private long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
+        return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl);
+    }
+
+
     public void startFsm(int node, String actorName) {
         channelPool.call(ExecutorApiBlockingStub::startFsm)
                 .forNode(node)
@@ -96,6 +107,16 @@ public class ExecutorClient {
                 .build());
     }
 
+    public long updateNsfwFilters() throws Exception {
+        long msgId = createTrackingTokenMsg("nsfw-filters", 1, Duration.ofHours(6));
+
+        channelPool.call(ExecutorApiBlockingStub::updateNsfwFilters)
+                .forNode(1)
+                .run(RpcUpdateNsfwFilters.newBuilder().setMsgId(msgId).build());
+
+        return msgId;
+    }
+
     public ActorRunStates getActorStates(int node) {
         try {
             var rsp = channelPool.call(ExecutorApiBlockingStub::getActorStates)
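The round trip for the new updateNsfwFilters call, as it reads from these hunks: the client first writes a tracking message to the task-tracking[1] inbox and gets back its msgId, then passes that id along in the RPC; on the executor side the id is handed to UpdateNsfwFiltersActor (further down in this diff), which flips the message to OK or ERR when the filter update finishes, so callers can poll the message queue for completion.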
@@ -18,6 +18,8 @@ service ExecutorApi {
   rpc calculateAdjacencies(Empty) returns (Empty) {}
   rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
 
+  rpc updateNsfwFilters(RpcUpdateNsfwFilters) returns (Empty) {}
+
   rpc restartExecutorService(Empty) returns (Empty) {}
 }
 
@@ -66,6 +68,9 @@ message RpcExportRequest {
   int64 fileStorageId = 1;
   int64 msgId = 2;
 }
+message RpcUpdateNsfwFilters {
+  int64 msgId = 1;
+}
 message RpcFileStorageIdWithDomainName {
   int64 fileStorageId = 1;
   string targetDomainName = 2;
@@ -2,10 +2,11 @@ package nu.marginalia.actor;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.functions.execution.api.RpcFsmName;
+import nu.marginalia.functions.execution.api.RpcProcessId;
 import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.persistence.MqPersistence;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -14,18 +15,18 @@ import spark.Spark;
 @Singleton
 public class ActorApi {
     private final ExecutorActorControlService actors;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
     private final MqPersistence mqPersistence;
     private final ServiceConfiguration serviceConfiguration;
     private final Logger logger = LoggerFactory.getLogger(getClass());
     @Inject
     public ActorApi(ExecutorActorControlService actors,
-                    ProcessService processService,
+                    ProcessSpawnerService processSpawnerService,
                     MqPersistence mqPersistence,
                     ServiceConfiguration serviceConfiguration)
     {
         this.actors = actors;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.mqPersistence = mqPersistence;
         this.serviceConfiguration = serviceConfiguration;
     }
@@ -43,7 +44,7 @@ public class ActorApi {
     }
 
     public Object stopProcess(RpcProcessId processId) {
-        ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId());
+        ProcessSpawnerService.ProcessId id = ProcessSpawnerService.translateExternalIdBase(processId.getProcessId());
 
         try {
             String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
@@ -60,7 +61,7 @@ public class ActorApi {
                 }
 
             }
-            processService.kill(id);
+            processSpawnerService.kill(id);
         }
         catch (Exception ex) {
            logger.error("Failed to stop process {}", id, ex);
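From here on, most hunks in this compare belong to one mechanical refactor: ProcessService is renamed to ProcessSpawnerService (the class, its injected fields, constructor parameters, and the nested ProcessId enum), with no behavioral change. The non-mechanical parts are the NSFW-filter plumbing and the Loom and HttpClient changes already shown, plus a few call-site tweaks noted below.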
@@ -6,7 +6,7 @@ import java.util.Set;
 
 public enum ExecutorActor {
     PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
-    SYNC_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
+    UPDATE_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD, NodeProfile.REALTIME),
 
     CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
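Besides the rename from SYNC_NSFW_LISTS, note that UPDATE_NSFW_LISTS is registered for two additional node profiles, SIDELOAD and REALTIME, not just BATCH_CRAWL and MIXED as before.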
@@ -113,7 +113,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.UPDATE_RSS, updateRssActor);
 
         register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
-        register(ExecutorActor.SYNC_NSFW_LISTS, updateNsfwFiltersActor);
+        register(ExecutorActor.UPDATE_NSFW_LISTS, updateNsfwFiltersActor);
 
         if (serviceConfiguration.node() == 1) {
             register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
@@ -4,11 +4,14 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
-import nu.marginalia.actor.state.*;
-import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.actor.state.ActorResumeBehavior;
+import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.actor.state.Resume;
+import nu.marginalia.actor.state.Terminal;
 import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
 import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,13 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class AbstractProcessSpawnerActor extends RecordActorPrototype {
 
     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     public static final int MAX_ATTEMPTS = 3;
     private final String inboxName;
-    private final ProcessService.ProcessId processId;
+    private final ProcessSpawnerService.ProcessId processId;
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private final int node;
 
@@ -50,7 +53,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
                 for (;;) {
                     var messages = persistence.eavesdrop(inboxName, 1);
 
-                    if (messages.isEmpty() && !processService.isRunning(processId)) {
+                    if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
                         synchronized (processId) {
                             processId.wait(5000);
                         }
@@ -92,7 +95,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
                 catch (InterruptedException ex) {
                     // We get this exception when the process is cancelled by the user
 
-                    processService.kill(processId);
+                    processSpawnerService.kill(processId);
                     setCurrentMessageToDead();
 
                     yield new Aborted();
@@ -112,13 +115,13 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
     public AbstractProcessSpawnerActor(Gson gson,
                                        ServiceConfiguration configuration,
                                        MqPersistence persistence,
-                                       ProcessService processService,
+                                       ProcessSpawnerService processSpawnerService,
                                        String inboxName,
-                                       ProcessService.ProcessId processId) {
+                                       ProcessSpawnerService.ProcessId processId) {
         super(gson);
         this.node = configuration.node();
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.inboxName = inboxName + ":" + node;
         this.processId = processId;
     }
@@ -149,7 +152,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
         // Run this call in a separate thread so that this thread can be interrupted waiting for it
         executorService.submit(() -> {
             try {
-                processService.trigger(processId);
+                processSpawnerService.trigger(processId);
             } catch (Exception e) {
                 logger.warn("Error in triggering process", e);
                 error.set(true);
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -17,13 +17,13 @@ public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
     public ConverterMonitorActor(Gson gson,
                                  ServiceConfiguration configuration,
                                  MqPersistence persistence,
-                                 ProcessService processService) {
+                                 ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.CONVERTER_INBOX,
-              ProcessService.ProcessId.CONVERTER);
+              ProcessSpawnerService.ProcessId.CONVERTER);
     }
 
 }
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -16,13 +16,13 @@ public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
     public CrawlerMonitorActor(Gson gson,
                               ServiceConfiguration configuration,
                               MqPersistence persistence,
-                              ProcessService processService) {
+                              ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.CRAWLER_INBOX,
-              ProcessService.ProcessId.CRAWLER);
+              ProcessSpawnerService.ProcessId.CRAWLER);
     }
 
 }
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -16,13 +16,13 @@ public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
     public ExportTaskMonitorActor(Gson gson,
                                  ServiceConfiguration configuration,
                                  MqPersistence persistence,
-                                 ProcessService processService) {
+                                 ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.EXPORT_TASK_INBOX,
-              ProcessService.ProcessId.EXPORT_TASKS);
+              ProcessSpawnerService.ProcessId.EXPORT_TASKS);
     }
 
 }
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -17,13 +17,13 @@ public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
     public IndexConstructorMonitorActor(Gson gson,
                                         ServiceConfiguration configuration,
                                         MqPersistence persistence,
-                                        ProcessService processService) {
+                                        ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
-              ProcessService.ProcessId.INDEX_CONSTRUCTOR);
+              ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR);
     }
 
 }
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -16,13 +16,13 @@ public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
     public LiveCrawlerMonitorActor(Gson gson,
                                   ServiceConfiguration configuration,
                                   MqPersistence persistence,
-                                  ProcessService processService) {
+                                  ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.LIVE_CRAWLER_INBOX,
-              ProcessService.ProcessId.LIVE_CRAWLER);
+              ProcessSpawnerService.ProcessId.LIVE_CRAWLER);
     }
 
 }
@@ -4,9 +4,9 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -17,13 +17,13 @@ public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
     public LoaderMonitorActor(Gson gson,
                               ServiceConfiguration configuration,
                               MqPersistence persistence,
-                              ProcessService processService) {
+                              ProcessSpawnerService processSpawnerService) {
 
         super(gson,
               configuration,
-              persistence, processService,
+              persistence, processSpawnerService,
               ProcessInboxNames.LOADER_INBOX,
-              ProcessService.ProcessId.LOADER);
+              ProcessSpawnerService.ProcessId.LOADER);
     }
 
 }
@@ -6,7 +6,7 @@ import com.google.inject.Singleton;
 import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 @Singleton
@@ -16,13 +16,13 @@ public class NdpMonitorActor extends AbstractProcessSpawnerActor {
     public NdpMonitorActor(Gson gson,
                            ServiceConfiguration configuration,
                            MqPersistence persistence,
-                           ProcessService processService) {
+                           ProcessSpawnerService processSpawnerService) {
         super(gson,
               configuration,
               persistence,
-              processService,
+              processSpawnerService,
               ProcessInboxNames.NDP_INBOX,
-              ProcessService.ProcessId.NDP);
+              ProcessSpawnerService.ProcessId.NDP);
     }
 
 }
@@ -13,7 +13,7 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.ProcessInboxNames;
 import nu.marginalia.mqapi.ping.PingRequest;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,17 +25,21 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
+// Unlike other monitor actors, the ping monitor will not merely wait for a request
+// to be sent, but send one itself, hence we can't extend AbstractProcessSpawnerActor
+// but have to reimplement a lot of the same logic ourselves.
 @Singleton
 public class PingMonitorActor extends RecordActorPrototype {
 
     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     public static final int MAX_ATTEMPTS = 3;
     private final String inboxName;
-    private final ProcessService.ProcessId processId;
+    private final ProcessSpawnerService.ProcessId processId;
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private final int node;
     private final Gson gson;
@@ -53,7 +57,6 @@ public class PingMonitorActor extends RecordActorPrototype {
         return switch (self) {
             case Initial i -> {
                 PingRequest request = new PingRequest();
-
                 persistence.sendNewMessage(inboxName, null, null,
                         "PingRequest",
                         gson.toJson(request),
@@ -65,7 +68,7 @@ public class PingMonitorActor extends RecordActorPrototype {
                 for (;;) {
                     var messages = persistence.eavesdrop(inboxName, 1);
 
-                    if (messages.isEmpty() && !processService.isRunning(processId)) {
+                    if (messages.isEmpty() && !processSpawnerService.isRunning(processId)) {
                         synchronized (processId) {
                             processId.wait(5000);
                         }
@@ -107,7 +110,7 @@ public class PingMonitorActor extends RecordActorPrototype {
                 catch (InterruptedException ex) {
                     // We get this exception when the process is cancelled by the user
 
-                    processService.kill(processId);
+                    processSpawnerService.kill(processId);
                     setCurrentMessageToDead();
 
                     yield new Aborted();
@@ -127,14 +130,14 @@ public class PingMonitorActor extends RecordActorPrototype {
     public PingMonitorActor(Gson gson,
                             ServiceConfiguration configuration,
                             MqPersistence persistence,
-                            ProcessService processService) throws SQLException {
+                            ProcessSpawnerService processSpawnerService) throws SQLException {
         super(gson);
         this.gson = gson;
         this.node = configuration.node();
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
-        this.processId = ProcessService.ProcessId.PING;
+        this.processId = ProcessSpawnerService.ProcessId.PING;
     }
 
     /** Sets the message to dead in the database to avoid
@@ -163,7 +166,7 @@ public class PingMonitorActor extends RecordActorPrototype {
         // Run this call in a separate thread so that this thread can be interrupted waiting for it
         executorService.submit(() -> {
             try {
-                processService.trigger(processId);
+                processSpawnerService.trigger(processId);
             } catch (Exception e) {
                 logger.warn("Error in triggering process", e);
                 error.set(true);
@@ -8,7 +8,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorResumeBehavior;
 import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.actor.state.Resume;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.control.ServiceEventLog;
 import nu.marginalia.service.module.ServiceConfiguration;
 
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
 public class ProcessLivenessMonitorActor extends RecordActorPrototype {
 
     private final ServiceEventLog eventLogService;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
     private final HikariDataSource dataSource;
 
     private final int node;
@@ -49,7 +49,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
             var processId = heartbeat.getProcessId();
             if (null == processId) continue;
 
-            if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
+            if (processSpawnerService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
                 continue;
 
             flagProcessAsStopped(heartbeat);
@@ -72,12 +72,12 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
     public ProcessLivenessMonitorActor(Gson gson,
                                        ServiceEventLog eventLogService,
                                        ServiceConfiguration configuration,
-                                       ProcessService processService,
+                                       ProcessSpawnerService processSpawnerService,
                                        HikariDataSource dataSource) {
         super(gson);
         this.node = configuration.node();
         this.eventLogService = eventLogService;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
         this.dataSource = dataSource;
     }
 
@@ -208,8 +208,8 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
         public boolean isRunning() {
             return "RUNNING".equals(status);
         }
-        public ProcessService.ProcessId getProcessId() {
-            return ProcessService.translateExternalIdBase(processBase);
+        public ProcessSpawnerService.ProcessId getProcessId() {
+            return ProcessSpawnerService.translateExternalIdBase(processBase);
         }
     }
@@ -47,6 +47,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
 
     private final Path feedPath = WmsaHome.getHomePath().resolve("data/scrape-urls.txt");
 
+    private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");
+
     public record Initial() implements ActorStep {}
     @Resume(behavior = ActorResumeBehavior.RETRY)
     public record Wait(String ts) implements ActorStep {}
@@ -57,6 +59,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
     public ActorStep transition(ActorStep self) throws Exception {
         return switch(self) {
             case Initial() -> {
+                if (!insertFoundDomains) yield new Error("Domain insertion prohibited, aborting");
+
                 if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
                     yield new Error("Invalid node profile for RSS update");
                 }
@@ -3,11 +3,11 @@ package nu.marginalia.actor.task;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.state.ActorControlFlowException;
-import nu.marginalia.mq.MqMessageState;
-import nu.marginalia.mq.persistence.MqPersistence;
-import nu.marginalia.process.ProcessService;
 import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.process.ProcessSpawnerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -20,13 +20,13 @@ public class ActorProcessWatcher {
 
     private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);
     private final MqPersistence persistence;
-    private final ProcessService processService;
+    private final ProcessSpawnerService processSpawnerService;
 
     @Inject
     public ActorProcessWatcher(MqPersistence persistence,
-                               ProcessService processService) {
+                               ProcessSpawnerService processSpawnerService) {
         this.persistence = persistence;
-        this.processService = processService;
+        this.processSpawnerService = processSpawnerService;
     }
 
     /** Wait for a process to start, and then wait for a response from the process,
@@ -36,7 +36,7 @@ public class ActorProcessWatcher {
      * <p>
      * When interrupted, the process is killed and the message is marked as dead.
      */
-    public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId)
+    public MqMessage waitResponse(MqOutbox outbox, ProcessSpawnerService.ProcessId processId, long msgId)
             throws ActorControlFlowException, InterruptedException, SQLException
     {
         // enums values only have a single instance,
@@ -65,7 +65,7 @@ public class ActorProcessWatcher {
             // This will prevent the monitor process from attempting to respawn the process as we kill it
 
             outbox.flagAsDead(msgId);
-            processService.kill(processId);
+            processSpawnerService.kill(processId);
 
             logger.info("Process {} killed due to interrupt", processId);
         }
@@ -94,12 +94,12 @@ public class ActorProcessWatcher {
     }
 
     /** Wait the specified time for the specified process to start running (does not start the process) */
-    private boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
+    private boolean waitForProcess(ProcessSpawnerService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
 
         // Wait for process to start
         long deadline = System.currentTimeMillis() + unit.toMillis(duration);
         while (System.currentTimeMillis() < deadline) {
-            if (processService.isRunning(processId))
+            if (processSpawnerService.isRunning(processId))
                 return true;
 
             TimeUnit.MILLISECONDS.sleep(100);
@@ -12,7 +12,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.sideload.RedditSideloadHelper;
 import nu.marginalia.sideload.SideloadHelper;
 import nu.marginalia.sideload.StackExchangeSideloadHelper;
@@ -218,7 +218,7 @@ public class ConvertActor extends RecordActorPrototype {
                 );
             }
             case ConvertWait(FileStorageId destFid, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
+                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Converter failed");
@@ -18,7 +18,7 @@ import nu.marginalia.mqapi.index.IndexName;
 import nu.marginalia.mqapi.loading.LoadRequest;
 import nu.marginalia.nodecfg.NodeConfigurationService;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.service.module.ServiceConfiguration;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
@@ -95,7 +95,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
                 new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
             case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
+                var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessSpawnerService.ProcessId.CONVERTER, msgId);
 
                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Converter failed");
@@ -129,7 +129,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
                 yield new Load(processedIds, id);
             }
             case Load(List<FileStorageId> processedIds, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, msgId);
+                var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessSpawnerService.ProcessId.LOADER, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Loader failed");
@@ -165,7 +165,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
             case ReindexFwd(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
 
                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Forward index construction failed");
@@ -174,7 +174,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
             case ReindexFull(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
 
                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Full index construction failed");
@@ -183,7 +183,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
             }
             case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
             case ReindexPrio(long id) -> {
-                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
+                var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessSpawnerService.ProcessId.INDEX_CONSTRUCTOR, id);
 
                 if (rsp.state() != MqMessageState.OK)
                     yield new Error("Prio index construction failed");
@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.CrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageType;
@@ -76,7 +76,7 @@ public class CrawlActor extends RecordActorPrototype {
             case Crawl (long msgId, FileStorageId fid, boolean cascadeLoad) -> {
                 var rsp = processWatcher.waitResponse(
                         mqCrawlerOutbox,
-                        ProcessService.ProcessId.CRAWLER,
+                        ProcessSpawnerService.ProcessId.CRAWLER,
                         msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -55,7 +55,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -54,7 +54,7 @@ public class ExportFeedsActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);
@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
                 yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
             }
             case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);
@@ -10,7 +10,7 @@ import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageState;
@@ -52,7 +52,7 @@ public class ExportTermFreqActor extends RecordActorPrototype {
                 yield new Run(responseMsgId, crawlId, destId, newMsgId);
             }
             case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     storageService.flagFileForDeletion(destId);
@@ -13,7 +13,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -44,7 +44,6 @@ public class LiveCrawlActor extends RecordActorPrototype {
 
     @Override
     public ActorStep transition(ActorStep self) throws Exception {
-        logger.info("{}", self);
         return switch (self) {
             case Initial() -> {
                 yield new Monitor("-");
@@ -75,7 +74,7 @@ public class LiveCrawlActor extends RecordActorPrototype {
                 yield new LiveCrawl(feedsHash, id);
             }
             case LiveCrawl(String feedsHash, long msgId) -> {
-                var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
+                var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessSpawnerService.ProcessId.LIVE_CRAWLER, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Crawler failed");
@@ -11,7 +11,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.CrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.storage.model.FileStorageType;
@@ -51,7 +51,7 @@ public class RecrawlSingleDomainActor extends RecordActorPrototype {
             case Crawl (long msgId) -> {
                 var rsp = processWatcher.waitResponse(
                         mqCrawlerOutbox,
-                        ProcessService.ProcessId.CRAWLER,
+                        ProcessSpawnerService.ProcessId.CRAWLER,
                         msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
@@ -9,7 +9,7 @@ import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.tasks.ExportTaskRequest;
 import nu.marginalia.process.ProcessOutboxes;
-import nu.marginalia.process.ProcessService;
+import nu.marginalia.process.ProcessSpawnerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +34,7 @@ public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
                 yield new Run(newMsgId);
             }
            case Run(long msgId) -> {
-                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
+                var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessSpawnerService.ProcessId.EXPORT_TASKS, msgId);
 
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Exporter failed");
@@ -5,6 +5,8 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqPersistence;
 import nu.marginalia.nsfw.NsfwDomainFilter;
 import nu.marginalia.service.module.ServiceConfiguration;
 
@@ -12,23 +14,26 @@ import nu.marginalia.service.module.ServiceConfiguration;
 public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     private final ServiceConfiguration serviceConfiguration;
     private final NsfwDomainFilter nsfwDomainFilter;
+    private final MqPersistence persistence;
 
-    public record Initial() implements ActorStep {}
-    public record Run() implements ActorStep {}
+    public record Initial(long respondMsgId) implements ActorStep {}
+    public record Run(long respondMsgId) implements ActorStep {}
 
     @Override
     public ActorStep transition(ActorStep self) throws Exception {
         return switch(self) {
-            case Initial() -> {
+            case Initial(long respondMsgId) -> {
                 if (serviceConfiguration.node() != 1) {
+                    persistence.updateMessageState(respondMsgId, MqMessageState.ERR);
                     yield new Error("This actor can only run on node 1");
                 }
                 else {
-                    yield new Run();
+                    yield new Run(respondMsgId);
                 }
             }
-            case Run() -> {
+            case Run(long respondMsgId) -> {
                 nsfwDomainFilter.fetchLists();
+                persistence.updateMessageState(respondMsgId, MqMessageState.OK);
                 yield new End();
             }
             default -> new Error();
@@ -43,11 +48,13 @@ public class UpdateNsfwFiltersActor extends RecordActorPrototype {
     @Inject
     public UpdateNsfwFiltersActor(Gson gson,
                                   ServiceConfiguration serviceConfiguration,
-                                  NsfwDomainFilter nsfwDomainFilter)
+                                  NsfwDomainFilter nsfwDomainFilter,
+                                  MqPersistence persistence)
     {
         super(gson);
         this.serviceConfiguration = serviceConfiguration;
         this.nsfwDomainFilter = nsfwDomainFilter;
+        this.persistence = persistence;
     }
 
 }
@@ -10,6 +10,7 @@ import nu.marginalia.actor.state.ActorStateInstance;
 import nu.marginalia.actor.task.DownloadSampleActor;
 import nu.marginalia.actor.task.RestoreBackupActor;
 import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
+import nu.marginalia.actor.task.UpdateNsfwFiltersActor;
 import nu.marginalia.functions.execution.api.*;
 import nu.marginalia.service.module.ServiceConfiguration;
 import nu.marginalia.service.server.DiscoverableService;
@@ -263,4 +264,19 @@ public class ExecutorGrpcService
         System.exit(0);
     }
 
+    @Override
+    public void updateNsfwFilters(RpcUpdateNsfwFilters request, StreamObserver<Empty> responseObserver) {
+        logger.info("Got request {}", request);
+        try {
+            actorControlService.startFrom(ExecutorActor.UPDATE_NSFW_LISTS,
+                    new UpdateNsfwFiltersActor.Initial(request.getMsgId()));
+
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+        }
+        catch (Exception e) {
+            logger.error("Failed to update nsfw filters", e);
+            responseObserver.onError(e);
+        }
+    }
 }
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Singleton
-public class ProcessService {
+public class ProcessSpawnerService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
 
@@ -88,7 +88,7 @@ public class ProcessService {
     }
 
     @Inject
-    public ProcessService(BaseServiceParams params) {
+    public ProcessSpawnerService(BaseServiceParams params) {
         this.eventLog = params.eventLog;
         this.node = params.configuration.node();
     }
@@ -11,6 +11,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.module.ServiceConfiguration;
 
 import javax.annotation.CheckReturnValue;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +60,11 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }
 
+    public boolean waitReady(Duration duration) throws InterruptedException {
+        return channelPool.awaitChannel(duration);
+    }
+
+
     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
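A hypothetical call site for the new waitReady method (the feedsClient variable and the one-minute timeout are illustrative, not from the diff):

    // Block briefly for a live channel to the feeds service before issuing
    // RPCs; waitReady returns false if no channel came up within the timeout.
    if (!feedsClient.waitReady(Duration.ofMinutes(1))) {
        throw new IllegalStateException("feeds service not reachable");
    }
    String feedsHash = feedsClient.getFeedDataHash();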
@@ -35,6 +35,7 @@ dependencies {
     implementation libs.bundles.slf4j
     implementation libs.commons.lang3
     implementation libs.commons.io
+    implementation libs.httpclient
     implementation libs.wiremock
 
     implementation libs.prometheus
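This dependency change pairs with the FeedFetcherService hunks below, which drop the JDK's java.net.http client in favor of Apache HttpClient 5 (org.apache.hc.client5) for pooled connections, per-route limits, and finer timeout control.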
@@ -20,19 +20,36 @@ import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageType;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.SQLException;
import java.time.*;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
@@ -55,6 +72,8 @@ public class FeedFetcherService {

private final DomainCoordinator domainCoordinator;

private final HttpClient httpClient;

private volatile boolean updating;

@Inject
@@ -71,6 +90,83 @@ public class FeedFetcherService {
this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient;
this.domainCoordinator = domainCoordinator;

final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();


var connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.build();

connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10))
.build()
);

Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.IGNORE)
.setResponseTimeout(10, TimeUnit.SECONDS)
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
.build();

httpClient = HttpClients.custom()
.setDefaultRequestConfig(defaultRequestConfig)
.setConnectionManager(connectionManager)
.setUserAgent(WmsaHome.getUserAgent().uaIdentifier())
.setConnectionManager(connectionManager)
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
// Default keep-alive duration is 3 minutes, but this is too long for us,
// as we are either going to re-use it fairly quickly or close it for a long time.
//
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);

@Override
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);

while (it.hasNext()) {
final HeaderElement he = it.next();
final String param = he.getName();
final String value = he.getValue();

if (value == null)
continue;
if (!"timeout".equalsIgnoreCase(param))
continue;

try {
long timeout = Long.parseLong(value);
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
return TimeValue.ofSeconds(timeout);
} catch (final NumberFormatException ignore) {
break;
}
}
return defaultValue;
}
})
.build();

}

public enum UpdateMode {
@@ -86,13 +182,7 @@ public class FeedFetcherService {


try (FeedDbWriter writer = feedDb.createWriter();
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(15))
.executor(Executors.newCachedThreadPool())
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_2)
.build();
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
ExecutorService fetchExecutor = Executors.newVirtualThreadPerTaskExecutor();
FeedJournal feedJournal = FeedJournal.create();
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
) {
@@ -137,7 +227,8 @@ public class FeedFetcherService {

FetchResult feedData;
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
feedData = fetchFeedData(feed, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
TimeUnit.SECONDS.sleep(1); // Sleep before we yield the lock to avoid hammering the server from multiple processes
} catch (Exception ex) {
feedData = new FetchResult.TransientError();
}
@@ -216,7 +307,6 @@ public class FeedFetcherService {
}

private FetchResult fetchFeedData(FeedDefinition feed,
HttpClient client,
ExecutorService executorService,
@Nullable String ifModifiedSinceDate,
@Nullable String ifNoneMatchTag)
@@ -224,59 +314,63 @@ public class FeedFetcherService {
try {
URI uri = new URI(feed.feedUrl());

HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.GET()
.uri(uri)
.header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
.header("Accept-Encoding", "gzip")
.header("Accept", "text/*, */*;q=0.9")
.timeout(Duration.ofSeconds(15))
;
var requestBuilder = ClassicRequestBuilder.get(uri)
.setHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
.setHeader("Accept-Encoding", "gzip")
.setHeader("Accept", "text/*, */*;q=0.9");

// Set the If-Modified-Since or If-None-Match headers if we have them
// though since there are certain idiosyncrasies in server implementations,
// we avoid setting both at the same time as that may turn a 304 into a 200.
if (ifNoneMatchTag != null) {
requestBuilder.header("If-None-Match", ifNoneMatchTag);
requestBuilder.addHeader("If-None-Match", ifNoneMatchTag);
} else if (ifModifiedSinceDate != null) {
requestBuilder.header("If-Modified-Since", ifModifiedSinceDate);
requestBuilder.addHeader("If-Modified-Since", ifModifiedSinceDate);
}

return httpClient.execute(requestBuilder.build(), rsp -> {
try {
logger.info("Code: {}, URL: {}", rsp.getCode(), uri);

HttpRequest getRequest = requestBuilder.build();
switch (rsp.getCode()) {
case 200 -> {
if (rsp.getEntity() == null) {
return new FetchResult.TransientError(); // No content to read, treat as transient error
}
byte[] responseData = EntityUtils.toByteArray(rsp.getEntity());

for (int i = 0; i < 3; i++) {
// Decode the response body based on the Content-Type header
Header contentTypeHeader = rsp.getFirstHeader("Content-Type");
if (contentTypeHeader == null) {
return new FetchResult.TransientError();
}
String contentType = contentTypeHeader.getValue();
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);

/* Note we need to use an executor to time-limit the send() method in HttpClient, as
* its support for timeouts only applies to the time until response starts to be received,
* and does not catch the case when the server starts to send data but then hangs.
*/
HttpResponse<byte[]> rs = executorService.submit(
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
.get(15, TimeUnit.SECONDS);
// Grab the ETag header if it exists
Header etagHeader = rsp.getFirstHeader("ETag");
String newEtagValue = etagHeader == null ? null : etagHeader.getValue();

if (rs.statusCode() == 429) { // Too Many Requests
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
continue;
}

String newEtagValue = rs.headers().firstValue("ETag").orElse("");

return switch (rs.statusCode()) {
case 200 -> {
byte[] responseData = getResponseData(rs);

String contentType = rs.headers().firstValue("Content-Type").orElse("");
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), responseData);

yield new FetchResult.Success(bodyText, newEtagValue);
return new FetchResult.Success(bodyText, newEtagValue);
}
case 304 -> {
return new FetchResult.NotModified(); // via If-Modified-Since semantics
}
case 404 -> {
return new FetchResult.PermanentError(); // never try again
}
default -> {
return new FetchResult.TransientError(); // we try again later
}
}
case 304 -> new FetchResult.NotModified(); // via If-Modified-Since semantics
case 404 -> new FetchResult.PermanentError(); // never try again
default -> new FetchResult.TransientError(); // we try again later
};
}
}
catch (Exception ex) {
return new FetchResult.PermanentError(); // treat as permanent error
}
finally {
EntityUtils.consumeQuietly(rsp.getEntity());
}
});
}
catch (Exception ex) {
logger.debug("Error fetching feed", ex);
@@ -285,19 +379,6 @@ public class FeedFetcherService {
return new FetchResult.TransientError();
}

private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
String encoding = response.headers().firstValue("Content-Encoding").orElse("");

if ("gzip".equals(encoding)) {
try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
return stream.readAllBytes();
}
}
else {
return response.body();
}
}

public sealed interface FetchResult {
record Success(String value, String etag) implements FetchResult {}
record NotModified() implements FetchResult {}

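The rewrite leans on HttpClient 5's execute(request, responseHandler) overload: the handler runs while the connection is leased, and the pooled connection is released once the handler returns, so response entities cannot leak past the callback. A minimal sketch of the pattern, with hypothetical variable names:

    // Sketch only: the handler's return value becomes execute()'s return value,
    // and the pooled connection is released when the handler returns.
    String body = httpClient.execute(ClassicRequestBuilder.get(uri).build(), rsp -> {
        try {
            return rsp.getEntity() != null ? EntityUtils.toString(rsp.getEntity()) : null;
        }
        finally {
            EntityUtils.consumeQuietly(rsp.getEntity());
        }
    });
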
@@ -5,6 +5,8 @@ import com.google.inject.Guice;
import com.google.inject.name.Names;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedItems;
@@ -82,6 +84,7 @@ class FeedFetcherServiceTest extends AbstractModule {
}

public void configure() {
bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
bind(HikariDataSource.class).toInstance(dataSource);
bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));

@@ -19,6 +19,8 @@ import nu.marginalia.model.crawldata.CrawlerDocumentStatus;
import nu.marginalia.model.idx.WordFlags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
@@ -37,6 +39,7 @@ public class DocumentProcessor {
"text/plain",
"application/pdf");

private final Marker converterAuditMarker = MarkerFactory.getMarker("CONVERTER");

private final List<AbstractDocumentProcessorPlugin> processorPlugins = new ArrayList<>();
private final AnchorTextKeywords anchorTextKeywords;
@@ -81,12 +84,13 @@ public class DocumentProcessor {
catch (DisqualifiedException ex) {
ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = ex.reason.toString();
logger.debug("Disqualified {}: {}", ret.url, ex.reason);
logger.info(converterAuditMarker, "Disqualified {}: {}", ret.url, ex.reason);
}
catch (Exception ex) {
ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
logger.info("Failed to convert " + crawledDocument.url, ex);
logger.info(converterAuditMarker, "Failed to convert {}: {}", crawledDocument.url, ex.getClass().getSimpleName());
logger.warn(converterAuditMarker, "Failed to convert " + crawledDocument.url, ex);
}

return ret;

@@ -3,7 +3,6 @@ package nu.marginalia.converting.processor.logic;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import nu.marginalia.converting.model.DisqualifiedException;
import nu.marginalia.language.model.DocumentLanguageData;

@Singleton
@@ -26,12 +25,9 @@ public class DocumentLengthLogic {
return (int) Math.round((totalWords / (double) numSentences) / 4.);
}

public void validateLength(DocumentLanguageData dld,
double modifier) throws DisqualifiedException
public boolean validateLength(DocumentLanguageData dld, double modifier)
{
if (modifier * dld.totalNumWords() < minDocumentLength) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}
return modifier * dld.totalNumWords() >= minDocumentLength;
}

}

@@ -68,6 +68,7 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
private final HtmlProcessorSpecializations htmlProcessorSpecializations;

private static final int MAX_DOCUMENT_LENGTH_BYTES = Integer.getInteger("converter.max-body-length",128_000);
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");

@Inject
public HtmlDocumentProcessorPlugin(
@@ -108,13 +109,13 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
DocumentClass documentClass)
throws DisqualifiedException, URISyntaxException, IOException {

if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
}

Document doc = crawledDocument.parseBody();

if (AcceptableAds.hasAcceptableAdsTag(doc)) {
if (!lenientProcessing && AcceptableAds.hasAcceptableAdsTag(doc)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
}

@@ -129,25 +130,27 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin

final var specialization = htmlProcessorSpecializations.select(generatorParts, url);

if (!specialization.shouldIndex(url)) {
if (!lenientProcessing && !specialization.shouldIndex(url)) {
throw new DisqualifiedException(DisqualificationReason.IRRELEVANT);
}

var prunedDoc = specialization.prune(doc);


final int length = getLength(doc);
final DocumentFormat format = getDocumentFormat(doc);
final double quality = documentValuator.getQuality(crawledDocument, format, doc, length);

if (isDisqualified(documentClass, url, quality, doc.title())) {
if (!lenientProcessing && isDisqualified(documentClass, url, quality, doc.title())) {
throw new DisqualifiedException(DisqualificationReason.QUALITY);
}

DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);

checkDocumentLanguage(dld);
documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());

if (!lenientProcessing && !documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier())) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}

var ret = new ProcessedDocumentDetails();

@@ -43,6 +43,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
private final DefaultSpecialization defaultSpecialization;

private static final Logger logger = LoggerFactory.getLogger(PdfDocumentProcessorPlugin.class);
private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");

@Inject
public PdfDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -81,7 +82,7 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin

String documentBody = crawledDocument.documentBody();

if (languageFilter.isBlockedUnicodeRange(documentBody)) {
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
}

@@ -100,7 +101,9 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin

checkDocumentLanguage(dld);

documentLengthLogic.validateLength(dld, 1.0);
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}

var ret = new ProcessedDocumentDetails();

@@ -37,6 +37,8 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
private final DocumentLengthLogic documentLengthLogic;

private static boolean lenientProcessing = Boolean.getBoolean("converter.lenientProcessing");


@Inject
public PlainTextDocumentProcessorPlugin(@Named("max-title-length") Integer maxTitleLength,
@@ -73,7 +75,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin

String documentBody = crawledDocument.documentBody();

if (languageFilter.isBlockedUnicodeRange(documentBody)) {
if (!lenientProcessing && languageFilter.isBlockedUnicodeRange(documentBody)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
}

@@ -83,7 +85,9 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin

checkDocumentLanguage(dld);

documentLengthLogic.validateLength(dld, 1.0);
if (!lenientProcessing && !documentLengthLogic.validateLength(dld, 1.0)) {
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
}

var ret = new ProcessedDocumentDetails();

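All three plugins read the same converter.lenientProcessing system property, so the language, ads, relevance, quality, and length disqualification checks can be bypassed in one go by starting the converter JVM with something like -Dconverter.lenientProcessing=true (illustrative flag placement; the actual launch scripts are outside this diff).
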
@@ -28,6 +28,8 @@ public final class CrawledDocument implements SerializableCrawlData {
@Nullable
public String headers;

private static int MAX_LENGTH_BYTES = 500_000;

public String documentBody() {
return DocumentBodyToString.getStringData(
ContentType.parse(contentType),
@@ -65,7 +67,7 @@ public final class CrawledDocument implements SerializableCrawlData {
return DocumentBodyToString.getParsedData(
ContentType.parse(contentType),
documentBodyBytes,
200_000,
MAX_LENGTH_BYTES,
url);
}

@@ -50,6 +50,7 @@ dependencies {

implementation libs.notnull
implementation libs.guava
implementation libs.httpclient
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}

@@ -15,6 +15,7 @@ import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.livecrawler.io.HttpClientProvider;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
@@ -32,12 +33,15 @@ import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import org.apache.commons.io.FileUtils;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.io.CloseMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Security;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
@@ -74,7 +78,9 @@ public class LiveCrawlerMain extends ProcessMainClass {
DomainProcessor domainProcessor,
FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
DocumentLoaderService documentLoaderService,
DomainCoordinator domainCoordinator,
HikariDataSource dataSource)
throws Exception
{
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -148,7 +154,10 @@ public class LiveCrawlerMain extends ProcessMainClass {
}

private void run() throws Exception {
Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
Path basePath = fileStorageService
.getStorageBase(FileStorageBaseType.STORAGE)
.asPath()
.resolve("live-crawl-data");

if (!Files.isDirectory(basePath)) {
Files.createDirectories(basePath);
@@ -163,21 +172,38 @@ public class LiveCrawlerMain extends ProcessMainClass {
{
final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);

/* ------------------------------------------------ */
/* Fetch the latest domains from the feeds database */
/* ------------------------------------------------ */

processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);

Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
if (!feedsClient.waitReady(Duration.ofHours(1))) {
throw new RuntimeException("Feeds client never became ready, cannot proceed with live crawling");
}
feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);

logger.info("Fetched data for {} domains", urlsPerDomain.size());


/* ------------------------------------- */
/* Prune the database from old entries */
/* ------------------------------------- */

processHeartbeat.progress(LiveCrawlState.PRUNE_DB);

// Remove data that is too old
dataSet.prune(cutoff);


/* ------------------------------------- */
/* Fetch the links for each domain */
/* ------------------------------------- */

processHeartbeat.progress(LiveCrawlState.CRAWLING);

try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
CloseableHttpClient client = HttpClientProvider.createClient();
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, client, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -190,18 +216,29 @@ public class LiveCrawlerMain extends ProcessMainClass {
fetcher.scheduleRetrieval(domain, urls);
}
}
finally {
client.close(CloseMode.GRACEFUL);
}

Path tempPath = dataSet.createWorkDir();


try {
/* ------------------------------------- */
/* Process the fetched links */
/* ------------------------------------- */

processHeartbeat.progress(LiveCrawlState.PROCESSING);

try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
var writer = new ConverterBatchWriter(tempPath, 0)
) {
// Offset the documents' ordinals toward the upper range, to avoid ID collisions with the
// main indexes (the maximum permissible doc ordinal value is 67_108_863, so this
// leaves us with a lot of headroom still)
// We need unique document ids that do not collide with the document id from the main index,
// so we offset the documents' ordinals toward the upper range.
//
// The maximum permissible doc ordinal value is 67_108_863,
// so this leaves us with a lot of headroom still!
// Expected document count here is order of 10 :^)
writer.setOrdinalOffset(67_000_000);

for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
@@ -209,10 +246,15 @@ public class LiveCrawlerMain extends ProcessMainClass {
}
}


/* ---------------------------------------------- */
/* Load the processed data into the link database */
/* and construct an index journal for the docs */
/* ---------------------------------------------- */

processHeartbeat.progress(LiveCrawlState.LOADING);

LoaderInputData lid = new LoaderInputData(tempPath, 1);

DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);

keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
@@ -224,9 +266,16 @@ public class LiveCrawlerMain extends ProcessMainClass {
FileUtils.deleteDirectory(tempPath.toFile());
}

// Construct the index

/* ------------------------------------- */
/* Finish up */
/* ------------------------------------- */

processHeartbeat.progress(LiveCrawlState.DONE);

// After we return from here, the LiveCrawlActor will trigger an index construction
// job. Unlike all the stuff we did in this process, it's identical to the real job
// so we don't need to do anything special from this process
}
}

@@ -7,7 +7,6 @@ import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
@@ -15,24 +14,21 @@ import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

/** A simple link scraper that fetches URLs and stores them in a database,
* with no concept of a crawl frontier, WARC output, or other advanced features
@@ -45,20 +41,21 @@ public class SimpleLinkScraper implements AutoCloseable {
private final LiveCrawlDataSet dataSet;
private final DbDomainQueries domainQueries;
private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainCoordinator domainCoordinator;

private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
private final HttpClient httpClient;

public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries,
HttpClient httpClient,
DomainBlacklist domainBlacklist) {
this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
this.httpClient = httpClient;
}

public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
@@ -75,17 +72,19 @@ public class SimpleLinkScraper implements AutoCloseable {

EdgeUrl rootUrl = domain.toRootUrlHttps();

List<EdgeUrl> relevantUrls = new ArrayList<>();
List<EdgeUrl> relevantUrls = new ArrayList<>(Math.max(1, urls.size()));

// Resolve absolute URLs
for (var url : urls) {
Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
if (optParsedUrl.isEmpty()) {

if (optParsedUrl.isEmpty())
continue;
}
if (dataSet.hasUrl(optParsedUrl.get())) {
continue;
}
relevantUrls.add(optParsedUrl.get());

EdgeUrl absoluteUrl = optParsedUrl.get();

if (!dataSet.hasUrl(absoluteUrl))
relevantUrls.add(absoluteUrl);
}

if (relevantUrls.isEmpty()) {
@@ -94,16 +93,10 @@ public class SimpleLinkScraper implements AutoCloseable {

int fetched = 0;

try (HttpClient client = HttpClient
.newBuilder()
.connectTimeout(connectTimeout)
.followRedirects(HttpClient.Redirect.NEVER)
.version(HttpClient.Version.HTTP_2)
.build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
try (// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLock lock = domainCoordinator.lockDomain(domain)
) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
SimpleRobotRules rules = fetchRobotsRules(rootUrl);

if (rules == null) { // I/O error fetching robots.txt
// If we can't fetch the robots.txt,
@@ -116,18 +109,19 @@ public class SimpleLinkScraper implements AutoCloseable {
CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());

for (var parsedUrl : relevantUrls) {

if (!rules.isAllowed(parsedUrl.toString())) {
maybeFlagAsBad(parsedUrl);
continue;
}

switch (fetchUrl(domainId, parsedUrl, timer, client)) {
switch (fetchUrl(domainId, parsedUrl, timer)) {
case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers) -> {
dataSet.saveDocument(id, docUrl, body, headers, "");
fetched++;
}
case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
case FetchResult.Error(EdgeUrl docUrl) -> {
maybeFlagAsBad(docUrl);
}
}
}
}
@@ -150,111 +144,107 @@ public class SimpleLinkScraper implements AutoCloseable {
}

@Nullable
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
.GET()
.header("User-Agent", WmsaHome.getUserAgent().uaString())
.header("Accept-Encoding","gzip")
.timeout(readTimeout);

// Fetch the robots.txt
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl) throws URISyntaxException {
ClassicHttpRequest request = ClassicRequestBuilder.get(rootUrl.withPathAndParam("/robots.txt", null).asURI())
.setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
.setHeader("Accept-Encoding", "gzip")
.build();

try {
SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());

if (robotsTxt.statusCode() == 200) {
return parser.parseContent(rootUrl.toString(),
getResponseData(robotsTxt),
robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
WmsaHome.getUserAgent().uaIdentifier());
return httpClient.execute(request, rsp -> {
if (rsp.getEntity() == null) {
return null;
}
try {
if (rsp.getCode() == 200) {
var contentTypeHeader = rsp.getFirstHeader("Content-Type");
if (contentTypeHeader == null) {
return null; // No content type header, can't parse
}
return new SimpleRobotRulesParser().parseContent(
rootUrl.toString(),
EntityUtils.toByteArray(rsp.getEntity()),
contentTypeHeader.getValue(),
WmsaHome.getUserAgent().uaIdentifier()
);
} else if (rsp.getCode() == 404) {
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
}
} finally {
EntityUtils.consumeQuietly(rsp.getEntity());
}
return null;
});
}
catch (IOException e) {
logger.error("Error fetching robots.txt for {}: {}", rootUrl, e.getMessage());
return null; // I/O error fetching robots.txt
}
finally {
try {
TimeUnit.SECONDS.sleep(1);
}
else if (robotsTxt.statusCode() == 404) {
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
catch (IOException ex) {
logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
}
return null;
}

/** Fetch a URL and store it in the database
*/
private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer) throws Exception {

timer.waitFetchDelay();

HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
.GET()
.header("User-Agent", WmsaHome.getUserAgent().uaString())
.header("Accept", "text/html")
.header("Accept-Encoding", "gzip")
.timeout(readTimeout)
ClassicHttpRequest request = ClassicRequestBuilder.get(parsedUrl.asURI())
.setHeader("User-Agent", WmsaHome.getUserAgent().uaString())
.setHeader("Accept", "text/html")
.setHeader("Accept-Encoding", "gzip")
.build();

try {
HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
return httpClient.execute(request, rsp -> {
try {
if (rsp.getCode() == 200) {
String contentType = rsp.getFirstHeader("Content-Type").getValue();
if (!contentType.toLowerCase().startsWith("text/html")) {
return new FetchResult.Error(parsedUrl);
}

// Handle rate limiting by waiting and retrying once
if (response.statusCode() == 429) {
timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
response.headers().firstValue("Retry-After").orElse("5")
));
response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
}
byte[] body = EntityUtils.toByteArray(rsp.getEntity(), MAX_SIZE);

String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);

if (response.statusCode() == 200) {
if (!contentType.toLowerCase().startsWith("text/html")) {
return new FetchResult.Error(parsedUrl);
StringBuilder headersStr = new StringBuilder();
for (var header : rsp.getHeaders()) {
headersStr.append(header.getName()).append(": ").append(header.getValue()).append("\n");
}

return new FetchResult.Success(domainId, parsedUrl, bodyText, headersStr.toString());
}
} finally {
if (rsp.getEntity() != null) {
EntityUtils.consumeQuietly(rsp.getEntity());
}
}

byte[] body = getResponseData(response);
if (body.length > MAX_SIZE) {
return new FetchResult.Error(parsedUrl);
}

String bodyText = DocumentBodyToString.getStringData(ContentType.parse(contentType), body);

return new FetchResult.Success(domainId, parsedUrl, bodyText, headersToString(response.headers()));
}
return new FetchResult.Error(parsedUrl);
});
}
catch (IOException ex) {
// We don't want a full stack trace on every error, as it's quite common and very noisy
logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
catch (IOException e) {
logger.error("Error fetching {}: {}", parsedUrl, e.getMessage());
// If we can't fetch the URL, we return an error result
// so that the caller can decide what to do with it.
}
finally {
timer.waitFetchDelay();
}

return new FetchResult.Error(parsedUrl);
}

private byte[] getResponseData(HttpResponse<byte[]> response) throws IOException {
String encoding = response.headers().firstValue("Content-Encoding").orElse("");

if ("gzip".equals(encoding)) {
try (var stream = new GZIPInputStream(new ByteArrayInputStream(response.body()))) {
return stream.readAllBytes();
}
}
else {
return response.body();
}
}

sealed interface FetchResult {
record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
record Error(EdgeUrl url) implements FetchResult {}
}

private String headersToString(HttpHeaders headers) {
StringBuilder headersStr = new StringBuilder();
headers.map().forEach((k, v) -> {
headersStr.append(k).append(": ").append(v).append("\n");
});
return headersStr.toString();
}

@Override
public void close() throws Exception {
pool.shutDown();

@@ -0,0 +1,126 @@
package nu.marginalia.livecrawler.io;

import com.google.inject.Provider;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HeaderElement;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class HttpClientProvider implements Provider<HttpClient> {
private static final HttpClient client;
private static PoolingHttpClientConnectionManager connectionManager;

private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);

static {
try {
client = createClient();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();


connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.build();

connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10))
.build()
);

Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.IGNORE)
.setResponseTimeout(10, TimeUnit.SECONDS)
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
.build();

return HttpClients.custom()
.setConnectionManager(connectionManager)
.setRetryStrategy(new RetryStrategy())
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
// Default keep-alive duration is 3 minutes, but this is too long for us,
// as we are either going to re-use it fairly quickly or close it for a long time.
//
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);

@Override
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);

while (it.hasNext()) {
final HeaderElement he = it.next();
final String param = he.getName();
final String value = he.getValue();

if (value == null)
continue;
if (!"timeout".equalsIgnoreCase(param))
continue;

try {
long timeout = Long.parseLong(value);
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
return TimeValue.ofSeconds(timeout);
} catch (final NumberFormatException ignore) {
break;
}
}
return defaultValue;
}
})
.disableRedirectHandling()
.setDefaultRequestConfig(defaultRequestConfig)
.build();
}

@Override
public HttpClient get() {
return client;
}
}

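A sketch of how this provider might be wired up so consumers can inject the shared client; the module itself is hypothetical and not part of this diff:

    // Hypothetical Guice wiring; HttpClientProvider is backed by a static singleton client.
    public class LiveCrawlerModule extends AbstractModule {
        @Override
        protected void configure() {
            bind(HttpClient.class).toProvider(HttpClientProvider.class);
        }
    }
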
@@ -0,0 +1,79 @@
package nu.marginalia.livecrawler.io;

import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

public class RetryStrategy implements HttpRequestRetryStrategy {
private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);

@Override
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return switch (exception) {
case SocketTimeoutException ste -> false;
case SSLException ssle -> false;
case UnknownHostException uhe -> false;
case HttpHostConnectException ex -> executionCount < 2;
case SocketException ex -> executionCount < 2;
default -> executionCount <= 3;
};
}

@Override
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
return switch (response.getCode()) {
case 500, 503 -> executionCount <= 2;
case 429 -> executionCount <= 3;
default -> false;
};
}

@Override
public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
return TimeValue.ofSeconds(1);
}

@Override
public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {

int statusCode = response.getCode();

// Give 503 a bit more time
if (statusCode == 503) return TimeValue.ofSeconds(5);

if (statusCode == 429) {
// get the Retry-After header
var retryAfterHeader = response.getFirstHeader("Retry-After");
if (retryAfterHeader == null) {
return TimeValue.ofSeconds(3);
}

String retryAfter = retryAfterHeader.getValue();
if (retryAfter == null) {
return TimeValue.ofSeconds(2);
}

try {
int retryAfterTime = Integer.parseInt(retryAfter);
retryAfterTime = Math.clamp(retryAfterTime, 1, 5);

return TimeValue.ofSeconds(retryAfterTime);
} catch (NumberFormatException e) {
logger.warn("Invalid Retry-After header: {}", retryAfter);
}
}

return TimeValue.ofSeconds(2);
}
}

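Read together, the two retryRequest overloads mean: timeouts, TLS failures, and unknown hosts are never retried; connection-refused and other socket errors get a single retry; remaining I/O errors get up to three. On the response side, a 500 or 503 is retried at most twice (503 with a five-second pause, other statuses after two seconds), and a 429 is retried up to three times with the Retry-After value clamped to one through five seconds, so for example Retry-After: 120 still only waits five seconds.
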
@@ -3,10 +3,13 @@ package nu.marginalia.livecrawler;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.livecrawler.io.HttpClientProvider;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawldata.CrawledDocument;
import org.apache.commons.io.FileUtils;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.io.CloseMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -16,29 +19,34 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.List;

class SimpleLinkScraperTest {
private Path tempDir;
private LiveCrawlDataSet dataSet;
private CloseableHttpClient httpClient;

@BeforeEach
public void setUp() throws IOException, SQLException {
public void setUp() throws IOException, SQLException, NoSuchAlgorithmException, KeyManagementException {
tempDir = Files.createTempDirectory(getClass().getSimpleName());
dataSet = new LiveCrawlDataSet(tempDir);
httpClient = HttpClientProvider.createClient();
}


@AfterEach
public void tearDown() throws Exception {
dataSet.close();
httpClient.close(CloseMode.IMMEDIATE);
FileUtils.deleteDirectory(tempDir.toFile());
}

@Test
public void testRetrieveNow() throws Exception {
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, httpClient, Mockito.mock(DomainBlacklistImpl.class));
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
Assertions.assertEquals(1, fetched);

@@ -58,7 +66,7 @@ class SimpleLinkScraperTest {
@Test
public void testRetrieveNow_Redundant() throws Exception {
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, httpClient, Mockito.mock(DomainBlacklistImpl.class));

// If the requested URL is already in the dataSet, retrieveNow should short-circuit and not fetch anything
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));

@@ -40,6 +40,8 @@ public class LoaderMain extends ProcessMainClass {
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;

private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");

public static void main(String... args) {
try {
new org.mariadb.jdbc.Driver();
@@ -99,14 +101,29 @@ public class LoaderMain extends ProcessMainClass {

try {
var results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
.invokeAll(List.of());

if ( true == insertFoundDomains ) {
results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
}
else {
results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
}

for (var result : results) {
if (result.state() == Future.State.FAILED) {

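Because Boolean.getBoolean reads a JVM system property, the link-loading branch above is opt-in: it stays disabled unless the loader JVM is started with something like -Dloader.insertFoundDomains=true (illustrative flag placement; the actual launch scripts are outside this diff).
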
@@ -25,6 +25,8 @@ import java.util.Set;
@Singleton
public class DomainLoaderService {

private static boolean insertFoundDomains = Boolean.getBoolean("loader.insertFoundDomains");

private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
private final int nodeId;
@@ -84,25 +86,34 @@ public class DomainLoaderService {

// Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
// can be grabbed by any index node
try (var inserter = new DomainInserter(conn, -1);
var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
int pageIdx = 0;
if ( true == insertFoundDomains ) {
logger.info("Adding found domains");

for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
try (var inserter = new DomainInserter(conn, -1);
var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
int pageIdx = 0;

try (var reader = new SlopDomainLinkRecord.Reader(page)) {
while (reader.hasMore()) {
SlopDomainLinkRecord record = reader.next();
String domainName = record.dest();
if (domainNamesAll.add(domainName)) {
inserter.accept(new EdgeDomain(domainName));
for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());

try (var reader = new SlopDomainLinkRecord.Reader(page)) {
while (reader.hasMore()) {
SlopDomainLinkRecord record = reader.next();
String domainName = record.dest();
if (domainNamesAll.add(domainName)) {
inserter.accept(new EdgeDomain(domainName));
}
}
}

}
}
}
else {
logger.info("Skipping found domains");
}


taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);

@@ -61,7 +61,7 @@ public class BackoffStrategy {
        };

        double backoffMinutes = baseInterval.toMinutes()
                * Math.pow(multiplier, backoffConsecutiveFailures - 1);
                * Math.pow(multiplier, Math.clamp(backoffConsecutiveFailures, 1, 10));

        Duration newDuration = Duration.ofMinutes(Math.round(0.5+backoffMinutes));
        if (newDuration.compareTo(maxInterval) > 0) {
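The change above bounds the backoff exponent. Previously `multiplier` was raised to `backoffConsecutiveFailures - 1`, which grows without limit as failures accumulate; clamping to `[1, 10]` caps the exponential term at `multiplier^10`. Note the baseline also shifts: the first failure now uses an exponent of 1 rather than 0. A worked example with assumed values (a 5 minute base interval and a multiplier of 2; the real constants live in `BackoffStrategy` and are not shown in this hunk):

```java
import java.time.Duration;

public class BackoffSketch {
    public static void main(String[] args) {
        Duration baseInterval = Duration.ofMinutes(5); // assumed value
        double multiplier = 2.0;                       // assumed value

        for (int failures = 1; failures <= 12; failures++) {
            double backoffMinutes = baseInterval.toMinutes()
                    * Math.pow(multiplier, Math.clamp(failures, 1, 10));

            // Same rounding as the hunk above; the exponent saturates at 10,
            // so the 11th and 12th failures produce the same interval.
            Duration newDuration = Duration.ofMinutes(Math.round(0.5 + backoffMinutes));
            System.out.println(failures + " failure(s) -> " + newDuration);
        }
    }
}
```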
@@ -30,10 +30,11 @@ public class ApiSearchOperator {

    public ApiSearchResults query(String query,
                                  int count,
                                  int domainCount,
                                  int index,
                                  NsfwFilterTier filterTier)
    {
        var rsp = queryClient.search(createParams(query, count, index, filterTier));
        var rsp = queryClient.search(createParams(query, count, domainCount, index, filterTier));

        return new ApiSearchResults("RESTRICTED", query,
                rsp.results()
@@ -44,13 +45,13 @@
                .collect(Collectors.toList()));
    }

    private QueryParams createParams(String query, int count, int index, NsfwFilterTier filterTirer) {
    private QueryParams createParams(String query, int count, int domainCount, int index, NsfwFilterTier filterTirer) {
        SearchSetIdentifier searchSet = selectSearchSet(index);

        return new QueryParams(
                query,
                RpcQueryLimits.newBuilder()
                        .setResultsByDomain(2)
                        .setResultsByDomain(Math.clamp(domainCount, 1, 100))
                        .setResultsTotal(Math.min(100, count))
                        .setTimeoutMs(150)
                        .setFetchSize(8192)
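With the two `ApiSearchOperator` hunks above, the per-domain result limit is no longer hard-coded to 2: the caller-supplied `domainCount` is pinned to `[1, 100]` before it reaches `RpcQueryLimits`. `Math.clamp`, added in Java 21, simply snaps out-of-range values to the nearest bound:

```java
public class ClampSketch {
    public static void main(String[] args) {
        // Values outside the range are pinned to the nearest bound.
        System.out.println(Math.clamp(0, 1, 100));    // 1
        System.out.println(Math.clamp(5, 1, 100));    // 5
        System.out.println(Math.clamp(1000, 1, 100)); // 100
    }
}
```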
@@ -119,6 +119,7 @@ public class ApiService extends SparkService {
        }

        int count = intParam(request, "count", 20);
        int domainCount = intParam(request, "dc", 2);
        int index = intParam(request, "index", 3);
        int nsfw = intParam(request, "nsfw", 1);
@@ -137,7 +138,7 @@ public class ApiService extends SparkService {
                .labels(license.key)
                .time(() ->
                        searchOperator
                                .query(query, count, index, nsfwFilterTier)
                                .query(query, count, domainCount, index, nsfwFilterTier)
                                .withLicense(license.getLicense())
                );
    }
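Together the `ApiService` hunks expose that limit as a `dc` request parameter defaulting to 2. A hypothetical request against a placeholder host and API key (the real endpoint layout is not shown in this diff), asking for up to five results per domain:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DcRequestSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder host and key; "dc=5" allows up to five results per domain.
        URI uri = URI.create("https://api.example.com/demo-key/search/vim%20tips?count=30&dc=5");

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(HttpRequest.newBuilder(uri).GET().build(),
                      HttpResponse.BodyHandlers.ofString());

        System.out.println(response.body());
    }
}
```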
@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
    {
        bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
        bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
        bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
        bangsToPattern.put("!w", "https://old-search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
    }

    @Override
@@ -20,7 +20,7 @@ public class BangCommand implements SearchCommandInterface {
    {
        bangsToPattern.put("!g", "https://www.google.com/search?q=%s");
        bangsToPattern.put("!ddg", "https://duckduckgo.com/?q=%s");
        bangsToPattern.put("!w", "https://search.marginalia.nu/search?query=%s+site:en.wikipedia.org&profile=wiki");
        bangsToPattern.put("!w", "/search?query=%s+site:en.wikipedia.org");
    }

    @Override
@@ -34,7 +34,7 @@ public class BangCommand implements SearchCommandInterface {

        if (match.isPresent()) {
            var url = String.format(redirectPattern, URLEncoder.encode(match.get(), StandardCharsets.UTF_8));
            new MapModelAndView("redirect.jte", Map.of("url", url));
            return Optional.of(new MapModelAndView("redirect.jte", Map.of("url", url)));
        }
    }
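The two hunks above retarget the `!w` bang in successive commits, first to old-search.marginalia.nu and then to a relative `/search` URL, while this hunk fixes the actual bug: the redirect view was constructed as a bare expression statement and silently discarded, so the method fell through and the bang never redirected. The `BangCommandTest` added below pins the corrected behavior. A reduced sketch with simplified stand-in types:

```java
import java.util.Map;
import java.util.Optional;

public class DiscardedValueSketch {
    record MapModelAndView(String template, Map<String, Object> model) {}

    static Optional<MapModelAndView> buggy(String url) {
        new MapModelAndView("redirect.jte", Map.of("url", url)); // built, then dropped
        return Optional.empty(); // caller concludes there is no redirect
    }

    static Optional<MapModelAndView> fixed(String url) {
        return Optional.of(new MapModelAndView("redirect.jte", Map.of("url", url)));
    }

    public static void main(String[] args) {
        System.out.println(buggy("https://example.com")); // Optional.empty
        System.out.println(fixed("https://example.com")); // Optional[MapModelAndView[...]]
    }
}
```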
@@ -33,19 +33,19 @@
                    title="This domain is blacklisted and will not be crawled or indexed">
                Blacklisted
            </span>
        @elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
            <span
                    class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
                    title="This domain will be crawled by the search engine">
                In Crawler Queue
            </span>
        @elseif (siteInfo.domainInformation().isUnknownDomain())
            <span
                    class="bg-purple-50 text-purple-900 border-purple-200 dark:bg-black dark:text-purple-100 border p-1 font-sm rounded"
                    title="The search engine is not aware of this domain name">
                Unknown
            </span>
        @elseif (siteInfo.domainInformation().isUnknownDomain())
        @elseif (siteInfo.domainInformation().getNodeAffinity() == 0)
            <span
                    class="bg-blue-50 text-blue-900 border-blue-200 dark:bg-black dark:text-blue-100 border p-1 font-sm rounded"
                    title="This domain will be crawled by the search engine">
                In Crawler Queue
            </span>
        @elseif (!siteInfo.domainInformation().isUnknownDomain())
            <span
                    class="bg-yellow-50 text-yellow-900 border-yellow-200 dark:bg-black dark:text-yellow-100 border p-1 font-sm rounded"
                    title="The search engine is aware of this domain, but it's not slated for crawling">
@@ -0,0 +1,19 @@
package nu.marginalia.search.command.commands;

import nu.marginalia.WebsiteUrl;
import nu.marginalia.search.command.SearchParameters;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class BangCommandTest {

    @Test
    void testWikipediaRedirect() {
        BangCommand bc = new BangCommand();

        assertTrue(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "!w plato", 1)).isPresent());
        assertFalse(bc.process(SearchParameters.defaultsForQuery(new WebsiteUrl("test"), "plato", 1)).isPresent());
    }
}
@@ -20,6 +20,6 @@ public class StatusModule extends AbstractModule {
        bind(String.class)
                .annotatedWith(Names.named("searchEngineTestQuery"))
                .toInstance(System.getProperty("status-service.public-query",
                        "https://marginalia-search.com/search?query=plato&ref=marginalia-automatic-metrics"));
                        "https://old-search.marginalia.nu/search?query=plato&ref=marginalia-automatic-metrics"));
    }
}
@@ -74,6 +74,8 @@ public class ControlSysActionsService {
            Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
            Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
            Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);

            Spark.post("/actions/update-nsfw-filters", this::updateNsfwFilters, Redirects.redirectToOverview);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
@@ -132,6 +134,14 @@ public class ControlSysActionsService {
        return "";
    }

    public Object updateNsfwFilters(Request request, Response response) throws Exception {
        eventLog.logEvent("USER-ACTION", "UPDATE-NSFW-FILTERS");

        executorClient.updateNsfwFilters();

        return "";
    }

    public Object flushApiCaches(Request request, Response response) throws Exception {
        eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
        apiOutbox.sendNotice("FLUSH_CACHES", "");
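The `ControlSysActionsService` hunks above follow the service's existing action pattern: register a POST route, log a USER-ACTION event, delegate to the executor client, and return an empty body; the form added below posts to the same path. A minimal sketch of the wiring using Spark's plain two-argument `post` overload (the real registration also passes `Redirects.redirectToOverview`):

```java
import spark.Request;
import spark.Response;
import spark.Spark;

public class NsfwActionSketch {
    public static void main(String[] args) {
        // Bind the form's POST target to a (Request, Response) handler.
        Spark.post("/actions/update-nsfw-filters", NsfwActionSketch::updateNsfwFilters);
    }

    static Object updateNsfwFilters(Request request, Response response) {
        // In the real service: eventLog.logEvent(...) and
        // executorClient.updateNsfwFilters() happen here.
        return "";
    }
}
```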
@@ -53,6 +53,31 @@
            </div>
        </div>

        <div class="accordion-item">
            <h2 class="accordion-header">
                <button class="accordion-button collapsed"
                        type="button"
                        data-bs-toggle="collapse"
                        data-bs-target="#collapseNsfwFilters"
                        aria-expanded="false"
                        aria-controls="collapseNsfwFilters">
                    Update NSFW Filters Definitions
                </button>
            </h2>
            <div id="collapseNsfwFilters" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
                <div class="mb-3">
                    This will fetch NSFW filter definitions.
                </div>
                <form method="post" action="actions/update-nsfw-filters">
                    <button
                            class="btn btn-primary me-md-2"
                            onclick="return confirm('Confirm update NSFW filters');"
                            type="submit">
                        Update NSFW Filter</button>
                </form>
            </div>
        </div>

        <div class="accordion-item">
            <h2 class="accordion-header">
                <button class="accordion-button collapsed"