1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Viktor Lofgren
db7930016a (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
+ fix two broken tests
2025-06-14 16:20:01 +02:00
Viktor Lofgren
82456ad673 (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
The performance implication of this needs to be evaluated.  If it does not hold water. some other solution may be required instead.
2025-06-14 16:16:10 +02:00
28 changed files with 325 additions and 198 deletions

View File

@@ -5,12 +5,10 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf; import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint; import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress; import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -66,6 +64,6 @@ public interface ServiceRegistryIf {
void registerProcess(String processName, int nodeId); void registerProcess(String processName, int nodeId);
void deregisterProcess(String processName, int nodeId); void deregisterProcess(String processName, int nodeId);
void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception;
void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception; InterProcessSemaphoreV2 getSemaphore(String name, int permits) throws Exception;
} }

View File

@@ -6,6 +6,7 @@ import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint; import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
@@ -13,10 +14,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress; import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -283,60 +285,12 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
} }
@Override @Override
public void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception { public InterProcessSemaphoreV2 getSemaphore(String name, int permits) {
String path = "/process-locks/" + processName + "/" + nodeId; if (stopped)
throw new IllegalStateException("Service registry is stopped, cannot get semaphore " + name);
// first check if the path exists and call the callback accordingly String path = "/semaphores/" + name;
return new InterProcessSemaphoreV2(curatorFramework, path, permits);
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true);
}
else {
callback.accept(false);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false);
}
})
.forPath(path);
}
@Override
public void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception {
for (int node : nodes) {
String path = "/process-locks/" + processName + "/" + node;
// first check if the path exists and call the callback accordingly
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true, node);
}
else {
callback.accept(false, node);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true, node);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false, node);
}
})
.forPath(path);
}
} }
/* Exposed for tests */ /* Exposed for tests */

View File

@@ -22,6 +22,7 @@ dependencies {
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:execution:api') implementation project(':code:execution:api')
implementation project(':code:processes:crawling-process:ft-content-type') implementation project(':code:processes:crawling-process:ft-content-type')

View File

@@ -1,66 +0,0 @@
package nu.marginalia.rss.svc;
import nu.marginalia.model.EdgeDomain;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}
private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(4);
if (topDomain.equals("github.io"))
return new Semaphore(4);
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}
return new Semaphore(2);
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;
Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("fetching:" + domainName);
}
@Override
public void close() {
semaphore.release();
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
}
}
}

View File

@@ -5,6 +5,8 @@ import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
@@ -51,12 +53,13 @@ public class FeedFetcherService {
private final ServiceHeartbeat serviceHeartbeat; private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient; private final ExecutorClient executorClient;
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private volatile boolean updating; private volatile boolean updating;
@Inject @Inject
public FeedFetcherService(FeedDb feedDb, public FeedFetcherService(FeedDb feedDb,
DomainCoordinator domainCoordinator,
FileStorageService fileStorageService, FileStorageService fileStorageService,
NodeConfigurationService nodeConfigurationService, NodeConfigurationService nodeConfigurationService,
ServiceHeartbeat serviceHeartbeat, ServiceHeartbeat serviceHeartbeat,
@@ -67,6 +70,7 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService; this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat; this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient; this.executorClient = executorClient;
this.domainCoordinator = domainCoordinator;
} }
public enum UpdateMode { public enum UpdateMode {
@@ -132,7 +136,7 @@ public class FeedFetcherService {
}; };
FetchResult feedData; FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) { try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag); feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) { } catch (Exception ex) {
feedData = new FetchResult.TransientError(); feedData = new FetchResult.TransientError();

View File

@@ -0,0 +1,32 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.bundles.slf4j
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation libs.bundles.curator
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}

View File

