Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git, synced 2025-10-06 07:32:38 +02:00

Compare commits: deploy-022 ... deploy-024 (22 commits)

Commits:
b4fc0c4368
87ee8765b8
1adf4835fa
b7b5d0bf46
416059adde
db7930016a
82456ad673
0882a6d9cd
5020029c2d
ac44d0b093
4b32b9b10e
9f041d6631
13fb1efce4
c1225165b7
67ad7a3bbc
ed62ec8a35
42b24cfa34
1ffaab2da6
5f93c7f767
4001c68c82
6b811489c5
e9d317c65d
@@ -0,0 +1,6 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes

ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_SERIAL_NUMBER BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_ISSUER BOOLEAN NOT NULL DEFAULT FALSE;
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;
@@ -0,0 +1,5 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes

ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_SCHEMA ENUM('NONE', 'HTTP_TO_HTTPS', 'HTTPS_TO_HTTP', 'UNKNOWN') NOT NULL DEFAULT 'UNKNOWN';
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;
@@ -5,12 +5,10 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;

import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;

@@ -66,6 +64,6 @@ public interface ServiceRegistryIf {

void registerProcess(String processName, int nodeId);
void deregisterProcess(String processName, int nodeId);
void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception;
void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception;

InterProcessSemaphoreV2 getSemaphore(String name, int permits) throws Exception;
}
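For orientation, a minimal caller-side sketch of the new getSemaphore method, assuming Curator's standard InterProcessSemaphoreV2 lease API; the semaphore name, permit count and doThrottledWork() are illustrative only and not part of the change.

    import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
    import org.apache.curator.framework.recipes.locks.Lease;

    // Hypothetical caller: throttle a piece of work across processes via the registry's semaphore.
    InterProcessSemaphoreV2 semaphore = serviceRegistry.getSemaphore("some-shared-resource", 2);
    Lease lease = semaphore.acquire();   // blocks until a permit is available
    try {
        doThrottledWork();               // placeholder for the caller's own work
    }
    finally {
        semaphore.returnLease(lease);    // always hand the permit back
    }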
@@ -6,6 +6,7 @@ import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
@@ -13,10 +14,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;

@@ -283,60 +285,12 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
}

@Override
public void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception {
String path = "/process-locks/" + processName + "/" + nodeId;
public InterProcessSemaphoreV2 getSemaphore(String name, int permits) {
if (stopped)
throw new IllegalStateException("Service registry is stopped, cannot get semaphore " + name);

// first check if the path exists and call the callback accordingly

if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true);
}
else {
callback.accept(false);
}

curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();

if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false);
}
})
.forPath(path);

}

@Override
public void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception {

for (int node : nodes) {
String path = "/process-locks/" + processName + "/" + node;

// first check if the path exists and call the callback accordingly
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true, node);
}
else {
callback.accept(false, node);
}

curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();

if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true, node);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false, node);
}
})
.forPath(path);
}
String path = "/semaphores/" + name;
return new InterProcessSemaphoreV2(curatorFramework, path, permits);
}

/* Exposed for tests */
@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
@@ -3,24 +3,176 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor {
public class PingMonitorActor extends RecordActorPrototype {

@Inject
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.PING_INBOX,
ProcessService.ProcessId.PING);
private final MqPersistence persistence;
private final ProcessService processService;

private final Logger logger = LoggerFactory.getLogger(getClass());

public static final int MAX_ATTEMPTS = 3;
private final String inboxName;
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final Gson gson;

public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}

@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest();

persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);

yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);

if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}

if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());

yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();

if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user

processService.kill(processId);
setCurrentMessageToDead();

yield new Aborted();
}

yield new Monitor(attempts);
}
default -> new Error();
};
}

public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}

@Inject
public PingMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
}

/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);

if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;

var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}

/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}

public boolean isError() {
return error.get();
}
}
}
@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {

final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED
SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL
ORDER BY RAND()
@@ -22,6 +22,7 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')

implementation project(':code:execution:api')
implementation project(':code:processes:crawling-process:ft-content-type')
@@ -1,66 +0,0 @@
package nu.marginalia.rss.svc;

import nu.marginalia.model.EdgeDomain;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();

/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}

private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);

if (topDomain.equals("neocities.org"))
return new Semaphore(4);
if (topDomain.equals("github.io"))
return new Semaphore(4);

if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}

return new Semaphore(2);
}

public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;

DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;

Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("fetching:" + domainName);
}

@Override
public void close() {
semaphore.release();
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
}
}
}
@@ -5,6 +5,8 @@ import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService;
@@ -51,12 +53,13 @@ public class FeedFetcherService {
private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient;

private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;

private volatile boolean updating;

@Inject
public FeedFetcherService(FeedDb feedDb,
DomainCoordinator domainCoordinator,
FileStorageService fileStorageService,
NodeConfigurationService nodeConfigurationService,
ServiceHeartbeat serviceHeartbeat,
@@ -67,6 +70,7 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient;
this.domainCoordinator = domainCoordinator;
}

public enum UpdateMode {
@@ -132,7 +136,7 @@ public class FeedFetcherService {
};

FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) {
feedData = new FetchResult.TransientError();
code/libraries/domain-lock/build.gradle (new file, 32 lines)
@@ -0,0 +1,32 @@
plugins {
id 'java'
}

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
implementation libs.bundles.slf4j
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')

implementation libs.bundles.curator

implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

test {
useJUnitPlatform()
}
@@ -0,0 +1,32 @@
package nu.marginalia.coordination;

import nu.marginalia.model.EdgeDomain;

public class DefaultDomainPermits {

public static int defaultPermits(EdgeDomain domain) {
return defaultPermits(domain.topDomain.toLowerCase());
}

public static int defaultPermits(String topDomain) {

if (topDomain.equals("wordpress.com"))
return 16;
if (topDomain.equals("blogspot.com"))
return 8;
if (topDomain.equals("tumblr.com"))
return 8;
if (topDomain.equals("neocities.org"))
return 8;
if (topDomain.equals("github.io"))
return 8;
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return 1;
}

return 2;
}

}
@@ -0,0 +1,17 @@
package nu.marginalia.coordination;

import com.google.inject.AbstractModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DomainCoordinationModule extends AbstractModule {

private static final Logger logger = LoggerFactory.getLogger(DomainCoordinationModule.class);

public DomainCoordinationModule() {
}

public void configure() {
bind(DomainCoordinator.class).to(ZookeeperDomainCoordinator.class);
}
}
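As a hedged sketch of how a process might pick up this binding, the snippet below wires the module with Guice and asks for the DomainCoordinator; it is trimmed relative to the real CrawlerMain/PingMain/LiveCrawlerMain wiring shown further down, which also installs the process-configuration and database modules that supply the node id the Zookeeper coordinator needs.

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import nu.marginalia.coordination.DomainCoordinationModule;
    import nu.marginalia.coordination.DomainCoordinator;
    import nu.marginalia.service.module.ServiceDiscoveryModule;

    // Illustrative only: additional modules are installed in the actual process entry points.
    Injector injector = Guice.createInjector(
            new ServiceDiscoveryModule(),      // provides the ServiceRegistryIf backing the semaphores
            new DomainCoordinationModule()     // binds DomainCoordinator to ZookeeperDomainCoordinator
    );
    DomainCoordinator coordinator = injector.getInstance(DomainCoordinator.class);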
@@ -0,0 +1,13 @@
package nu.marginalia.coordination;

import nu.marginalia.model.EdgeDomain;

import java.time.Duration;
import java.util.Optional;

public interface DomainCoordinator {
DomainLock lockDomain(EdgeDomain domain) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException;
boolean isLockableHint(EdgeDomain domain);
}
@@ -0,0 +1,5 @@
package nu.marginalia.coordination;

public interface DomainLock extends AutoCloseable {
void close();
}
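Putting the two interfaces together, here is a hedged usage sketch of how caller code can take and release a per-domain lock, mirroring the try-with-resources pattern used in FeedFetcherService and CrawlerMain below; the domain name and the fetchDomain() helper are illustrative only.

    import nu.marginalia.coordination.DomainCoordinator;
    import nu.marginalia.coordination.DomainLock;
    import nu.marginalia.model.EdgeDomain;
    import java.time.Duration;
    import java.util.Optional;

    // Hypothetical caller sketch:
    void crawlWithLock(DomainCoordinator domainCoordinator) throws InterruptedException {
        // Blocking acquisition; the permit is released automatically when the try block exits.
        try (DomainLock lock = domainCoordinator.lockDomain(new EdgeDomain("www.example.com"))) {
            fetchDomain(); // placeholder for the caller's own fetch logic
        }

        // Opportunistic acquisition with a bounded wait, as the crawl task scheduling code does below.
        Optional<DomainLock> maybeLock =
                domainCoordinator.tryLockDomain(new EdgeDomain("www.example.com"), Duration.ofSeconds(5));
        if (maybeLock.isEmpty()) {
            return; // all permits for the domain are taken elsewhere; try again later
        }
        try (DomainLock lock = maybeLock.get()) {
            fetchDomain();
        }
    }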
@@ -1,16 +1,17 @@
package nu.marginalia.crawl.logic;
package nu.marginalia.coordination;

import com.google.inject.Singleton;
import nu.marginalia.model.EdgeDomain;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
@Singleton
public class LocalDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
@@ -24,13 +25,25 @@ public class DomainLocks {

sem.acquire();

return new DomainLock(sem);
return new LocalDomainLock(sem);
}

public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(sem));
return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
}


public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
@@ -39,24 +52,7 @@ public class DomainLocks {
}

private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("tumblr.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(8);
if (topDomain.equals("github.io"))
return new Semaphore(8);

// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}

return new Semaphore(2);
return new Semaphore(DefaultDomainPermits.defaultPermits(topDomain));
}

/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
@@ -71,15 +67,15 @@ public class DomainLocks {
return sem.availablePermits() > 0;
}

public static class DomainLock implements AutoCloseable {
public static class LocalDomainLock implements DomainLock {
private final Semaphore semaphore;

DomainLock(Semaphore semaphore) {
LocalDomainLock(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void close() throws Exception {
public void close() {
semaphore.release();
Thread.currentThread().setName("[idle]");
}
@@ -0,0 +1,116 @@
package nu.marginalia.coordination;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Singleton
public class ZookeeperDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
private final Map<String, Integer> waitCounts = new ConcurrentHashMap<>();

private final ServiceRegistryIf serviceRegistry;
private final int nodeId;

@Inject
public ZookeeperDomainCoordinator(ServiceRegistryIf serviceRegistry, @Named("wmsa-system-node") int nodeId) {
// Zookeeper-specific initialization can be done here if needed
this.serviceRegistry = serviceRegistry;
this.nodeId = nodeId;
}

/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);

// Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try {
return new ZkDomainLock(sem, sem.acquire());
}
catch (Exception e) {
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
}
finally {
// Decrement or remove the wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
}

public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
}

public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);

// Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try {
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
if (lease != null) {
return Optional.of(new ZkDomainLock(sem, lease));
}
else {
return Optional.empty(); // If we fail to acquire the lease, we return an empty optional
}
}
catch (Exception e) {
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
}
finally {
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
}

private InterProcessSemaphoreV2 createSemapore(String topDomain){
try {
return serviceRegistry.getSemaphore(topDomain + ":" + nodeId, DefaultDomainPermits.defaultPermits(topDomain));
}
catch (Exception e) {
throw new RuntimeException("Failed to get semaphore for domain: " + topDomain, e);
}
}

/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
return !waitCounts.containsKey(domain.topDomain.toLowerCase());
}

public static class ZkDomainLock implements DomainLock {
private final InterProcessSemaphoreV2 semaphore;
private final Lease lease;

ZkDomainLock(InterProcessSemaphoreV2 semaphore, Lease lease) {
this.semaphore = semaphore;
this.lease = lease;
}

@Override
public void close() {
semaphore.returnLease(lease);
}
}
}
@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:libraries:domain-lock')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
@@ -10,9 +10,11 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
@@ -68,7 +70,7 @@ public class CrawlerMain extends ProcessMainClass {
private final ServiceRegistryIf serviceRegistry;
private final SimpleBlockingThreadPool pool;

private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;

private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();

@@ -97,6 +99,7 @@ public class CrawlerMain extends ProcessMainClass {
WarcArchiverFactory warcArchiverFactory,
HikariDataSource dataSource,
DomainBlacklist blacklist,
DomainCoordinator domainCoordinator,
ServiceRegistryIf serviceRegistry,
Gson gson) throws InterruptedException {

@@ -114,6 +117,7 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist;
this.node = processConfiguration.node();
this.serviceRegistry = serviceRegistry;
this.domainCoordinator = domainCoordinator;

SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
@@ -157,6 +161,7 @@ public class CrawlerMain extends ProcessMainClass {
new CrawlerModule(),
new ProcessConfigurationModule("crawler"),
new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new DatabaseModule(false)
);
var crawler = injector.getInstance(CrawlerMain.class);
@@ -451,7 +456,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */
public boolean canRun() {
return domainLocks.isLockableHint(new EdgeDomain(domain));
return domainCoordinator.isLockableHint(new EdgeDomain(domain));
}

@Override
@@ -462,7 +467,7 @@ public class CrawlerMain extends ProcessMainClass {
return;
}

Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
Optional<DomainLock> lock = domainCoordinator.tryLockDomain(new EdgeDomain(domain));
// We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long
if (lock.isEmpty()) {
@@ -470,7 +475,7 @@ public class CrawlerMain extends ProcessMainClass {
retryQueue.put(this);
return;
}
DomainLocks.DomainLock domainLock = lock.get();
DomainLock domainLock = lock.get();

try (domainLock) {
Thread.currentThread().setName("crawling:" + domain);
@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:processes:crawling-process')
@@ -10,6 +10,8 @@ import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream;
@@ -58,6 +60,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final DomainCoordinator domainCoordinator;
private final HikariDataSource dataSource;

@Inject
@@ -71,7 +74,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
DomainProcessor domainProcessor,
FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, HikariDataSource dataSource)
DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
throws Exception
{
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -84,6 +87,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
this.domainCoordinator = domainCoordinator;
this.dataSource = dataSource;

domainBlacklist.waitUntilLoaded();
@@ -107,6 +111,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
try {
Injector injector = Guice.createInjector(
new LiveCrawlerModule(),
new DomainCoordinationModule(), // 2 hours lease timeout is enough for the live crawler
new ProcessConfigurationModule("crawler"),
new ConverterModule(),
new ServiceDiscoveryModule(),
@@ -172,7 +177,7 @@ public class LiveCrawlerMain extends ProcessMainClass {

processHeartbeat.progress(LiveCrawlState.CRAWLING);

try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
@@ -5,8 +5,9 @@ import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
@@ -46,14 +47,16 @@ public class SimpleLinkScraper implements AutoCloseable {
private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;

private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);

public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist) {
this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
}
@@ -98,7 +101,7 @@ public class SimpleLinkScraper implements AutoCloseable {
.version(HttpClient.Version.HTTP_2)
.build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain)
DomainLock lock = domainCoordinator.lockDomain(domain)
) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
@@ -1,5 +1,6 @@
package nu.marginalia.livecrawler;

import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
@@ -37,7 +38,7 @@ class SimpleLinkScraperTest {

@Test
public void testRetrieveNow() throws Exception {
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
Assertions.assertEquals(1, fetched);

@@ -57,7 +58,7 @@ class SimpleLinkScraperTest {
@Test
public void testRetrieveNow_Redundant() throws Exception {
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));

// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
@@ -27,6 +27,7 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service')

implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:message-queue')
@@ -137,7 +137,7 @@ public class PingDao {

public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
var query = """
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
FROM EC_DOMAIN
LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
@@ -168,7 +168,7 @@ public class PingDao {
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}
@@ -185,12 +185,12 @@ public class PingDao {
return null;
}

public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
public List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();

try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT ID, NEXT_SCHEDULED_UPDATE
SELECT ID, DOMAIN_NAME, NEXT_SCHEDULED_UPDATE
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
@@ -200,11 +200,13 @@ public class PingDao {
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
long domainId = rs.getLong("ID");
int domainId = rs.getInt("ID");
String domainName = rs.getString("DOMAIN_NAME");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();

updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
var ref = new DomainReference(domainId, nodeId, domainName.toLowerCase());
updateJobs.add(new UpdateSchedule.UpdateJob<>(ref, nextUpdate));
}
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e);
@@ -241,7 +243,7 @@ public class PingDao {
else {
var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId),
new RootDomainReference.ByIdAndName(dnsRootDomainId, rootDomainName),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
);
}
@@ -1,12 +1,15 @@
package nu.marginalia.ping;

import com.google.inject.Inject;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -22,13 +25,14 @@ import java.util.concurrent.TimeUnit;
public class PingJobScheduler {
private final HttpPingService httpPingService;
private final DnsPingService dnsPingService;
private final DomainCoordinator domainCoordinator;
private final PingDao pingDao;

private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);

private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
= new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
private static final UpdateSchedule<DomainReference, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000);

public volatile Instant dnsLastSync = Instant.now();
@@ -42,14 +46,16 @@ public class PingJobScheduler {
@Inject
public PingJobScheduler(HttpPingService httpPingService,
DnsPingService dnsPingService,
DomainCoordinator domainCoordinator,
PingDao pingDao)
{
this.httpPingService = httpPingService;
this.dnsPingService = dnsPingService;
this.domainCoordinator = domainCoordinator;
this.pingDao = pingDao;
}

public synchronized void start(boolean startPaused) {
public synchronized void start() {
if (running)
return;

@@ -60,11 +66,13 @@ public class PingJobScheduler {
allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));

int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);

for (int i = 0; i < 8; i++) {
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
}
for (int i = 0; i < 1; i++) {
for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
}
}
@@ -83,6 +91,8 @@ public class PingJobScheduler {
}

public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);

if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -95,7 +105,8 @@ public class PingJobScheduler {
logger.info("PingJobScheduler paused");
}

public synchronized void resume(int nodeId) {
public synchronized void enableForNode(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -128,7 +139,15 @@ public class PingJobScheduler {
continue;
}

long nextId = availabilityUpdateSchedule.next();
DomainReference ref = availabilityUpdateSchedule.nextIf(domain -> {
EdgeDomain domainObj = new EdgeDomain(domain.domainName());
if (!domainCoordinator.isLockableHint(domainObj)) {
return false; // Skip locked domains
}
return true; // Process this domain
});

long nextId = ref.domainId();
var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) {
logger.warn("No availability data found for ID: {}", nextId);
@@ -137,24 +156,14 @@ public class PingJobScheduler {

try {
List<WritableModel> objects = switch (data) {
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference) -> {
logger.info("Processing availability job for domain: {}", reference.domainName());
yield httpPingService.pingDomain(reference, null, null);
}
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record) -> {
logger.info("Availability check with no security info: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain),
record,
null);
}
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with full historical data: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
security);
}
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
-> httpPingService.pingDomain(reference, null, null);
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
};

pingDao.write(objects);
@@ -163,7 +172,7 @@ public class PingJobScheduler {
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts);
availabilityUpdateSchedule.add(ref, ts);
break;
}
}
@@ -194,13 +203,13 @@ public class PingJobScheduler {

try {
List<WritableModel> objects = switch(ref) {
case RootDomainReference.ById(long id) -> {
case RootDomainReference.ByIdAndName(long id, String name) -> {
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
}
case RootDomainReference.ByName(String name) -> {
var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
@Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(name, oldRecord);
}
};
@@ -5,30 +5,25 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.coordination.DomainCoordinationModule;
|
||||
import nu.marginalia.geoip.GeoIpDictionary;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||
import nu.marginalia.mqapi.ping.PingRequest;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.nodecfg.model.NodeConfiguration;
|
||||
import nu.marginalia.process.ProcessConfiguration;
|
||||
import nu.marginalia.process.ProcessConfigurationModule;
|
||||
import nu.marginalia.process.ProcessMainClass;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
import java.security.Security;
import java.util.List;

public class PingMain extends ProcessMainClass {
    private static final Logger log = LoggerFactory.getLogger(PingMain.class);

    private final PingJobScheduler pingJobScheduler;
    private final ServiceRegistryIf serviceRegistry;
    private final NodeConfigurationService nodeConfigurationService;
    private final int node;

    private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
@@ -38,15 +33,11 @@ public class PingMain extends ProcessMainClass {
            ProcessConfiguration config,
            Gson gson,
            PingJobScheduler pingJobScheduler,
            ServiceRegistryIf serviceRegistry,
            NodeConfigurationService nodeConfigurationService,
            ProcessConfiguration processConfiguration
    ) {
        super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);

        this.pingJobScheduler = pingJobScheduler;
        this.serviceRegistry = serviceRegistry;
        this.nodeConfigurationService = nodeConfigurationService;
        this.node = processConfiguration.node();
    }

@@ -54,57 +45,8 @@ public class PingMain extends ProcessMainClass {
        log.info("Starting PingMain...");

        // Start the ping job scheduler
        pingJobScheduler.start(true);

        // Watch the crawler process to suspend/resume the ping job scheduler
        try {
            serviceRegistry.watchProcess("crawler", node, (running) -> {
                if (running) {
                    log.info("Crawler process is running, suspending ping job scheduler.");
                    pingJobScheduler.pause(node);
                } else {
                    log.warn("Crawler process is not running, resuming ping job scheduler.");
                    pingJobScheduler.resume(node);
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to watch crawler process", e);
        }

        log.info("PingMain started successfully.");
    }

    public void runSecondary() {
        log.info("Starting PingMain...");

        List<Integer> crawlerNodes = nodeConfigurationService.getAll()
                .stream()
                .filter(node -> !node.disabled())
                .filter(node -> node.profile().permitBatchCrawl())
                .map(NodeConfiguration::node)
                .toList()
                ;

        // Start the ping job scheduler
        pingJobScheduler.start(true);

        // Watch the crawler process to suspend/resume the ping job scheduler
        try {
            serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
                if (running) {
                    log.info("Crawler process is running on node {} taking over ", n);
                    pingJobScheduler.resume(n);
                } else {
                    log.warn("Crawler process stopped, resuming ping job scheduler.");
                    pingJobScheduler.pause(n);
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to watch crawler process", e);
        }
        pingJobScheduler.start();
        pingJobScheduler.enableForNode(node);

        log.info("PingMain started successfully.");
    }
@@ -131,6 +73,7 @@ public class PingMain extends ProcessMainClass {
        Injector injector = Guice.createInjector(
                new PingModule(),
                new ServiceDiscoveryModule(),
                new DomainCoordinationModule(),
                new ProcessConfigurationModule("ping"),
                new DatabaseModule(false)
        );
@@ -144,19 +87,11 @@ public class PingMain extends ProcessMainClass {
        var instructions = main.fetchInstructions(PingRequest.class);

        try {
            switch (instructions.value().runClass) {
                case "primary":
                    log.info("Running as primary node");
                    main.runPrimary();
                    break;
                case "secondary":
                    log.info("Running as secondary node");
                    main.runSecondary();
                    break;
                default:
                    throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
            }
            for(;;);
            main.runPrimary();
            for(;;)
                synchronized (main) { // Wait on the object lock to avoid busy-looping
                    main.wait();
                }
        }
        catch (Throwable ex) {
            logger.error("Error running ping process", ex);
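The removed `for(;;);` busy loop would have pinned a CPU core once the ping process was idle; the replacement parks the main thread on its own monitor instead. The idiom in isolation looks like the sketch below (a sketch only, not code from the repository; nothing ever notifies the monitor here, so the thread stays parked until the JVM exits or it is interrupted):

    // Minimal illustration of the idle-main-thread idiom used above.
    public final class ParkForever {
        public static void main(String[] args) throws InterruptedException {
            Object monitor = new Object();
            synchronized (monitor) {
                for (;;) {
                    monitor.wait(); // releases the monitor and parks without consuming CPU
                }
            }
        }
    }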
@@ -2,9 +2,8 @@ package nu.marginalia.ping;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.*;
import java.util.function.Predicate;

/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
 * This is not a particularly high-performance implementation, but exists to take contention off the database's
@@ -23,6 +22,9 @@ public class UpdateSchedule<T, T2> {
        notifyAll();
    }

    /** Returns the next job in the queue that is due to be processed.
     * If no jobs are due, it will block until a job is added or a job becomes due.
     * */
    public synchronized T next() throws InterruptedException {
        while (true) {
            if (updateQueue.isEmpty()) {
@@ -44,6 +46,56 @@ public class UpdateSchedule<T, T2> {
        }
    }

    /** Returns the first job in the queue matching the predicate that is not scheduled into the future,
     * blocking until a job is added or a job becomes due.
     */
    public synchronized T nextIf(Predicate<T> predicate) throws InterruptedException {
        List<UpdateJob<T, T2>> rejectedJobs = new ArrayList<>();

        try {
            while (true) {
                if (updateQueue.isEmpty()) {
                    wait(); // Wait for a new job to be added
                    continue;
                }

                UpdateJob<T, T2> job = updateQueue.peek();
                Instant now = Instant.now();

                if (job.updateTime.isAfter(now)) {
                    Duration toWait = Duration.between(now, job.updateTime);

                    // Return the rejected jobs to the queue for other threads to process
                    updateQueue.addAll(rejectedJobs);
                    if (!rejectedJobs.isEmpty())
                        notifyAll();
                    rejectedJobs.clear();

                    wait(Math.max(1, toWait.toMillis()));
                } else {
                    var candidate = updateQueue.poll(); // Remove the job from the queue since it's due

                    assert candidate != null : "Update job should not be null at this point, since we just peeked it in a synchronized block";

                    if (!predicate.test(candidate.key())) {
                        rejectedJobs.add(candidate);
                    }
                    else {
                        return candidate.key();
                    }
                }
            }
        }
        finally {
            // Return the rejected jobs to the queue for other threads to process
            updateQueue.addAll(rejectedJobs);
            if (!rejectedJobs.isEmpty())
                notifyAll();
        }

    }

    public synchronized void clear() {
        updateQueue.clear();
        notifyAll();
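The new nextIf method lets several worker threads share one schedule while only claiming the jobs they are currently allowed to process; anything that fails the predicate is handed back to the queue for another worker. A minimal consumer loop might look like the sketch below — the generic instantiation, canProcess and process are hypothetical stand-ins, since the actual wiring lives in PingJobScheduler, which is not part of this diff:

    // Sketch only: assumes an UpdateSchedule<DomainReference, Void> and a hypothetical canProcess() check.
    void workerLoop(UpdateSchedule<DomainReference, Void> schedule) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            // Blocks until a due job passes the predicate; rejected jobs are re-queued for other workers.
            DomainReference next = schedule.nextIf(ref -> canProcess(ref)); // canProcess: hypothetical
            process(next);                                                  // process: hypothetical
        }
    }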
@@ -4,6 +4,7 @@ import com.google.inject.Inject;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.ping.fetcher.response.*;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.Header;
@@ -82,9 +83,12 @@ public class PingHttpFetcher {
            });
        } catch (SocketTimeoutException ex) {
            return new TimeoutResponse(ex.getMessage());
        } catch (HttpHostConnectException e) {
            return new ConnectionError(e.getClass().getSimpleName());
        } catch (IOException e) {
            return new ConnectionError(e.getMessage());
            return new ProtocolError(e.getClass().getSimpleName());
        }

    }

}
@@ -50,7 +50,12 @@ public class RetryStrategy implements HttpRequestRetryStrategy {

        if (statusCode == 429) {
            // get the Retry-After header
            String retryAfter = response.getFirstHeader("Retry-After").getValue();
            var retryAfterHeader = response.getFirstHeader("Retry-After");
            if (retryAfterHeader == null) {
                return TimeValue.ofSeconds(3);
            }

            String retryAfter = retryAfterHeader.getValue();
            if (retryAfter == null) {
                return TimeValue.ofSeconds(2);
            }
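The hunk above guards against servers that answer 429 without a Retry-After header, which previously produced a NullPointerException. For reference, a defensive parse of the header value could look like the sketch below; it is not the repository's code, only handles the delay-seconds form of the header (the HTTP-date form is ignored), and reuses the three-second fallback chosen above. The types are Apache HttpCore 5's HttpResponse, Header and TimeValue.

    // Sketch: parse "Retry-After: <seconds>" defensively, falling back to a small default.
    static TimeValue retryDelay(org.apache.hc.core5.http.HttpResponse response) {
        var header = response.getFirstHeader("Retry-After");
        if (header == null || header.getValue() == null) {
            return TimeValue.ofSeconds(3); // no hint from the server
        }
        try {
            return TimeValue.ofSeconds(Long.parseLong(header.getValue().trim()));
        } catch (NumberFormatException e) {
            return TimeValue.ofSeconds(3); // HTTP-date form or garbage; fall back
        }
    }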
@@ -154,7 +154,7 @@ implements WritableModel
            ps.setNull(12, java.sql.Types.SMALLINT);
        }
        else {
            ps.setShort(12, (short) httpResponseTime().toMillis());
            ps.setInt(12, Math.clamp(httpResponseTime().toMillis(), 0, 0xFFFF)); // "unsigned short" in SQL
        }

        if (errorClassification() == null) {
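The clamp matters because a plain (short) cast wraps around silently for slow responses, while the column is an unsigned 16-bit value. A quick illustration with hypothetical timings, assuming Java 21's Math.clamp:

    long fortySeconds = 40_000L;                          // a 40 s response time
    short casted = (short) fortySeconds;                  // -25_536: the cast wraps around
    int stored   = Math.clamp(fortySeconds, 0, 0xFFFF);   // 40_000: fits SMALLINT UNSIGNED as-is

    long ninetySeconds = 90_000L;
    int saturated = Math.clamp(ninetySeconds, 0, 0xFFFF); // 65_535: saturates at the column maximum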
@@ -1,3 +1,10 @@
package nu.marginalia.ping.model;

public record DomainReference(int domainId, int nodeId, String domainName) { }
import nu.marginalia.model.EdgeDomain;

public record DomainReference(int domainId, int nodeId, String domainName) {
    public EdgeDomain asEdgeDomain() {
        return new EdgeDomain(domainName);
    }

}
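The new asEdgeDomain() helper exists so callers can hand a typed domain to the coordination layer rather than a raw string. The HttpPingService hunk further down uses it roughly like this (condensed sketch; the probe body is elided):

    // Condensed from the HttpPingService change below.
    try (var lock = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
        // ... probe the domain over HTTPS/HTTP while no other thread pings it ...
    }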
@@ -16,6 +16,9 @@ public record DomainSecurityEvent(
        boolean certificateProfileChanged,
        boolean certificateSanChanged,
        boolean certificatePublicKeyChanged,
        boolean certificateSerialNumberChanged,
        boolean certificateIssuerChanged,
        SchemaChange schemaChange,
        Duration oldCertificateTimeToExpiry,
        boolean securityHeadersChanged,
        boolean ipChanged,
@@ -41,8 +44,11 @@ public record DomainSecurityEvent(
                        change_software,
                        old_cert_time_to_expiry,
                        security_signature_before,
                        security_signature_after
                        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                        security_signature_after,
                        change_certificate_serial_number,
                        change_certificate_issuer,
                        change_schema
                        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                        """))
        {

@@ -75,6 +81,10 @@ public record DomainSecurityEvent(
                ps.setBytes(14, securitySignatureAfter().compressed());
            }

            ps.setBoolean(15, certificateSerialNumberChanged());
            ps.setBoolean(16, certificateIssuerChanged());
            ps.setString(17, schemaChange.name());

            ps.executeUpdate();
        }
    }
@@ -1,6 +1,6 @@
package nu.marginalia.ping.model;

public sealed interface RootDomainReference {
    record ById(long id) implements RootDomainReference { }
    record ByIdAndName(long id, String name) implements RootDomainReference { }
    record ByName(String name) implements RootDomainReference { }
}
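Because RootDomainReference is sealed, callers can resolve it exhaustively with pattern matching and the compiler will flag any missing case. A small illustrative sketch, assuming Java 21 record patterns (the describe helper is hypothetical, not part of the repository):

    static String describe(RootDomainReference ref) {
        return switch (ref) {
            case RootDomainReference.ById(long id) -> "id=" + id;
            case RootDomainReference.ByIdAndName(long id, String name) -> id + ":" + name;
            case RootDomainReference.ByName(String name) -> name;
        }; // no default branch needed: the interface is sealed and all permitted records are covered
    }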
@@ -0,0 +1,12 @@
package nu.marginalia.ping.model;

public enum SchemaChange {
    UNKNOWN,
    NONE,
    HTTP_TO_HTTPS,
    HTTPS_TO_HTTP;

    public boolean isSignificant() {
        return this != NONE && this != UNKNOWN;
    }
}
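The intent of isSignificant() is that only an actual scheme transition counts as a security-relevant change; UNKNOWN is treated as "not enough data" rather than as a change. A quick sanity check that follows directly from the enum above (illustrative asserts only):

    assert !SchemaChange.NONE.isSignificant();
    assert !SchemaChange.UNKNOWN.isSignificant();
    assert SchemaChange.HTTP_TO_HTTPS.isSignificant();
    assert SchemaChange.HTTPS_TO_HTTP.isSignificant();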
@@ -2,6 +2,9 @@ package nu.marginalia.ping.model.comparison;
import nu.marginalia.ping.model.DomainAvailabilityRecord;
import nu.marginalia.ping.model.DomainSecurityRecord;
import nu.marginalia.ping.model.HttpSchema;
import nu.marginalia.ping.model.SchemaChange;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
import java.time.Instant;
@@ -15,10 +18,13 @@ public record SecurityInformationChange(
        boolean isCertificateProfileChanged,
        boolean isCertificateSanChanged,
        boolean isCertificatePublicKeyChanged,
        boolean isCertificateSerialNumberChanged,
        boolean isCertificateIssuerChanged,
        Duration oldCertificateTimeToExpiry,
        boolean isSecurityHeadersChanged,
        boolean isIpAddressChanged,
        boolean isSoftwareHeaderChanged
        boolean isSoftwareHeaderChanged,
        SchemaChange schemaChange
) {
    public static SecurityInformationChange between(
            DomainSecurityRecord before, DomainAvailabilityRecord availabilityBefore,
@@ -30,8 +36,10 @@ public record SecurityInformationChange(

        boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256());
        boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash();
        boolean certificateSerialNumberChanged = !Objects.equals(before.sslCertSerialNumber(), after.sslCertSerialNumber());
        boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash());
        boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan());
        boolean certificateIssuerChanged = !Objects.equals(before.sslCertIssuer(), after.sslCertIssuer());

        Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between(
                Instant.now(),
@@ -39,9 +47,10 @@ public record SecurityInformationChange(
        );

        boolean securityHeadersChanged = before.securityHeadersHash() != after.securityHeadersHash();

        boolean softwareChanged = !Objects.equals(before.headerServer(), after.headerServer());

        SchemaChange schemaChange = getSchemaChange(before, after);

        // Note we don't include IP address changes in the overall change status,
        // as this is not alone considered a change in security information; we may have
        // multiple IP addresses for a domain, and the IP address may change frequently
@@ -50,7 +59,9 @@ public record SecurityInformationChange(
        boolean isChanged = asnChanged
                || certificateFingerprintChanged
                || securityHeadersChanged
                || softwareChanged;
                || certificateProfileChanged
                || softwareChanged
                || schemaChange.isSignificant();

        return new SecurityInformationChange(
                isChanged,
@@ -59,12 +70,41 @@ public record SecurityInformationChange(
                certificateProfileChanged,
                certificateSanChanged,
                certificatePublicKeyChanged,
                certificateSerialNumberChanged,
                certificateIssuerChanged,
                oldCertificateTimeToExpiry,
                securityHeadersChanged,
                ipChanged,
                softwareChanged
                softwareChanged,
                schemaChange
        );
    }

    private static @NotNull SchemaChange getSchemaChange(DomainSecurityRecord before, DomainSecurityRecord after) {
        if (before.httpSchema() == null || after.httpSchema() == null) {
            return SchemaChange.UNKNOWN;
        }

        boolean beforeIsHttp = before.httpSchema() == HttpSchema.HTTP;
        boolean afterIsHttp = after.httpSchema() == HttpSchema.HTTP;
        boolean beforeIsHttps = before.httpSchema() == HttpSchema.HTTPS;
        boolean afterIsHttps = after.httpSchema() == HttpSchema.HTTPS;

        SchemaChange schemaChange;

        if (beforeIsHttp && afterIsHttp) {
            schemaChange = SchemaChange.NONE;
        } else if (beforeIsHttps && afterIsHttps) {
            schemaChange = SchemaChange.NONE;
        } else if (beforeIsHttp && afterIsHttps) {
            schemaChange = SchemaChange.HTTP_TO_HTTPS;
        } else if (beforeIsHttps && afterIsHttp) {
            schemaChange = SchemaChange.HTTPS_TO_HTTP;
        } else {
            schemaChange = SchemaChange.UNKNOWN;
        }
        return schemaChange;
    }

}
@@ -48,7 +48,6 @@ public class DnsPingService {
        switch (changes) {
            case DnsRecordChange.None _ -> {}
            case DnsRecordChange.Changed changed -> {
                logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
                generatedRecords.add(DomainDnsEvent.builder()
                        .rootDomainId(newRecord.dnsRootDomainId())
                        .nodeId(newRecord.nodeAffinity())
@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException;
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
    private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);

    // Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
    public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) {
    public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
                                                              int domainId, int nodeId,
                                                              @Nullable Integer asn
    ) {

        var headers = httpResponse.headers();

        return DomainSecurityRecord.builder()
                .domainId(domainId)
                .nodeId(nodeId)
                .asn(asn)
                .httpSchema(HttpSchema.HTTP)
                .httpVersion(httpResponse.version())
                .headerServer(headers.getFirst("Server"))
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
    }

    // HTTPS response
    public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) {
    public DomainSecurityRecord createHttpsSecurityInformation(
            HttpsResponse httpResponse,
            PKIXValidationResult validationResult,
            int domainId,
            int nodeId,
            @Nullable Integer asn
    ) {

        var headers = httpResponse.headers();
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
        return DomainSecurityRecord.builder()
                .domainId(domainId)
                .nodeId(nodeId)
                .asn(asn)
                .httpSchema(HttpSchema.HTTPS)
                .headerServer(headers.getFirst("Server"))
                .headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))
@@ -2,6 +2,7 @@ package nu.marginalia.ping.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.fetcher.response.*;
import nu.marginalia.ping.model.*;
@@ -18,6 +19,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -25,6 +27,7 @@ import java.util.List;
@Singleton
public class HttpPingService {

    private final DomainCoordinator domainCoordinator;
    private final PingHttpFetcher pingHttpFetcher;

    private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
@@ -35,9 +38,11 @@ public class HttpPingService {

    @Inject
    public HttpPingService(
            DomainCoordinator domainCoordinator,
            PingHttpFetcher pingHttpFetcher,
            DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
            DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
        this.domainCoordinator = domainCoordinator;
        this.pingHttpFetcher = pingHttpFetcher;
        this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
        this.domainSecurityInformationFactory = domainSecurityInformationFactory;
@@ -58,7 +63,8 @@ public class HttpPingService {

    public List<WritableModel> pingDomain(DomainReference domainReference,
                                          @Nullable DomainAvailabilityRecord oldPingStatus,
                                          @Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException {
                                          @Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException, InterruptedException {

        // First we figure out if the domain maps to an IP address

        List<WritableModel> generatedRecords = new ArrayList<>();
@@ -68,26 +74,31 @@ public class HttpPingService {

        if (ipAddress.isEmpty()) {
            result = new UnknownHostError();
        }
        else {
            String url = "https://" + domainReference.domainName() + "/";
            String alternateUrl = "http://" + domainReference.domainName() + "/";
        } else {
            // lock the domain to prevent concurrent pings
            try (var _ = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
                String url = "https://" + domainReference.domainName() + "/";
                String alternateUrl = "http://" + domainReference.domainName() + "/";

            result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
                result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);

            if (result instanceof HttpsResponse response && response.httpStatus() == 405) {
                // If we get a 405, we try the GET method instead as not all servers support HEAD requests
                result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
            }
            else if (result instanceof ConnectionError) {
                var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
                if (!(result2 instanceof ConnectionError)) {
                    result = result2;
                }
                if (result instanceof HttpResponse response && response.httpStatus() == 405) {
                    // If we get a 405, we try the GET method instead as not all servers support HEAD requests
                    result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
                if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
                    sleep(Duration.ofSeconds(2));
                    result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
                } else if (result instanceof ConnectionError) {
                    var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
                    if (!(result2 instanceof ConnectionError)) {
                        result = result2;
                    }
                    if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
                        sleep(Duration.ofSeconds(2));
                        result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
                    }
                }

                // Add a grace sleep before we yield the semaphore, so that another thread doesn't
                // immediately hammer the same domain after it's released.
                sleep(Duration.ofSeconds(1));
            }
        }
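Since the hunk above interleaves removed and added lines, the reworked probe sequence is easier to read in isolation: take the per-domain lock, issue an HTTPS HEAD, retry with GET when shouldTryGET() says the status warrants it, and only fall back to plain HTTP if the HTTPS attempt cannot connect at all. A condensed sketch using the names from this diff (the fetcher's common response supertype is not shown here, so Object stands in for it; locking and record generation are elided):

    Object probe(String httpsUrl, String httpUrl) throws InterruptedException {
        Object result = pingHttpFetcher.fetchUrl(httpsUrl, Method.HEAD, null, null);

        if (result instanceof HttpsResponse r && shouldTryGET(r.httpStatus())) {
            sleep(Duration.ofSeconds(2));
            result = pingHttpFetcher.fetchUrl(httpsUrl, Method.GET, null, null);  // many servers mishandle HEAD
        }
        else if (result instanceof ConnectionError) {
            Object viaHttp = pingHttpFetcher.fetchUrl(httpUrl, Method.HEAD, null, null);
            if (!(viaHttp instanceof ConnectionError)) {
                result = viaHttp;                                                 // plain HTTP worked where HTTPS did not
            }
            if (result instanceof HttpResponse r && shouldTryGET(r.httpStatus())) {
                sleep(Duration.ofSeconds(2));
                result = pingHttpFetcher.fetchUrl(httpUrl, Method.GET, null, null);
            }
        }
        return result;
    }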
@@ -116,7 +127,7 @@ public class HttpPingService {
                        domainReference.nodeId(),
                        oldPingStatus,
                        ErrorClassification.CONNECTION_ERROR,
                        null);
                        rsp.errorMessage());
                newSecurityInformation = null;
            }
            case TimeoutResponse rsp -> {
@@ -134,7 +145,7 @@ public class HttpPingService {
                        domainReference.nodeId(),
                        oldPingStatus,
                        ErrorClassification.HTTP_CLIENT_ERROR,
                        null);
                        rsp.errorMessage());
                newSecurityInformation = null;
            }
            case HttpResponse httpResponse -> {
@@ -148,7 +159,8 @@ public class HttpPingService {
                newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
                        httpResponse,
                        domainReference.domainId(),
                        domainReference.nodeId()
                        domainReference.nodeId(),
                        newPingStatus.asn()
                );
            }
            case HttpsResponse httpsResponse -> {
@@ -166,7 +178,8 @@ public class HttpPingService {
                        httpsResponse,
                        validationResult,
                        domainReference.domainId(),
                        domainReference.nodeId()
                        domainReference.nodeId(),
                        newPingStatus.asn()
                );
            }
        }
@@ -183,13 +196,36 @@ public class HttpPingService {
        }
        if (oldSecurityInformation != null && newSecurityInformation != null) {
            compareSecurityInformation(generatedRecords,
                oldSecurityInformation, oldPingStatus,
                newSecurityInformation, newPingStatus);
                    oldSecurityInformation, oldPingStatus,
                    newSecurityInformation, newPingStatus);
        }

        return generatedRecords;
    }

    private boolean shouldTryGET(int statusCode) {
        if (statusCode < 400) {
            return false;
        }
        if (statusCode == 429) { // Too many requests, we should not retry with GET
            return false;
        }

        // For all other status codes, we can try a GET request, as many servers do not
        // cope with HEAD requests properly.

        return statusCode < 600;
    }

    private void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupted status
            logger.warn("Sleep interrupted", e);
        }
    }

    private void comparePingStatuses(List<WritableModel> generatedRecords,
                                     DomainAvailabilityRecord oldPingStatus,
                                     DomainAvailabilityRecord newPingStatus) {
@@ -258,6 +294,9 @@ public class HttpPingService {
                change.isCertificateProfileChanged(),
                change.isCertificateSanChanged(),
                change.isCertificatePublicKeyChanged(),
                change.isCertificateSerialNumberChanged(),
                change.isCertificateIssuerChanged(),
                change.schemaChange(),
                change.oldCertificateTimeToExpiry(),
                change.isSecurityHeadersChanged(),
                change.isIpAddressChanged(),
@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingDnsFetcher;
import nu.marginalia.ping.fetcher.PingHttpFetcher;
@@ -85,11 +86,14 @@ class AvailabilityJobSchedulerTest {
        DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);

        PingJobScheduler pingJobScheduler = new PingJobScheduler(
                new HttpPingService(pingHttpFetcher,
                new HttpPingService(
                        new LocalDomainCoordinator(),
                        pingHttpFetcher,
                        new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
                        new DomainSecurityInformationFactory()),
                new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
                        dnsDomainInformationFactory),
                new LocalDomainCoordinator(),
                pingDao
        );
@@ -318,6 +318,9 @@ class PingDaoTest {
                true,
                false,
                true,
                true,
                false,
                SchemaChange.NO_CHANGE,
                Duration.ofDays(30),
                false,
                false,
@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.io.HttpClientProvider;
@@ -63,6 +64,7 @@ class PingHttpServiceTest {
    public void testGetSslInfo() throws Exception {
        var provider = new HttpClientProvider();
        var pingService = new HttpPingService(
                new LocalDomainCoordinator(),
                new PingHttpFetcher(provider.get()),
                new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
                        new BackoffStrategy(PingModule.createPingIntervalsConfiguration())
@@ -1,9 +1,8 @@
package nu.marginalia.mqapi.ping;

public class PingRequest {
    public final String runClass;

    public PingRequest(String runClass) {
        this.runClass = runClass;
    public PingRequest() {

    }
}
@@ -37,6 +37,7 @@ dependencies {
    implementation project(':code:functions:domain-info')
    implementation project(':code:functions:domain-info:api')

    implementation project(':code:libraries:domain-lock')
    implementation project(':code:libraries:geo-ip')
    implementation project(':code:libraries:language-processing')
    implementation project(':code:libraries:term-frequency-dict')
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import io.jooby.ExecutionMode;
import io.jooby.Jooby;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.livecapture.LivecaptureModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId;
@@ -29,6 +30,7 @@ public class AssistantMain extends MainClass {
        Injector injector = Guice.createInjector(
                new AssistantModule(),
                new LivecaptureModule(),
                new DomainCoordinationModule(),
                new ServiceConfigurationModule(ServiceId.Assistant),
                new ServiceDiscoveryModule(),
                new DatabaseModule(false)
@@ -55,6 +55,7 @@ include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict'
include 'code:libraries:test-helpers'
include 'code:libraries:domain-lock'

include 'code:libraries:message-queue'