mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
2 Commits
deploy-023
...
deploy-023
Author | SHA1 | Date | |
---|---|---|---|
|
db7930016a | ||
|
82456ad673 |
@@ -5,12 +5,10 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
|
|||||||
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
|
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
|
||||||
import nu.marginalia.service.discovery.property.ServiceEndpoint;
|
import nu.marginalia.service.discovery.property.ServiceEndpoint;
|
||||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||||
|
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.function.BiConsumer;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
|
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
|
||||||
|
|
||||||
@@ -66,6 +64,6 @@ public interface ServiceRegistryIf {
|
|||||||
|
|
||||||
void registerProcess(String processName, int nodeId);
|
void registerProcess(String processName, int nodeId);
|
||||||
void deregisterProcess(String processName, int nodeId);
|
void deregisterProcess(String processName, int nodeId);
|
||||||
void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception;
|
|
||||||
void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception;
|
InterProcessSemaphoreV2 getSemaphore(String name, int permits) throws Exception;
|
||||||
}
|
}
|
||||||
|
@@ -6,6 +6,7 @@ import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
|
|||||||
import nu.marginalia.service.discovery.property.ServiceEndpoint;
|
import nu.marginalia.service.discovery.property.ServiceEndpoint;
|
||||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
|
||||||
import org.apache.curator.utils.ZKPaths;
|
import org.apache.curator.utils.ZKPaths;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
@@ -13,10 +14,11 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.BiConsumer;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
|
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
|
||||||
|
|
||||||
@@ -283,60 +285,12 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception {
|
public InterProcessSemaphoreV2 getSemaphore(String name, int permits) {
|
||||||
String path = "/process-locks/" + processName + "/" + nodeId;
|
if (stopped)
|
||||||
|
throw new IllegalStateException("Service registry is stopped, cannot get semaphore " + name);
|
||||||
|
|
||||||
// first check if the path exists and call the callback accordingly
|
String path = "/semaphores/" + name;
|
||||||
|
return new InterProcessSemaphoreV2(curatorFramework, path, permits);
|
||||||
if (curatorFramework.checkExists().forPath(path) != null) {
|
|
||||||
callback.accept(true);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
callback.accept(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
curatorFramework.watchers().add()
|
|
||||||
.usingWatcher((Watcher) change -> {
|
|
||||||
Watcher.Event.EventType type = change.getType();
|
|
||||||
|
|
||||||
if (type == Watcher.Event.EventType.NodeCreated) {
|
|
||||||
callback.accept(true);
|
|
||||||
}
|
|
||||||
if (type == Watcher.Event.EventType.NodeDeleted) {
|
|
||||||
callback.accept(false);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.forPath(path);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception {
|
|
||||||
|
|
||||||
for (int node : nodes) {
|
|
||||||
String path = "/process-locks/" + processName + "/" + node;
|
|
||||||
|
|
||||||
// first check if the path exists and call the callback accordingly
|
|
||||||
if (curatorFramework.checkExists().forPath(path) != null) {
|
|
||||||
callback.accept(true, node);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
callback.accept(false, node);
|
|
||||||
}
|
|
||||||
|
|
||||||
curatorFramework.watchers().add()
|
|
||||||
.usingWatcher((Watcher) change -> {
|
|
||||||
Watcher.Event.EventType type = change.getType();
|
|
||||||
|
|
||||||
if (type == Watcher.Event.EventType.NodeCreated) {
|
|
||||||
callback.accept(true, node);
|
|
||||||
}
|
|
||||||
if (type == Watcher.Event.EventType.NodeDeleted) {
|
|
||||||
callback.accept(false, node);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.forPath(path);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Exposed for tests */
|
/* Exposed for tests */
|
||||||
|
@@ -22,6 +22,7 @@ dependencies {
|
|||||||
implementation project(':code:common:db')
|
implementation project(':code:common:db')
|
||||||
implementation project(':code:libraries:blocking-thread-pool')
|
implementation project(':code:libraries:blocking-thread-pool')
|
||||||
implementation project(':code:libraries:message-queue')
|
implementation project(':code:libraries:message-queue')
|
||||||
|
implementation project(':code:libraries:domain-lock')
|
||||||
|
|
||||||
implementation project(':code:execution:api')
|
implementation project(':code:execution:api')
|
||||||
implementation project(':code:processes:crawling-process:ft-content-type')
|
implementation project(':code:processes:crawling-process:ft-content-type')
|
||||||
|
@@ -1,66 +0,0 @@
|
|||||||
package nu.marginalia.rss.svc;
|
|
||||||
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.Semaphore;
|
|
||||||
|
|
||||||
/** Holds lock objects for each domain, to prevent multiple threads from
|
|
||||||
* crawling the same domain at the same time.
|
|
||||||
*/
|
|
||||||
public class DomainLocks {
|
|
||||||
// The locks are stored in a map, with the domain name as the key. This map will grow
|
|
||||||
// relatively big, but should be manageable since the number of domains is limited to
|
|
||||||
// a few hundred thousand typically.
|
|
||||||
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
|
|
||||||
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
|
|
||||||
*/
|
|
||||||
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
|
|
||||||
return new DomainLock(domain.toString(),
|
|
||||||
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Semaphore defaultPermits(String topDomain) {
|
|
||||||
if (topDomain.equals("wordpress.com"))
|
|
||||||
return new Semaphore(16);
|
|
||||||
if (topDomain.equals("blogspot.com"))
|
|
||||||
return new Semaphore(8);
|
|
||||||
|
|
||||||
if (topDomain.equals("neocities.org"))
|
|
||||||
return new Semaphore(4);
|
|
||||||
if (topDomain.equals("github.io"))
|
|
||||||
return new Semaphore(4);
|
|
||||||
|
|
||||||
if (topDomain.equals("substack.com")) {
|
|
||||||
return new Semaphore(1);
|
|
||||||
}
|
|
||||||
if (topDomain.endsWith(".edu")) {
|
|
||||||
return new Semaphore(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Semaphore(2);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class DomainLock implements AutoCloseable {
|
|
||||||
private final String domainName;
|
|
||||||
private final Semaphore semaphore;
|
|
||||||
|
|
||||||
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
|
|
||||||
this.domainName = domainName;
|
|
||||||
this.semaphore = semaphore;
|
|
||||||
|
|
||||||
Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
|
|
||||||
semaphore.acquire();
|
|
||||||
Thread.currentThread().setName("fetching:" + domainName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
semaphore.release();
|
|
||||||
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -5,6 +5,8 @@ import com.opencsv.CSVReader;
|
|||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
import nu.marginalia.contenttype.ContentType;
|
import nu.marginalia.contenttype.ContentType;
|
||||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
|
import nu.marginalia.coordination.DomainLock;
|
||||||
import nu.marginalia.executor.client.ExecutorClient;
|
import nu.marginalia.executor.client.ExecutorClient;
|
||||||
import nu.marginalia.model.EdgeDomain;
|
import nu.marginalia.model.EdgeDomain;
|
||||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||||
@@ -51,12 +53,13 @@ public class FeedFetcherService {
|
|||||||
private final ServiceHeartbeat serviceHeartbeat;
|
private final ServiceHeartbeat serviceHeartbeat;
|
||||||
private final ExecutorClient executorClient;
|
private final ExecutorClient executorClient;
|
||||||
|
|
||||||
private final DomainLocks domainLocks = new DomainLocks();
|
private final DomainCoordinator domainCoordinator;
|
||||||
|
|
||||||
private volatile boolean updating;
|
private volatile boolean updating;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public FeedFetcherService(FeedDb feedDb,
|
public FeedFetcherService(FeedDb feedDb,
|
||||||
|
DomainCoordinator domainCoordinator,
|
||||||
FileStorageService fileStorageService,
|
FileStorageService fileStorageService,
|
||||||
NodeConfigurationService nodeConfigurationService,
|
NodeConfigurationService nodeConfigurationService,
|
||||||
ServiceHeartbeat serviceHeartbeat,
|
ServiceHeartbeat serviceHeartbeat,
|
||||||
@@ -67,6 +70,7 @@ public class FeedFetcherService {
|
|||||||
this.nodeConfigurationService = nodeConfigurationService;
|
this.nodeConfigurationService = nodeConfigurationService;
|
||||||
this.serviceHeartbeat = serviceHeartbeat;
|
this.serviceHeartbeat = serviceHeartbeat;
|
||||||
this.executorClient = executorClient;
|
this.executorClient = executorClient;
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum UpdateMode {
|
public enum UpdateMode {
|
||||||
@@ -132,7 +136,7 @@ public class FeedFetcherService {
|
|||||||
};
|
};
|
||||||
|
|
||||||
FetchResult feedData;
|
FetchResult feedData;
|
||||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||||
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
feedData = new FetchResult.TransientError();
|
feedData = new FetchResult.TransientError();
|
||||||
|
32
code/libraries/domain-lock/build.gradle
Normal file
32
code/libraries/domain-lock/build.gradle
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
plugins {
|
||||||
|
id 'java'
|
||||||
|
}
|
||||||
|
|
||||||
|
java {
|
||||||
|
toolchain {
|
||||||
|
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
implementation libs.bundles.slf4j
|
||||||
|
implementation project(':code:common:model')
|
||||||
|
implementation project(':code:common:config')
|
||||||
|
implementation project(':code:common:service')
|
||||||
|
|
||||||
|
implementation libs.bundles.curator
|
||||||
|
|
||||||
|
implementation libs.guava
|
||||||
|
implementation dependencies.create(libs.guice.get()) {
|
||||||
|
exclude group: 'com.google.guava'
|
||||||
|
}
|
||||||
|
testImplementation libs.bundles.slf4j.test
|
||||||
|
testImplementation libs.bundles.junit
|
||||||
|
testImplementation libs.mockito
|
||||||
|
}
|
||||||
|
|
||||||
|
test {
|
||||||
|
useJUnitPlatform()
|
||||||
|
}
|
@@ -0,0 +1,32 @@
|
|||||||
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
|
||||||
|
public class DefaultDomainPermits {
|
||||||
|
|
||||||
|
public static int defaultPermits(EdgeDomain domain) {
|
||||||
|
return defaultPermits(domain.topDomain.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int defaultPermits(String topDomain) {
|
||||||
|
|
||||||
|
if (topDomain.equals("wordpress.com"))
|
||||||
|
return 16;
|
||||||
|
if (topDomain.equals("blogspot.com"))
|
||||||
|
return 8;
|
||||||
|
if (topDomain.equals("tumblr.com"))
|
||||||
|
return 8;
|
||||||
|
if (topDomain.equals("neocities.org"))
|
||||||
|
return 8;
|
||||||
|
if (topDomain.equals("github.io"))
|
||||||
|
return 8;
|
||||||
|
// Substack really dislikes broad-scale crawlers, so we need to be careful
|
||||||
|
// to not get blocked.
|
||||||
|
if (topDomain.equals("substack.com")) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -0,0 +1,17 @@
|
|||||||
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
|
import com.google.inject.AbstractModule;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class DomainCoordinationModule extends AbstractModule {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(DomainCoordinationModule.class);
|
||||||
|
|
||||||
|
public DomainCoordinationModule() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void configure() {
|
||||||
|
bind(DomainCoordinator.class).to(ZookeeperDomainCoordinator.class);
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,13 @@
|
|||||||
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
public interface DomainCoordinator {
|
||||||
|
DomainLock lockDomain(EdgeDomain domain) throws InterruptedException;
|
||||||
|
Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException;
|
||||||
|
Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException;
|
||||||
|
boolean isLockableHint(EdgeDomain domain);
|
||||||
|
}
|
@@ -0,0 +1,5 @@
|
|||||||
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
|
public interface DomainLock extends AutoCloseable {
|
||||||
|
void close();
|
||||||
|
}
|
@@ -1,16 +1,15 @@
|
|||||||
package nu.marginalia.crawl.logic;
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/** Holds lock objects for each domain, to prevent multiple threads from
|
public class LocalDomainCoordinator implements DomainCoordinator {
|
||||||
* crawling the same domain at the same time.
|
|
||||||
*/
|
|
||||||
public class DomainLocks {
|
|
||||||
// The locks are stored in a map, with the domain name as the key. This map will grow
|
// The locks are stored in a map, with the domain name as the key. This map will grow
|
||||||
// relatively big, but should be manageable since the number of domains is limited to
|
// relatively big, but should be manageable since the number of domains is limited to
|
||||||
// a few hundred thousand typically.
|
// a few hundred thousand typically.
|
||||||
@@ -24,13 +23,25 @@ public class DomainLocks {
|
|||||||
|
|
||||||
sem.acquire();
|
sem.acquire();
|
||||||
|
|
||||||
return new DomainLock(sem);
|
return new LocalDomainLock(sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
|
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
|
||||||
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
|
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
|
||||||
if (sem.tryAcquire(1)) {
|
if (sem.tryAcquire(1)) {
|
||||||
return Optional.of(new DomainLock(sem));
|
return Optional.of(new LocalDomainLock(sem));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// We don't have a lock, so we return an empty optional
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
|
||||||
|
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
|
||||||
|
if (sem.tryAcquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||||
|
return Optional.of(new LocalDomainLock(sem));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// We don't have a lock, so we return an empty optional
|
// We don't have a lock, so we return an empty optional
|
||||||
@@ -39,24 +50,7 @@ public class DomainLocks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Semaphore defaultPermits(String topDomain) {
|
private Semaphore defaultPermits(String topDomain) {
|
||||||
if (topDomain.equals("wordpress.com"))
|
return new Semaphore(DefaultDomainPermits.defaultPermits(topDomain));
|
||||||
return new Semaphore(16);
|
|
||||||
if (topDomain.equals("blogspot.com"))
|
|
||||||
return new Semaphore(8);
|
|
||||||
if (topDomain.equals("tumblr.com"))
|
|
||||||
return new Semaphore(8);
|
|
||||||
if (topDomain.equals("neocities.org"))
|
|
||||||
return new Semaphore(8);
|
|
||||||
if (topDomain.equals("github.io"))
|
|
||||||
return new Semaphore(8);
|
|
||||||
|
|
||||||
// Substack really dislikes broad-scale crawlers, so we need to be careful
|
|
||||||
// to not get blocked.
|
|
||||||
if (topDomain.equals("substack.com")) {
|
|
||||||
return new Semaphore(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Semaphore(2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
|
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
|
||||||
@@ -71,15 +65,15 @@ public class DomainLocks {
|
|||||||
return sem.availablePermits() > 0;
|
return sem.availablePermits() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DomainLock implements AutoCloseable {
|
public static class LocalDomainLock implements DomainLock {
|
||||||
private final Semaphore semaphore;
|
private final Semaphore semaphore;
|
||||||
|
|
||||||
DomainLock(Semaphore semaphore) {
|
LocalDomainLock(Semaphore semaphore) {
|
||||||
this.semaphore = semaphore;
|
this.semaphore = semaphore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() {
|
||||||
semaphore.release();
|
semaphore.release();
|
||||||
Thread.currentThread().setName("[idle]");
|
Thread.currentThread().setName("[idle]");
|
||||||
}
|
}
|
@@ -0,0 +1,101 @@
|
|||||||
|
package nu.marginalia.coordination;
|
||||||
|
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||||
|
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
|
||||||
|
import org.apache.curator.framework.recipes.locks.Lease;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class ZookeeperDomainCoordinator implements DomainCoordinator {
|
||||||
|
// The locks are stored in a map, with the domain name as the key. This map will grow
|
||||||
|
// relatively big, but should be manageable since the number of domains is limited to
|
||||||
|
// a few hundred thousand typically.
|
||||||
|
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
|
||||||
|
private final ServiceRegistryIf serviceRegistry;
|
||||||
|
private final int nodeId;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ZookeeperDomainCoordinator(ServiceRegistryIf serviceRegistry, @Named("wmsa-system-node") int nodeId) {
|
||||||
|
// Zookeeper-specific initialization can be done here if needed
|
||||||
|
this.serviceRegistry = serviceRegistry;
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
|
||||||
|
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
|
||||||
|
*/
|
||||||
|
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
|
||||||
|
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
|
||||||
|
|
||||||
|
try {
|
||||||
|
var lease = sem.acquire();
|
||||||
|
|
||||||
|
return new ZkDomainLock(sem, lease);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
|
||||||
|
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
|
||||||
|
|
||||||
|
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
|
||||||
|
|
||||||
|
try {
|
||||||
|
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
|
||||||
|
if (lease != null) {
|
||||||
|
return Optional.of(new ZkDomainLock(sem, lease));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Optional.empty(); // If we fail to acquire the lease, we return an empty optional
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private InterProcessSemaphoreV2 createSemapore(String topDomain){
|
||||||
|
try {
|
||||||
|
return serviceRegistry.getSemaphore(topDomain + ":" + nodeId, DefaultDomainPermits.defaultPermits(topDomain));
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException("Failed to get semaphore for domain: " + topDomain, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
|
||||||
|
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
|
||||||
|
* after this method returns true)
|
||||||
|
*/
|
||||||
|
public boolean isLockableHint(EdgeDomain domain) {
|
||||||
|
return true; // Curator does not provide a way to check if a lock is available without acquiring it
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ZkDomainLock implements DomainLock {
|
||||||
|
private final InterProcessSemaphoreV2 semaphore;
|
||||||
|
private final Lease lease;
|
||||||
|
|
||||||
|
ZkDomainLock(InterProcessSemaphoreV2 semaphore, Lease lease) {
|
||||||
|
this.semaphore = semaphore;
|
||||||
|
this.lease = lease;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
semaphore.returnLease(lease);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -32,6 +32,7 @@ dependencies {
|
|||||||
implementation project(':code:libraries:message-queue')
|
implementation project(':code:libraries:message-queue')
|
||||||
implementation project(':code:libraries:language-processing')
|
implementation project(':code:libraries:language-processing')
|
||||||
implementation project(':code:libraries:easy-lsh')
|
implementation project(':code:libraries:easy-lsh')
|
||||||
|
implementation project(':code:libraries:domain-lock')
|
||||||
implementation project(':code:processes:crawling-process:model')
|
implementation project(':code:processes:crawling-process:model')
|
||||||
implementation project(':code:processes:crawling-process:model')
|
implementation project(':code:processes:crawling-process:model')
|
||||||
|
|
||||||
|
@@ -10,9 +10,11 @@ import nu.marginalia.WmsaHome;
|
|||||||
import nu.marginalia.atags.model.DomainLinks;
|
import nu.marginalia.atags.model.DomainLinks;
|
||||||
import nu.marginalia.atags.source.AnchorTagsSource;
|
import nu.marginalia.atags.source.AnchorTagsSource;
|
||||||
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
|
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinationModule;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
|
import nu.marginalia.coordination.DomainLock;
|
||||||
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
||||||
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
|
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
|
||||||
import nu.marginalia.crawl.logic.DomainLocks;
|
|
||||||
import nu.marginalia.crawl.retreival.CrawlDataReference;
|
import nu.marginalia.crawl.retreival.CrawlDataReference;
|
||||||
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
|
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
|
||||||
import nu.marginalia.crawl.retreival.DomainProber;
|
import nu.marginalia.crawl.retreival.DomainProber;
|
||||||
@@ -68,7 +70,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
private final ServiceRegistryIf serviceRegistry;
|
private final ServiceRegistryIf serviceRegistry;
|
||||||
private final SimpleBlockingThreadPool pool;
|
private final SimpleBlockingThreadPool pool;
|
||||||
|
|
||||||
private final DomainLocks domainLocks = new DomainLocks();
|
private final DomainCoordinator domainCoordinator;
|
||||||
|
|
||||||
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
|
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@@ -97,6 +99,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
WarcArchiverFactory warcArchiverFactory,
|
WarcArchiverFactory warcArchiverFactory,
|
||||||
HikariDataSource dataSource,
|
HikariDataSource dataSource,
|
||||||
DomainBlacklist blacklist,
|
DomainBlacklist blacklist,
|
||||||
|
DomainCoordinator domainCoordinator,
|
||||||
ServiceRegistryIf serviceRegistry,
|
ServiceRegistryIf serviceRegistry,
|
||||||
Gson gson) throws InterruptedException {
|
Gson gson) throws InterruptedException {
|
||||||
|
|
||||||
@@ -114,6 +117,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
this.blacklist = blacklist;
|
this.blacklist = blacklist;
|
||||||
this.node = processConfiguration.node();
|
this.node = processConfiguration.node();
|
||||||
this.serviceRegistry = serviceRegistry;
|
this.serviceRegistry = serviceRegistry;
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
|
|
||||||
SimpleBlockingThreadPool.ThreadType threadType;
|
SimpleBlockingThreadPool.ThreadType threadType;
|
||||||
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
|
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
|
||||||
@@ -157,6 +161,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
new CrawlerModule(),
|
new CrawlerModule(),
|
||||||
new ProcessConfigurationModule("crawler"),
|
new ProcessConfigurationModule("crawler"),
|
||||||
new ServiceDiscoveryModule(),
|
new ServiceDiscoveryModule(),
|
||||||
|
new DomainCoordinationModule(),
|
||||||
new DatabaseModule(false)
|
new DatabaseModule(false)
|
||||||
);
|
);
|
||||||
var crawler = injector.getInstance(CrawlerMain.class);
|
var crawler = injector.getInstance(CrawlerMain.class);
|
||||||
@@ -451,7 +456,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
/** Best effort indicator whether we could start this now without getting stuck in
|
/** Best effort indicator whether we could start this now without getting stuck in
|
||||||
* DomainLocks purgatory */
|
* DomainLocks purgatory */
|
||||||
public boolean canRun() {
|
public boolean canRun() {
|
||||||
return domainLocks.isLockableHint(new EdgeDomain(domain));
|
return domainCoordinator.isLockableHint(new EdgeDomain(domain));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -462,7 +467,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
|
Optional<DomainLock> lock = domainCoordinator.tryLockDomain(new EdgeDomain(domain));
|
||||||
// We don't have a lock, so we can't run this task
|
// We don't have a lock, so we can't run this task
|
||||||
// we return to avoid blocking the pool for too long
|
// we return to avoid blocking the pool for too long
|
||||||
if (lock.isEmpty()) {
|
if (lock.isEmpty()) {
|
||||||
@@ -470,7 +475,7 @@ public class CrawlerMain extends ProcessMainClass {
|
|||||||
retryQueue.put(this);
|
retryQueue.put(this);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DomainLocks.DomainLock domainLock = lock.get();
|
DomainLock domainLock = lock.get();
|
||||||
|
|
||||||
try (domainLock) {
|
try (domainLock) {
|
||||||
Thread.currentThread().setName("crawling:" + domain);
|
Thread.currentThread().setName("crawling:" + domain);
|
||||||
|
@@ -32,6 +32,7 @@ dependencies {
|
|||||||
implementation project(':code:index:api')
|
implementation project(':code:index:api')
|
||||||
implementation project(':code:processes:process-mq-api')
|
implementation project(':code:processes:process-mq-api')
|
||||||
implementation project(':code:libraries:message-queue')
|
implementation project(':code:libraries:message-queue')
|
||||||
|
implementation project(':code:libraries:domain-lock')
|
||||||
implementation project(':code:libraries:language-processing')
|
implementation project(':code:libraries:language-processing')
|
||||||
implementation project(':code:libraries:easy-lsh')
|
implementation project(':code:libraries:easy-lsh')
|
||||||
implementation project(':code:processes:crawling-process')
|
implementation project(':code:processes:crawling-process')
|
||||||
|
@@ -10,6 +10,8 @@ import nu.marginalia.api.feeds.FeedsClient;
|
|||||||
import nu.marginalia.converting.ConverterModule;
|
import nu.marginalia.converting.ConverterModule;
|
||||||
import nu.marginalia.converting.processor.DomainProcessor;
|
import nu.marginalia.converting.processor.DomainProcessor;
|
||||||
import nu.marginalia.converting.writer.ConverterBatchWriter;
|
import nu.marginalia.converting.writer.ConverterBatchWriter;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinationModule;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
import nu.marginalia.db.DbDomainQueries;
|
import nu.marginalia.db.DbDomainQueries;
|
||||||
import nu.marginalia.db.DomainBlacklist;
|
import nu.marginalia.db.DomainBlacklist;
|
||||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||||
@@ -58,6 +60,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
|||||||
private final FileStorageService fileStorageService;
|
private final FileStorageService fileStorageService;
|
||||||
private final KeywordLoaderService keywordLoaderService;
|
private final KeywordLoaderService keywordLoaderService;
|
||||||
private final DocumentLoaderService documentLoaderService;
|
private final DocumentLoaderService documentLoaderService;
|
||||||
|
private final DomainCoordinator domainCoordinator;
|
||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@@ -71,7 +74,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
|||||||
DomainProcessor domainProcessor,
|
DomainProcessor domainProcessor,
|
||||||
FileStorageService fileStorageService,
|
FileStorageService fileStorageService,
|
||||||
KeywordLoaderService keywordLoaderService,
|
KeywordLoaderService keywordLoaderService,
|
||||||
DocumentLoaderService documentLoaderService, HikariDataSource dataSource)
|
DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
|
||||||
throws Exception
|
throws Exception
|
||||||
{
|
{
|
||||||
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
|
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
|
||||||
@@ -84,6 +87,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
|||||||
this.fileStorageService = fileStorageService;
|
this.fileStorageService = fileStorageService;
|
||||||
this.keywordLoaderService = keywordLoaderService;
|
this.keywordLoaderService = keywordLoaderService;
|
||||||
this.documentLoaderService = documentLoaderService;
|
this.documentLoaderService = documentLoaderService;
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
|
|
||||||
domainBlacklist.waitUntilLoaded();
|
domainBlacklist.waitUntilLoaded();
|
||||||
@@ -107,6 +111,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
|||||||
try {
|
try {
|
||||||
Injector injector = Guice.createInjector(
|
Injector injector = Guice.createInjector(
|
||||||
new LiveCrawlerModule(),
|
new LiveCrawlerModule(),
|
||||||
|
new DomainCoordinationModule(), // 2 hours lease timeout is enough for the live crawler
|
||||||
new ProcessConfigurationModule("crawler"),
|
new ProcessConfigurationModule("crawler"),
|
||||||
new ConverterModule(),
|
new ConverterModule(),
|
||||||
new ServiceDiscoveryModule(),
|
new ServiceDiscoveryModule(),
|
||||||
@@ -172,7 +177,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
|||||||
|
|
||||||
processHeartbeat.progress(LiveCrawlState.CRAWLING);
|
processHeartbeat.progress(LiveCrawlState.CRAWLING);
|
||||||
|
|
||||||
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
|
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
|
||||||
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
|
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
|
||||||
{
|
{
|
||||||
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
|
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
|
||||||
|
@@ -5,8 +5,9 @@ import crawlercommons.robots.SimpleRobotRulesParser;
|
|||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
import nu.marginalia.contenttype.ContentType;
|
import nu.marginalia.contenttype.ContentType;
|
||||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
|
import nu.marginalia.coordination.DomainLock;
|
||||||
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
||||||
import nu.marginalia.crawl.logic.DomainLocks;
|
|
||||||
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
||||||
import nu.marginalia.db.DbDomainQueries;
|
import nu.marginalia.db.DbDomainQueries;
|
||||||
import nu.marginalia.db.DomainBlacklist;
|
import nu.marginalia.db.DomainBlacklist;
|
||||||
@@ -46,14 +47,16 @@ public class SimpleLinkScraper implements AutoCloseable {
|
|||||||
private final DomainBlacklist domainBlacklist;
|
private final DomainBlacklist domainBlacklist;
|
||||||
private final Duration connectTimeout = Duration.ofSeconds(10);
|
private final Duration connectTimeout = Duration.ofSeconds(10);
|
||||||
private final Duration readTimeout = Duration.ofSeconds(10);
|
private final Duration readTimeout = Duration.ofSeconds(10);
|
||||||
private final DomainLocks domainLocks = new DomainLocks();
|
private final DomainCoordinator domainCoordinator;
|
||||||
|
|
||||||
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
|
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
|
||||||
|
|
||||||
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
||||||
|
DomainCoordinator domainCoordinator,
|
||||||
DbDomainQueries domainQueries,
|
DbDomainQueries domainQueries,
|
||||||
DomainBlacklist domainBlacklist) {
|
DomainBlacklist domainBlacklist) {
|
||||||
this.dataSet = dataSet;
|
this.dataSet = dataSet;
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
this.domainQueries = domainQueries;
|
this.domainQueries = domainQueries;
|
||||||
this.domainBlacklist = domainBlacklist;
|
this.domainBlacklist = domainBlacklist;
|
||||||
}
|
}
|
||||||
@@ -98,7 +101,7 @@ public class SimpleLinkScraper implements AutoCloseable {
|
|||||||
.version(HttpClient.Version.HTTP_2)
|
.version(HttpClient.Version.HTTP_2)
|
||||||
.build();
|
.build();
|
||||||
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
|
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
|
||||||
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain)
|
DomainLock lock = domainCoordinator.lockDomain(domain)
|
||||||
) {
|
) {
|
||||||
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
|
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
|
||||||
|
|
||||||
|
@@ -1,5 +1,6 @@
|
|||||||
package nu.marginalia.livecrawler;
|
package nu.marginalia.livecrawler;
|
||||||
|
|
||||||
|
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||||
import nu.marginalia.db.DomainBlacklistImpl;
|
import nu.marginalia.db.DomainBlacklistImpl;
|
||||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||||
import nu.marginalia.model.EdgeDomain;
|
import nu.marginalia.model.EdgeDomain;
|
||||||
@@ -37,7 +38,7 @@ class SimpleLinkScraperTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRetrieveNow() throws Exception {
|
public void testRetrieveNow() throws Exception {
|
||||||
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
|
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
|
||||||
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
||||||
Assertions.assertEquals(1, fetched);
|
Assertions.assertEquals(1, fetched);
|
||||||
|
|
||||||
@@ -57,7 +58,7 @@ class SimpleLinkScraperTest {
|
|||||||
@Test
|
@Test
|
||||||
public void testRetrieveNow_Redundant() throws Exception {
|
public void testRetrieveNow_Redundant() throws Exception {
|
||||||
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
|
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
|
||||||
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
|
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
|
||||||
|
|
||||||
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
|
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
|
||||||
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
|
||||||
|
@@ -27,6 +27,7 @@ dependencies {
|
|||||||
implementation project(':code:common:config')
|
implementation project(':code:common:config')
|
||||||
implementation project(':code:common:service')
|
implementation project(':code:common:service')
|
||||||
|
|
||||||
|
implementation project(':code:libraries:domain-lock')
|
||||||
implementation project(':code:libraries:geo-ip')
|
implementation project(':code:libraries:geo-ip')
|
||||||
implementation project(':code:libraries:message-queue')
|
implementation project(':code:libraries:message-queue')
|
||||||
|
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package nu.marginalia.ping;
|
package nu.marginalia.ping;
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
import nu.marginalia.ping.model.*;
|
import nu.marginalia.ping.model.*;
|
||||||
import nu.marginalia.ping.svc.DnsPingService;
|
import nu.marginalia.ping.svc.DnsPingService;
|
||||||
import nu.marginalia.ping.svc.HttpPingService;
|
import nu.marginalia.ping.svc.HttpPingService;
|
||||||
@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class PingJobScheduler {
|
public class PingJobScheduler {
|
||||||
private final HttpPingService httpPingService;
|
private final HttpPingService httpPingService;
|
||||||
private final DnsPingService dnsPingService;
|
private final DnsPingService dnsPingService;
|
||||||
|
private final DomainCoordinator domainCoordinator;
|
||||||
private final PingDao pingDao;
|
private final PingDao pingDao;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
|
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
|
||||||
@@ -43,10 +45,12 @@ public class PingJobScheduler {
|
|||||||
@Inject
|
@Inject
|
||||||
public PingJobScheduler(HttpPingService httpPingService,
|
public PingJobScheduler(HttpPingService httpPingService,
|
||||||
DnsPingService dnsPingService,
|
DnsPingService dnsPingService,
|
||||||
|
DomainCoordinator domainCoordinator,
|
||||||
PingDao pingDao)
|
PingDao pingDao)
|
||||||
{
|
{
|
||||||
this.httpPingService = httpPingService;
|
this.httpPingService = httpPingService;
|
||||||
this.dnsPingService = dnsPingService;
|
this.dnsPingService = dnsPingService;
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
this.pingDao = pingDao;
|
this.pingDao = pingDao;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -5,15 +5,14 @@ import com.google.inject.Guice;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinationModule;
|
||||||
import nu.marginalia.geoip.GeoIpDictionary;
|
import nu.marginalia.geoip.GeoIpDictionary;
|
||||||
import nu.marginalia.mq.MessageQueueFactory;
|
import nu.marginalia.mq.MessageQueueFactory;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.mqapi.ping.PingRequest;
|
import nu.marginalia.mqapi.ping.PingRequest;
|
||||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
|
||||||
import nu.marginalia.process.ProcessConfiguration;
|
import nu.marginalia.process.ProcessConfiguration;
|
||||||
import nu.marginalia.process.ProcessConfigurationModule;
|
import nu.marginalia.process.ProcessConfigurationModule;
|
||||||
import nu.marginalia.process.ProcessMainClass;
|
import nu.marginalia.process.ProcessMainClass;
|
||||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
|
||||||
import nu.marginalia.service.module.DatabaseModule;
|
import nu.marginalia.service.module.DatabaseModule;
|
||||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -25,8 +24,6 @@ public class PingMain extends ProcessMainClass {
|
|||||||
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
|
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
|
||||||
|
|
||||||
private final PingJobScheduler pingJobScheduler;
|
private final PingJobScheduler pingJobScheduler;
|
||||||
private final ServiceRegistryIf serviceRegistry;
|
|
||||||
private final NodeConfigurationService nodeConfigurationService;
|
|
||||||
private final int node;
|
private final int node;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
|
private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
|
||||||
@@ -36,15 +33,11 @@ public class PingMain extends ProcessMainClass {
|
|||||||
ProcessConfiguration config,
|
ProcessConfiguration config,
|
||||||
Gson gson,
|
Gson gson,
|
||||||
PingJobScheduler pingJobScheduler,
|
PingJobScheduler pingJobScheduler,
|
||||||
ServiceRegistryIf serviceRegistry,
|
|
||||||
NodeConfigurationService nodeConfigurationService,
|
|
||||||
ProcessConfiguration processConfiguration
|
ProcessConfiguration processConfiguration
|
||||||
) {
|
) {
|
||||||
super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);
|
super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);
|
||||||
|
|
||||||
this.pingJobScheduler = pingJobScheduler;
|
this.pingJobScheduler = pingJobScheduler;
|
||||||
this.serviceRegistry = serviceRegistry;
|
|
||||||
this.nodeConfigurationService = nodeConfigurationService;
|
|
||||||
this.node = processConfiguration.node();
|
this.node = processConfiguration.node();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,6 +73,7 @@ public class PingMain extends ProcessMainClass {
|
|||||||
Injector injector = Guice.createInjector(
|
Injector injector = Guice.createInjector(
|
||||||
new PingModule(),
|
new PingModule(),
|
||||||
new ServiceDiscoveryModule(),
|
new ServiceDiscoveryModule(),
|
||||||
|
new DomainCoordinationModule(),
|
||||||
new ProcessConfigurationModule("ping"),
|
new ProcessConfigurationModule("ping"),
|
||||||
new DatabaseModule(false)
|
new DatabaseModule(false)
|
||||||
);
|
);
|
||||||
|
@@ -1,3 +1,10 @@
|
|||||||
package nu.marginalia.ping.model;
|
package nu.marginalia.ping.model;
|
||||||
|
|
||||||
public record DomainReference(int domainId, int nodeId, String domainName) { }
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
|
||||||
|
public record DomainReference(int domainId, int nodeId, String domainName) {
|
||||||
|
public EdgeDomain asEdgeDomain() {
|
||||||
|
return new EdgeDomain(domainName);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@@ -2,6 +2,7 @@ package nu.marginalia.ping.svc;
|
|||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
||||||
import nu.marginalia.ping.fetcher.response.*;
|
import nu.marginalia.ping.fetcher.response.*;
|
||||||
import nu.marginalia.ping.model.*;
|
import nu.marginalia.ping.model.*;
|
||||||
@@ -26,6 +27,7 @@ import java.util.List;
|
|||||||
@Singleton
|
@Singleton
|
||||||
public class HttpPingService {
|
public class HttpPingService {
|
||||||
|
|
||||||
|
private final DomainCoordinator domainCoordinator;
|
||||||
private final PingHttpFetcher pingHttpFetcher;
|
private final PingHttpFetcher pingHttpFetcher;
|
||||||
|
|
||||||
private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
|
private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
|
||||||
@@ -36,9 +38,11 @@ public class HttpPingService {
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public HttpPingService(
|
public HttpPingService(
|
||||||
|
DomainCoordinator domainCoordinator,
|
||||||
PingHttpFetcher pingHttpFetcher,
|
PingHttpFetcher pingHttpFetcher,
|
||||||
DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
|
DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
|
||||||
DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
|
DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
|
||||||
|
this.domainCoordinator = domainCoordinator;
|
||||||
this.pingHttpFetcher = pingHttpFetcher;
|
this.pingHttpFetcher = pingHttpFetcher;
|
||||||
this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
|
this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
|
||||||
this.domainSecurityInformationFactory = domainSecurityInformationFactory;
|
this.domainSecurityInformationFactory = domainSecurityInformationFactory;
|
||||||
@@ -59,7 +63,8 @@ public class HttpPingService {
|
|||||||
|
|
||||||
public List<WritableModel> pingDomain(DomainReference domainReference,
|
public List<WritableModel> pingDomain(DomainReference domainReference,
|
||||||
@Nullable DomainAvailabilityRecord oldPingStatus,
|
@Nullable DomainAvailabilityRecord oldPingStatus,
|
||||||
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException {
|
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException, InterruptedException {
|
||||||
|
|
||||||
// First we figure out if the domain maps to an IP address
|
// First we figure out if the domain maps to an IP address
|
||||||
|
|
||||||
List<WritableModel> generatedRecords = new ArrayList<>();
|
List<WritableModel> generatedRecords = new ArrayList<>();
|
||||||
@@ -69,26 +74,31 @@ public class HttpPingService {
|
|||||||
|
|
||||||
if (ipAddress.isEmpty()) {
|
if (ipAddress.isEmpty()) {
|
||||||
result = new UnknownHostError();
|
result = new UnknownHostError();
|
||||||
}
|
} else {
|
||||||
else {
|
// lock the domain to prevent concurrent pings
|
||||||
String url = "https://" + domainReference.domainName() + "/";
|
try (var _ = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
|
||||||
String alternateUrl = "http://" + domainReference.domainName() + "/";
|
String url = "https://" + domainReference.domainName() + "/";
|
||||||
|
String alternateUrl = "http://" + domainReference.domainName() + "/";
|
||||||
|
|
||||||
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
|
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
|
||||||
|
|
||||||
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
|
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
|
||||||
sleep(Duration.ofSeconds(2));
|
|
||||||
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
|
|
||||||
}
|
|
||||||
else if (result instanceof ConnectionError) {
|
|
||||||
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
|
|
||||||
if (!(result2 instanceof ConnectionError)) {
|
|
||||||
result = result2;
|
|
||||||
}
|
|
||||||
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
|
|
||||||
sleep(Duration.ofSeconds(2));
|
sleep(Duration.ofSeconds(2));
|
||||||
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
|
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
|
||||||
|
} else if (result instanceof ConnectionError) {
|
||||||
|
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
|
||||||
|
if (!(result2 instanceof ConnectionError)) {
|
||||||
|
result = result2;
|
||||||
|
}
|
||||||
|
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
|
||||||
|
sleep(Duration.ofSeconds(2));
|
||||||
|
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add a grace sleep before we yield the semaphore, so that another thread doesn't
|
||||||
|
// immediately hammer the same domain after it's released.
|
||||||
|
sleep(Duration.ofSeconds(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -186,8 +196,8 @@ public class HttpPingService {
|
|||||||
}
|
}
|
||||||
if (oldSecurityInformation != null && newSecurityInformation != null) {
|
if (oldSecurityInformation != null && newSecurityInformation != null) {
|
||||||
compareSecurityInformation(generatedRecords,
|
compareSecurityInformation(generatedRecords,
|
||||||
oldSecurityInformation, oldPingStatus,
|
oldSecurityInformation, oldPingStatus,
|
||||||
newSecurityInformation, newPingStatus);
|
newSecurityInformation, newPingStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
return generatedRecords;
|
return generatedRecords;
|
||||||
|
@@ -2,6 +2,7 @@ package nu.marginalia.ping;
|
|||||||
|
|
||||||
import com.zaxxer.hikari.HikariConfig;
|
import com.zaxxer.hikari.HikariConfig;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||||
import nu.marginalia.geoip.GeoIpDictionary;
|
import nu.marginalia.geoip.GeoIpDictionary;
|
||||||
import nu.marginalia.ping.fetcher.PingDnsFetcher;
|
import nu.marginalia.ping.fetcher.PingDnsFetcher;
|
||||||
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
||||||
@@ -85,11 +86,14 @@ class AvailabilityJobSchedulerTest {
|
|||||||
DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);
|
DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);
|
||||||
|
|
||||||
PingJobScheduler pingJobScheduler = new PingJobScheduler(
|
PingJobScheduler pingJobScheduler = new PingJobScheduler(
|
||||||
new HttpPingService(pingHttpFetcher,
|
new HttpPingService(
|
||||||
|
new LocalDomainCoordinator(),
|
||||||
|
pingHttpFetcher,
|
||||||
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
|
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
|
||||||
new DomainSecurityInformationFactory()),
|
new DomainSecurityInformationFactory()),
|
||||||
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
|
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
|
||||||
dnsDomainInformationFactory),
|
dnsDomainInformationFactory),
|
||||||
|
new LocalDomainCoordinator(),
|
||||||
pingDao
|
pingDao
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@@ -2,6 +2,7 @@ package nu.marginalia.ping;
|
|||||||
|
|
||||||
import com.zaxxer.hikari.HikariConfig;
|
import com.zaxxer.hikari.HikariConfig;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||||
import nu.marginalia.geoip.GeoIpDictionary;
|
import nu.marginalia.geoip.GeoIpDictionary;
|
||||||
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
import nu.marginalia.ping.fetcher.PingHttpFetcher;
|
||||||
import nu.marginalia.ping.io.HttpClientProvider;
|
import nu.marginalia.ping.io.HttpClientProvider;
|
||||||
@@ -63,6 +64,7 @@ class PingHttpServiceTest {
|
|||||||
public void testGetSslInfo() throws Exception {
|
public void testGetSslInfo() throws Exception {
|
||||||
var provider = new HttpClientProvider();
|
var provider = new HttpClientProvider();
|
||||||
var pingService = new HttpPingService(
|
var pingService = new HttpPingService(
|
||||||
|
new LocalDomainCoordinator(),
|
||||||
new PingHttpFetcher(provider.get()),
|
new PingHttpFetcher(provider.get()),
|
||||||
new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
|
new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
|
||||||
new BackoffStrategy(PingModule.createPingIntervalsConfiguration())
|
new BackoffStrategy(PingModule.createPingIntervalsConfiguration())
|
||||||
|
@@ -37,6 +37,7 @@ dependencies {
|
|||||||
implementation project(':code:functions:domain-info')
|
implementation project(':code:functions:domain-info')
|
||||||
implementation project(':code:functions:domain-info:api')
|
implementation project(':code:functions:domain-info:api')
|
||||||
|
|
||||||
|
implementation project(':code:libraries:domain-lock')
|
||||||
implementation project(':code:libraries:geo-ip')
|
implementation project(':code:libraries:geo-ip')
|
||||||
implementation project(':code:libraries:language-processing')
|
implementation project(':code:libraries:language-processing')
|
||||||
implementation project(':code:libraries:term-frequency-dict')
|
implementation project(':code:libraries:term-frequency-dict')
|
||||||
|
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
|
|||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import io.jooby.ExecutionMode;
|
import io.jooby.ExecutionMode;
|
||||||
import io.jooby.Jooby;
|
import io.jooby.Jooby;
|
||||||
|
import nu.marginalia.coordination.DomainCoordinationModule;
|
||||||
import nu.marginalia.livecapture.LivecaptureModule;
|
import nu.marginalia.livecapture.LivecaptureModule;
|
||||||
import nu.marginalia.service.MainClass;
|
import nu.marginalia.service.MainClass;
|
||||||
import nu.marginalia.service.ServiceId;
|
import nu.marginalia.service.ServiceId;
|
||||||
@@ -29,6 +30,7 @@ public class AssistantMain extends MainClass {
|
|||||||
Injector injector = Guice.createInjector(
|
Injector injector = Guice.createInjector(
|
||||||
new AssistantModule(),
|
new AssistantModule(),
|
||||||
new LivecaptureModule(),
|
new LivecaptureModule(),
|
||||||
|
new DomainCoordinationModule(),
|
||||||
new ServiceConfigurationModule(ServiceId.Assistant),
|
new ServiceConfigurationModule(ServiceId.Assistant),
|
||||||
new ServiceDiscoveryModule(),
|
new ServiceDiscoveryModule(),
|
||||||
new DatabaseModule(false)
|
new DatabaseModule(false)
|
||||||
|
@@ -55,6 +55,7 @@ include 'code:libraries:braille-block-punch-cards'
|
|||||||
include 'code:libraries:language-processing'
|
include 'code:libraries:language-processing'
|
||||||
include 'code:libraries:term-frequency-dict'
|
include 'code:libraries:term-frequency-dict'
|
||||||
include 'code:libraries:test-helpers'
|
include 'code:libraries:test-helpers'
|
||||||
|
include 'code:libraries:domain-lock'
|
||||||
|
|
||||||
include 'code:libraries:message-queue'
|
include 'code:libraries:message-queue'
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user