@@ -0,0 +1,32 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
public class DefaultDomainPermits {
public static int defaultPermits(EdgeDomain domain) {
return defaultPermits(domain.topDomain.toLowerCase());
}
public static int defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return 16;
if (topDomain.equals("blogspot.com"))
return 8;
if (topDomain.equals("tumblr.com"))
return 8;
if (topDomain.equals("neocities.org"))
return 8;
if (topDomain.equals("github.io"))
return 8;
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return 1;
}
return 2;
}
}

View File

@@ -0,0 +1,17 @@
package nu.marginalia.coordination;
import com.google.inject.AbstractModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DomainCoordinationModule extends AbstractModule {
private static final Logger logger = LoggerFactory.getLogger(DomainCoordinationModule.class);
public DomainCoordinationModule() {
}
public void configure() {
bind(DomainCoordinator.class).to(ZookeeperDomainCoordinator.class);
}
}

View File

@@ -0,0 +1,13 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Optional;
public interface DomainCoordinator {
DomainLock lockDomain(EdgeDomain domain) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException;
boolean isLockableHint(EdgeDomain domain);
}

View File

@@ -0,0 +1,5 @@
package nu.marginalia.coordination;
public interface DomainLock extends AutoCloseable {
void close();
}

View File

@@ -1,16 +1,15 @@
package nu.marginalia.crawl.logic; package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/** Holds lock objects for each domain, to prevent multiple threads from public class LocalDomainCoordinator implements DomainCoordinator {
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow // The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to // relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically. // a few hundred thousand typically.
@@ -24,13 +23,25 @@ public class DomainLocks {
sem.acquire(); sem.acquire();
return new DomainLock(sem); return new LocalDomainLock(sem);
} }
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) { public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits); var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) { if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(sem)); return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return Optional.of(new LocalDomainLock(sem));
} }
else { else {
// We don't have a lock, so we return an empty optional // We don't have a lock, so we return an empty optional
@@ -39,24 +50,7 @@ public class DomainLocks {
} }
private Semaphore defaultPermits(String topDomain) { private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com")) return new Semaphore(DefaultDomainPermits.defaultPermits(topDomain));
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("tumblr.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(8);
if (topDomain.equals("github.io"))
return new Semaphore(8);
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
return new Semaphore(2);
} }
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread. /** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
@@ -71,15 +65,15 @@ public class DomainLocks {
return sem.availablePermits() > 0; return sem.availablePermits() > 0;
} }
public static class DomainLock implements AutoCloseable { public static class LocalDomainLock implements DomainLock {
private final Semaphore semaphore; private final Semaphore semaphore;
DomainLock(Semaphore semaphore) { LocalDomainLock(Semaphore semaphore) {
this.semaphore = semaphore; this.semaphore = semaphore;
} }
@Override @Override
public void close() throws Exception { public void close() {
semaphore.release(); semaphore.release();
Thread.currentThread().setName("[idle]"); Thread.currentThread().setName("[idle]");
} }

View File

@@ -0,0 +1,101 @@
package nu.marginalia.coordination;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class ZookeeperDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
private final ServiceRegistryIf serviceRegistry;
private final int nodeId;
@Inject
public ZookeeperDomainCoordinator(ServiceRegistryIf serviceRegistry, @Named("wmsa-system-node") int nodeId) {
// Zookeeper-specific initialization can be done here if needed
this.serviceRegistry = serviceRegistry;
this.nodeId = nodeId;
}
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
try {
var lease = sem.acquire();
return new ZkDomainLock(sem, lease);
}
catch (Exception e) {
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
try {
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
if (lease != null) {
return Optional.of(new ZkDomainLock(sem, lease));
}
else {
return Optional.empty(); // If we fail to acquire the lease, we return an empty optional
}
}
catch (Exception e) {
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
}
}
private InterProcessSemaphoreV2 createSemapore(String topDomain){
try {
return serviceRegistry.getSemaphore(topDomain + ":" + nodeId, DefaultDomainPermits.defaultPermits(topDomain));
}
catch (Exception e) {
throw new RuntimeException("Failed to get semaphore for domain: " + topDomain, e);
}
}
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
return true; // Curator does not provide a way to check if a lock is available without acquiring it
}
public static class ZkDomainLock implements DomainLock {
private final InterProcessSemaphoreV2 semaphore;
private final Lease lease;
ZkDomainLock(InterProcessSemaphoreV2 semaphore, Lease lease) {
this.semaphore = semaphore;
this.lease = lease;
}
@Override
public void close() {
semaphore.returnLease(lease);
}
}
}

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh') implementation project(':code:libraries:easy-lsh')
implementation project(':code:libraries:domain-lock')
implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model')

View File

@@ -10,9 +10,11 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks; import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource; import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber; import nu.marginalia.crawl.retreival.DomainProber;
@@ -68,7 +70,7 @@ public class CrawlerMain extends ProcessMainClass {
private final ServiceRegistryIf serviceRegistry; private final ServiceRegistryIf serviceRegistry;
private final SimpleBlockingThreadPool pool; private final SimpleBlockingThreadPool pool;
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>(); private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
@@ -97,6 +99,7 @@ public class CrawlerMain extends ProcessMainClass {
WarcArchiverFactory warcArchiverFactory, WarcArchiverFactory warcArchiverFactory,
HikariDataSource dataSource, HikariDataSource dataSource,
DomainBlacklist blacklist, DomainBlacklist blacklist,
DomainCoordinator domainCoordinator,
ServiceRegistryIf serviceRegistry, ServiceRegistryIf serviceRegistry,
Gson gson) throws InterruptedException { Gson gson) throws InterruptedException {
@@ -114,6 +117,7 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist; this.blacklist = blacklist;
this.node = processConfiguration.node(); this.node = processConfiguration.node();
this.serviceRegistry = serviceRegistry; this.serviceRegistry = serviceRegistry;
this.domainCoordinator = domainCoordinator;
SimpleBlockingThreadPool.ThreadType threadType; SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) { if (Boolean.getBoolean("crawler.useVirtualThreads")) {
@@ -157,6 +161,7 @@ public class CrawlerMain extends ProcessMainClass {
new CrawlerModule(), new CrawlerModule(),
new ProcessConfigurationModule("crawler"), new ProcessConfigurationModule("crawler"),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new DatabaseModule(false) new DatabaseModule(false)
); );
var crawler = injector.getInstance(CrawlerMain.class); var crawler = injector.getInstance(CrawlerMain.class);
@@ -451,7 +456,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in /** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */ * DomainLocks purgatory */
public boolean canRun() { public boolean canRun() {
return domainLocks.isLockableHint(new EdgeDomain(domain)); return domainCoordinator.isLockableHint(new EdgeDomain(domain));
} }
@Override @Override
@@ -462,7 +467,7 @@ public class CrawlerMain extends ProcessMainClass {
return; return;
} }
Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain)); Optional<DomainLock> lock = domainCoordinator.tryLockDomain(new EdgeDomain(domain));
// We don't have a lock, so we can't run this task // We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long // we return to avoid blocking the pool for too long
if (lock.isEmpty()) { if (lock.isEmpty()) {
@@ -470,7 +475,7 @@ public class CrawlerMain extends ProcessMainClass {
retryQueue.put(this); retryQueue.put(this);
return; return;
} }
DomainLocks.DomainLock domainLock = lock.get(); DomainLock domainLock = lock.get();
try (domainLock) { try (domainLock) {
Thread.currentThread().setName("crawling:" + domain); Thread.currentThread().setName("crawling:" + domain);

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:index:api') implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api') implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh') implementation project(':code:libraries:easy-lsh')
implementation project(':code:processes:crawling-process') implementation project(':code:processes:crawling-process')

View File

@@ -10,6 +10,8 @@ import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule; import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor; import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.io.SerializableCrawlDataStream;
@@ -58,6 +60,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
private final FileStorageService fileStorageService; private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService; private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService; private final DocumentLoaderService documentLoaderService;
private final DomainCoordinator domainCoordinator;
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
@Inject @Inject
@@ -71,7 +74,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
DomainProcessor domainProcessor, DomainProcessor domainProcessor,
FileStorageService fileStorageService, FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService, KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, HikariDataSource dataSource) DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
throws Exception throws Exception
{ {
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX); super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -84,6 +87,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService; this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService; this.documentLoaderService = documentLoaderService;
this.domainCoordinator = domainCoordinator;
this.dataSource = dataSource; this.dataSource = dataSource;
domainBlacklist.waitUntilLoaded(); domainBlacklist.waitUntilLoaded();
@@ -107,6 +111,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
try { try {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new LiveCrawlerModule(), new LiveCrawlerModule(),
new DomainCoordinationModule(), // 2 hours lease timeout is enough for the live crawler
new ProcessConfigurationModule("crawler"), new ProcessConfigurationModule("crawler"),
new ConverterModule(), new ConverterModule(),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
@@ -172,7 +177,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
processHeartbeat.progress(LiveCrawlState.CRAWLING); processHeartbeat.progress(LiveCrawlState.CRAWLING);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist); try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling")) var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{ {
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) { for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {

View File

@@ -5,8 +5,9 @@ import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
@@ -46,14 +47,16 @@ public class SimpleLinkScraper implements AutoCloseable {
private final DomainBlacklist domainBlacklist; private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10); private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10); private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024); private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
public SimpleLinkScraper(LiveCrawlDataSet dataSet, public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries, DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist) { DomainBlacklist domainBlacklist) {
this.dataSet = dataSet; this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries; this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist; this.domainBlacklist = domainBlacklist;
} }
@@ -98,7 +101,7 @@ public class SimpleLinkScraper implements AutoCloseable {
.version(HttpClient.Version.HTTP_2) .version(HttpClient.Version.HTTP_2)
.build(); .build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove: // throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) DomainLock lock = domainCoordinator.lockDomain(domain)
) { ) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client); SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);

View File

@@ -1,5 +1,6 @@
package nu.marginalia.livecrawler; package nu.marginalia.livecrawler;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.db.DomainBlacklistImpl; import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
@@ -37,7 +38,7 @@ class SimpleLinkScraperTest {
@Test @Test
public void testRetrieveNow() throws Exception { public void testRetrieveNow() throws Exception {
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class)); var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/")); int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
Assertions.assertEquals(1, fetched); Assertions.assertEquals(1, fetched);
@@ -57,7 +58,7 @@ class SimpleLinkScraperTest {
@Test @Test
public void testRetrieveNow_Redundant() throws Exception { public void testRetrieveNow_Redundant() throws Exception {
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1"); dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class)); var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/")); int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));

View File

@@ -27,6 +27,7 @@ dependencies {
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip') implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')

View File

@@ -1,6 +1,7 @@
package nu.marginalia.ping; package nu.marginalia.ping;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService; import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService; import nu.marginalia.ping.svc.HttpPingService;
@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
public class PingJobScheduler { public class PingJobScheduler {
private final HttpPingService httpPingService; private final HttpPingService httpPingService;
private final DnsPingService dnsPingService; private final DnsPingService dnsPingService;
private final DomainCoordinator domainCoordinator;
private final PingDao pingDao; private final PingDao pingDao;
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class); private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
@@ -43,10 +45,12 @@ public class PingJobScheduler {
@Inject @Inject
public PingJobScheduler(HttpPingService httpPingService, public PingJobScheduler(HttpPingService httpPingService,
DnsPingService dnsPingService, DnsPingService dnsPingService,
DomainCoordinator domainCoordinator,
PingDao pingDao) PingDao pingDao)
{ {
this.httpPingService = httpPingService; this.httpPingService = httpPingService;
this.dnsPingService = dnsPingService; this.dnsPingService = dnsPingService;
this.domainCoordinator = domainCoordinator;
this.pingDao = pingDao; this.pingDao = pingDao;
} }

View File

@@ -5,15 +5,14 @@ import com.google.inject.Guice;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Injector; import com.google.inject.Injector;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest; import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -25,8 +24,6 @@ public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class); private static final Logger log = LoggerFactory.getLogger(PingMain.class);
private final PingJobScheduler pingJobScheduler; private final PingJobScheduler pingJobScheduler;
private final ServiceRegistryIf serviceRegistry;
private final NodeConfigurationService nodeConfigurationService;
private final int node; private final int node;
private static final Logger logger = LoggerFactory.getLogger(PingMain.class); private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
@@ -36,15 +33,11 @@ public class PingMain extends ProcessMainClass {
ProcessConfiguration config, ProcessConfiguration config,
Gson gson, Gson gson,
PingJobScheduler pingJobScheduler, PingJobScheduler pingJobScheduler,
ServiceRegistryIf serviceRegistry,
NodeConfigurationService nodeConfigurationService,
ProcessConfiguration processConfiguration ProcessConfiguration processConfiguration
) { ) {
super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX); super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);
this.pingJobScheduler = pingJobScheduler; this.pingJobScheduler = pingJobScheduler;
this.serviceRegistry = serviceRegistry;
this.nodeConfigurationService = nodeConfigurationService;
this.node = processConfiguration.node(); this.node = processConfiguration.node();
} }
@@ -80,6 +73,7 @@ public class PingMain extends ProcessMainClass {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new PingModule(), new PingModule(),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new ProcessConfigurationModule("ping"), new ProcessConfigurationModule("ping"),
new DatabaseModule(false) new DatabaseModule(false)
); );

View File

@@ -1,3 +1,10 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public record DomainReference(int domainId, int nodeId, String domainName) { } import nu.marginalia.model.EdgeDomain;
public record DomainReference(int domainId, int nodeId, String domainName) {
public EdgeDomain asEdgeDomain() {
return new EdgeDomain(domainName);
}
}

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.fetcher.response.*; import nu.marginalia.ping.fetcher.response.*;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
@@ -26,6 +27,7 @@ import java.util.List;
@Singleton @Singleton
public class HttpPingService { public class HttpPingService {
private final DomainCoordinator domainCoordinator;
private final PingHttpFetcher pingHttpFetcher; private final PingHttpFetcher pingHttpFetcher;
private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory; private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
@@ -36,9 +38,11 @@ public class HttpPingService {
@Inject @Inject
public HttpPingService( public HttpPingService(
DomainCoordinator domainCoordinator,
PingHttpFetcher pingHttpFetcher, PingHttpFetcher pingHttpFetcher,
DomainAvailabilityInformationFactory domainAvailabilityInformationFactory, DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception { DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
this.domainCoordinator = domainCoordinator;
this.pingHttpFetcher = pingHttpFetcher; this.pingHttpFetcher = pingHttpFetcher;
this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory; this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
this.domainSecurityInformationFactory = domainSecurityInformationFactory; this.domainSecurityInformationFactory = domainSecurityInformationFactory;
@@ -59,7 +63,8 @@ public class HttpPingService {
public List<WritableModel> pingDomain(DomainReference domainReference, public List<WritableModel> pingDomain(DomainReference domainReference,
@Nullable DomainAvailabilityRecord oldPingStatus, @Nullable DomainAvailabilityRecord oldPingStatus,
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException { @Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException, InterruptedException {
// First we figure out if the domain maps to an IP address // First we figure out if the domain maps to an IP address
List<WritableModel> generatedRecords = new ArrayList<>(); List<WritableModel> generatedRecords = new ArrayList<>();
@@ -69,26 +74,31 @@ public class HttpPingService {
if (ipAddress.isEmpty()) { if (ipAddress.isEmpty()) {
result = new UnknownHostError(); result = new UnknownHostError();
} } else {
else { // lock the domain to prevent concurrent pings
String url = "https://" + domainReference.domainName() + "/"; try (var _ = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
String alternateUrl = "http://" + domainReference.domainName() + "/"; String url = "https://" + domainReference.domainName() + "/";
String alternateUrl = "http://" + domainReference.domainName() + "/";
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null); result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) { if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2)); sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
} else if (result instanceof ConnectionError) {
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
} }
// Add a grace sleep before we yield the semaphore, so that another thread doesn't
// immediately hammer the same domain after it's released.
sleep(Duration.ofSeconds(1));
} }
} }
@@ -186,8 +196,8 @@ public class HttpPingService {
} }
if (oldSecurityInformation != null && newSecurityInformation != null) { if (oldSecurityInformation != null && newSecurityInformation != null) {
compareSecurityInformation(generatedRecords, compareSecurityInformation(generatedRecords,
oldSecurityInformation, oldPingStatus, oldSecurityInformation, oldPingStatus,
newSecurityInformation, newPingStatus); newSecurityInformation, newPingStatus);
} }
return generatedRecords; return generatedRecords;

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingDnsFetcher; import nu.marginalia.ping.fetcher.PingDnsFetcher;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
@@ -85,11 +86,14 @@ class AvailabilityJobSchedulerTest {
DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic); DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);
PingJobScheduler pingJobScheduler = new PingJobScheduler( PingJobScheduler pingJobScheduler = new PingJobScheduler(
new HttpPingService(pingHttpFetcher, new HttpPingService(
new LocalDomainCoordinator(),
pingHttpFetcher,
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)), new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
new DomainSecurityInformationFactory()), new DomainSecurityInformationFactory()),
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")), new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
dnsDomainInformationFactory), dnsDomainInformationFactory),
new LocalDomainCoordinator(),
pingDao pingDao
); );

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.io.HttpClientProvider; import nu.marginalia.ping.io.HttpClientProvider;
@@ -63,6 +64,7 @@ class PingHttpServiceTest {
public void testGetSslInfo() throws Exception { public void testGetSslInfo() throws Exception {
var provider = new HttpClientProvider(); var provider = new HttpClientProvider();
var pingService = new HttpPingService( var pingService = new HttpPingService(
new LocalDomainCoordinator(),
new PingHttpFetcher(provider.get()), new PingHttpFetcher(provider.get()),
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
new BackoffStrategy(PingModule.createPingIntervalsConfiguration()) new BackoffStrategy(PingModule.createPingIntervalsConfiguration())

View File

@@ -37,6 +37,7 @@ dependencies {
implementation project(':code:functions:domain-info') implementation project(':code:functions:domain-info')
implementation project(':code:functions:domain-info:api') implementation project(':code:functions:domain-info:api')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip') implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict') implementation project(':code:libraries:term-frequency-dict')

View File

@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector; import com.google.inject.Injector;
import io.jooby.ExecutionMode; import io.jooby.ExecutionMode;
import io.jooby.Jooby; import io.jooby.Jooby;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.livecapture.LivecaptureModule; import nu.marginalia.livecapture.LivecaptureModule;
import nu.marginalia.service.MainClass; import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId; import nu.marginalia.service.ServiceId;
@@ -29,6 +30,7 @@ public class AssistantMain extends MainClass {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new AssistantModule(), new AssistantModule(),
new LivecaptureModule(), new LivecaptureModule(),
new DomainCoordinationModule(),
new ServiceConfigurationModule(ServiceId.Assistant), new ServiceConfigurationModule(ServiceId.Assistant),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DatabaseModule(false) new DatabaseModule(false)

View File

@@ -55,6 +55,7 @@ include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing' include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict' include 'code:libraries:term-frequency-dict'
include 'code:libraries:test-helpers' include 'code:libraries:test-helpers'
include 'code:libraries:domain-lock'
include 'code:libraries:message-queue' include 'code:libraries:message-queue'