1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Viktor Lofgren
82456ad673 (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
The performance implication of this needs to be evaluated.  If it does not hold water. some other solution may be required instead.
2025-06-14 16:16:10 +02:00
Viktor Lofgren
0882a6d9cd (ping) Correct retry logic by handling missing Retry-After header 2025-06-14 12:54:07 +02:00
Viktor Lofgren
5020029c2d (ping) Fix startup sequence for new primary-only flow 2025-06-14 12:48:09 +02:00
28 changed files with 330 additions and 201 deletions

View File

@@ -5,12 +5,10 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -66,6 +64,6 @@ public interface ServiceRegistryIf {
void registerProcess(String processName, int nodeId);
void deregisterProcess(String processName, int nodeId);
void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception;
void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception;
InterProcessSemaphoreV2 getSemaphore(String name, int permits) throws Exception;
}

View File

@@ -6,6 +6,7 @@ import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
@@ -13,10 +14,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -283,60 +285,12 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
}
@Override
public void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception {
String path = "/process-locks/" + processName + "/" + nodeId;
public InterProcessSemaphoreV2 getSemaphore(String name, int permits) {
if (stopped)
throw new IllegalStateException("Service registry is stopped, cannot get semaphore " + name);
// first check if the path exists and call the callback accordingly
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true);
}
else {
callback.accept(false);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false);
}
})
.forPath(path);
}
@Override
public void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception {
for (int node : nodes) {
String path = "/process-locks/" + processName + "/" + node;
// first check if the path exists and call the callback accordingly
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true, node);
}
else {
callback.accept(false, node);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true, node);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false, node);
}
})
.forPath(path);
}
String path = "/semaphores/" + name;
return new InterProcessSemaphoreV2(curatorFramework, path, permits);
}
/* Exposed for tests */

View File

@@ -22,6 +22,7 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:execution:api')
implementation project(':code:processes:crawling-process:ft-content-type')

View File

@@ -1,66 +0,0 @@
package nu.marginalia.rss.svc;
import nu.marginalia.model.EdgeDomain;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}
private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(4);
if (topDomain.equals("github.io"))
return new Semaphore(4);
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}
return new Semaphore(2);
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;
Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("fetching:" + domainName);
}
@Override
public void close() {
semaphore.release();
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
}
}
}

View File

@@ -5,6 +5,8 @@ import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService;
@@ -51,12 +53,13 @@ public class FeedFetcherService {
private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient;
private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;
private volatile boolean updating;
@Inject
public FeedFetcherService(FeedDb feedDb,
DomainCoordinator domainCoordinator,
FileStorageService fileStorageService,
NodeConfigurationService nodeConfigurationService,
ServiceHeartbeat serviceHeartbeat,
@@ -67,6 +70,7 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient;
this.domainCoordinator = domainCoordinator;
}
public enum UpdateMode {
@@ -132,7 +136,7 @@ public class FeedFetcherService {
};
FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) {
feedData = new FetchResult.TransientError();

View File

@@ -0,0 +1,32 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.bundles.slf4j
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation libs.bundles.curator
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}

View File

@@ -0,0 +1,32 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
public class DefaultDomainPermits {
public static int defaultPermits(EdgeDomain domain) {
return defaultPermits(domain.topDomain.toLowerCase());
}
public static int defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return 16;
if (topDomain.equals("blogspot.com"))
return 8;
if (topDomain.equals("tumblr.com"))
return 8;
if (topDomain.equals("neocities.org"))
return 8;
if (topDomain.equals("github.io"))
return 8;
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return 1;
}
return 2;
}
}

View File

@@ -0,0 +1,17 @@
package nu.marginalia.coordination;
import com.google.inject.AbstractModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DomainCoordinationModule extends AbstractModule {
private static final Logger logger = LoggerFactory.getLogger(DomainCoordinationModule.class);
public DomainCoordinationModule() {
}
public void configure() {
bind(DomainCoordinator.class).to(ZookeeperDomainCoordinator.class);
}
}

View File

@@ -0,0 +1,13 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Optional;
public interface DomainCoordinator {
DomainLock lockDomain(EdgeDomain domain) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException;
boolean isLockableHint(EdgeDomain domain);
}

View File

@@ -0,0 +1,5 @@
package nu.marginalia.coordination;
public interface DomainLock extends AutoCloseable {
void close();
}

View File

@@ -1,16 +1,15 @@
package nu.marginalia.crawl.logic;
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
public class LocalDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
@@ -24,13 +23,25 @@ public class DomainLocks {
sem.acquire();
return new DomainLock(sem);
return new LocalDomainLock(sem);
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(sem));
return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
@@ -39,24 +50,7 @@ public class DomainLocks {
}
private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("tumblr.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(8);
if (topDomain.equals("github.io"))
return new Semaphore(8);
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
return new Semaphore(2);
return new Semaphore(DefaultDomainPermits.defaultPermits(topDomain));
}
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
@@ -71,15 +65,15 @@ public class DomainLocks {
return sem.availablePermits() > 0;
}
public static class DomainLock implements AutoCloseable {
public static class LocalDomainLock implements DomainLock {
private final Semaphore semaphore;
DomainLock(Semaphore semaphore) {
LocalDomainLock(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void close() throws Exception {
public void close() {
semaphore.release();
Thread.currentThread().setName("[idle]");
}

View File

@@ -0,0 +1,101 @@
package nu.marginalia.coordination;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class ZookeeperDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
private final ServiceRegistryIf serviceRegistry;
private final int nodeId;
@Inject
public ZookeeperDomainCoordinator(ServiceRegistryIf serviceRegistry, @Named("wmsa-system-node") int nodeId) {
// Zookeeper-specific initialization can be done here if needed
this.serviceRegistry = serviceRegistry;
this.nodeId = nodeId;
}
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
try {
var lease = sem.acquire();
return new ZkDomainLock(sem, lease);
}
catch (Exception e) {
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore);
try {
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
if (lease != null) {
return Optional.of(new ZkDomainLock(sem, lease));
}
else {
return Optional.empty(); // If we fail to acquire the lease, we return an empty optional
}
}
catch (Exception e) {
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
}
}
private InterProcessSemaphoreV2 createSemapore(String topDomain){
try {
return serviceRegistry.getSemaphore(topDomain + ":" + nodeId, DefaultDomainPermits.defaultPermits(topDomain));
}
catch (Exception e) {
throw new RuntimeException("Failed to get semaphore for domain: " + topDomain, e);
}
}
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
return true; // Curator does not provide a way to check if a lock is available without acquiring it
}
public static class ZkDomainLock implements DomainLock {
private final InterProcessSemaphoreV2 semaphore;
private final Lease lease;
ZkDomainLock(InterProcessSemaphoreV2 semaphore, Lease lease) {
this.semaphore = semaphore;
this.lease = lease;
}
@Override
public void close() {
semaphore.returnLease(lease);
}
}
}

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:libraries:domain-lock')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')

View File

@@ -10,9 +10,11 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
@@ -68,7 +70,7 @@ public class CrawlerMain extends ProcessMainClass {
private final ServiceRegistryIf serviceRegistry;
private final SimpleBlockingThreadPool pool;
private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
@@ -97,6 +99,7 @@ public class CrawlerMain extends ProcessMainClass {
WarcArchiverFactory warcArchiverFactory,
HikariDataSource dataSource,
DomainBlacklist blacklist,
DomainCoordinator domainCoordinator,
ServiceRegistryIf serviceRegistry,
Gson gson) throws InterruptedException {
@@ -114,6 +117,7 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist;
this.node = processConfiguration.node();
this.serviceRegistry = serviceRegistry;
this.domainCoordinator = domainCoordinator;
SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
@@ -157,6 +161,7 @@ public class CrawlerMain extends ProcessMainClass {
new CrawlerModule(),
new ProcessConfigurationModule("crawler"),
new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new DatabaseModule(false)
);
var crawler = injector.getInstance(CrawlerMain.class);
@@ -451,7 +456,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */
public boolean canRun() {
return domainLocks.isLockableHint(new EdgeDomain(domain));
return domainCoordinator.isLockableHint(new EdgeDomain(domain));
}
@Override
@@ -462,7 +467,7 @@ public class CrawlerMain extends ProcessMainClass {
return;
}
Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
Optional<DomainLock> lock = domainCoordinator.tryLockDomain(new EdgeDomain(domain));
// We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long
if (lock.isEmpty()) {
@@ -470,7 +475,7 @@ public class CrawlerMain extends ProcessMainClass {
retryQueue.put(this);
return;
}
DomainLocks.DomainLock domainLock = lock.get();
DomainLock domainLock = lock.get();
try (domainLock) {
Thread.currentThread().setName("crawling:" + domain);

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:processes:crawling-process')

View File

@@ -10,6 +10,8 @@ import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream;
@@ -58,6 +60,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final DomainCoordinator domainCoordinator;
private final HikariDataSource dataSource;
@Inject
@@ -71,7 +74,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
DomainProcessor domainProcessor,
FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, HikariDataSource dataSource)
DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
throws Exception
{
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -84,6 +87,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
this.domainCoordinator = domainCoordinator;
this.dataSource = dataSource;
domainBlacklist.waitUntilLoaded();
@@ -107,6 +111,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
try {
Injector injector = Guice.createInjector(
new LiveCrawlerModule(),
new DomainCoordinationModule(), // 2 hours lease timeout is enough for the live crawler
new ProcessConfigurationModule("crawler"),
new ConverterModule(),
new ServiceDiscoveryModule(),
@@ -172,7 +177,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
processHeartbeat.progress(LiveCrawlState.CRAWLING);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {

View File

@@ -5,8 +5,9 @@ import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
@@ -46,14 +47,16 @@ public class SimpleLinkScraper implements AutoCloseable {
private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainLocks domainLocks = new DomainLocks();
private final DomainCoordinator domainCoordinator;
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist) {
this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
}
@@ -98,7 +101,7 @@ public class SimpleLinkScraper implements AutoCloseable {
.version(HttpClient.Version.HTTP_2)
.build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain)
DomainLock lock = domainCoordinator.lockDomain(domain)
) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);

View File

@@ -1,5 +1,6 @@
package nu.marginalia.livecrawler;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
@@ -37,7 +38,7 @@ class SimpleLinkScraperTest {
@Test
public void testRetrieveNow() throws Exception {
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
Assertions.assertEquals(1, fetched);
@@ -57,7 +58,7 @@ class SimpleLinkScraperTest {
@Test
public void testRetrieveNow_Redundant() throws Exception {
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class));
var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));

View File

@@ -27,6 +27,7 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:message-queue')

View File

@@ -1,6 +1,7 @@
package nu.marginalia.ping;
import com.google.inject.Inject;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService;
@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
public class PingJobScheduler {
private final HttpPingService httpPingService;
private final DnsPingService dnsPingService;
private final DomainCoordinator domainCoordinator;
private final PingDao pingDao;
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
@@ -43,14 +45,16 @@ public class PingJobScheduler {
@Inject
public PingJobScheduler(HttpPingService httpPingService,
DnsPingService dnsPingService,
DomainCoordinator domainCoordinator,
PingDao pingDao)
{
this.httpPingService = httpPingService;
this.dnsPingService = dnsPingService;
this.domainCoordinator = domainCoordinator;
this.pingDao = pingDao;
}
public synchronized void start(boolean startPaused) {
public synchronized void start() {
if (running)
return;
@@ -100,7 +104,7 @@ public class PingJobScheduler {
logger.info("PingJobScheduler paused");
}
public synchronized void resume(int nodeId) {
public synchronized void enableForNode(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);

View File

@@ -5,15 +5,14 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.WmsaHome;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger;
@@ -25,8 +24,6 @@ public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
private final PingJobScheduler pingJobScheduler;
private final ServiceRegistryIf serviceRegistry;
private final NodeConfigurationService nodeConfigurationService;
private final int node;
private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
@@ -36,15 +33,11 @@ public class PingMain extends ProcessMainClass {
ProcessConfiguration config,
Gson gson,
PingJobScheduler pingJobScheduler,
ServiceRegistryIf serviceRegistry,
NodeConfigurationService nodeConfigurationService,
ProcessConfiguration processConfiguration
) {
super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);
this.pingJobScheduler = pingJobScheduler;
this.serviceRegistry = serviceRegistry;
this.nodeConfigurationService = nodeConfigurationService;
this.node = processConfiguration.node();
}
@@ -52,7 +45,8 @@ public class PingMain extends ProcessMainClass {
log.info("Starting PingMain...");
// Start the ping job scheduler
pingJobScheduler.start(true);
pingJobScheduler.start();
pingJobScheduler.enableForNode(node);
log.info("PingMain started successfully.");
}
@@ -79,6 +73,7 @@ public class PingMain extends ProcessMainClass {
Injector injector = Guice.createInjector(
new PingModule(),
new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new ProcessConfigurationModule("ping"),
new DatabaseModule(false)
);

View File

@@ -50,7 +50,12 @@ public class RetryStrategy implements HttpRequestRetryStrategy {
if (statusCode == 429) {
// get the Retry-After header
String retryAfter = response.getFirstHeader("Retry-After").getValue();
var retryAfterHeader = response.getFirstHeader("Retry-After");
if (retryAfterHeader == null) {
return TimeValue.ofSeconds(3);
}
String retryAfter = retryAfterHeader.getValue();
if (retryAfter == null) {
return TimeValue.ofSeconds(2);
}

View File

@@ -1,3 +1,10 @@
package nu.marginalia.ping.model;
public record DomainReference(int domainId, int nodeId, String domainName) { }
import nu.marginalia.model.EdgeDomain;
public record DomainReference(int domainId, int nodeId, String domainName) {
public EdgeDomain asEdgeDomain() {
return new EdgeDomain(domainName);
}
}

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.fetcher.response.*;
import nu.marginalia.ping.model.*;
@@ -26,6 +27,7 @@ import java.util.List;
@Singleton
public class HttpPingService {
private final DomainCoordinator domainCoordinator;
private final PingHttpFetcher pingHttpFetcher;
private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
@@ -36,9 +38,11 @@ public class HttpPingService {
@Inject
public HttpPingService(
DomainCoordinator domainCoordinator,
PingHttpFetcher pingHttpFetcher,
DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
this.domainCoordinator = domainCoordinator;
this.pingHttpFetcher = pingHttpFetcher;
this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
this.domainSecurityInformationFactory = domainSecurityInformationFactory;
@@ -59,7 +63,8 @@ public class HttpPingService {
public List<WritableModel> pingDomain(DomainReference domainReference,
@Nullable DomainAvailabilityRecord oldPingStatus,
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException {
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException, InterruptedException {
// First we figure out if the domain maps to an IP address
List<WritableModel> generatedRecords = new ArrayList<>();
@@ -69,8 +74,9 @@ public class HttpPingService {
if (ipAddress.isEmpty()) {
result = new UnknownHostError();
}
else {
} else {
// lock the domain to prevent concurrent pings
try (var _ = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
String url = "https://" + domainReference.domainName() + "/";
String alternateUrl = "http://" + domainReference.domainName() + "/";
@@ -79,8 +85,7 @@ public class HttpPingService {
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
} else if (result instanceof ConnectionError) {
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
if (!(result2 instanceof ConnectionError)) {
result = result2;
@@ -90,6 +95,11 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
}
// Add a grace sleep before we yield the semaphore, so that another thread doesn't
// immediately hammer the same domain after it's released.
sleep(Duration.ofSeconds(1));
}
}

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingDnsFetcher;
import nu.marginalia.ping.fetcher.PingHttpFetcher;
@@ -90,6 +91,7 @@ class AvailabilityJobSchedulerTest {
new DomainSecurityInformationFactory()),
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
dnsDomainInformationFactory),
new LocalDomainCoordinator(),
pingDao
);

View File

@@ -37,6 +37,7 @@ dependencies {
implementation project(':code:functions:domain-info')
implementation project(':code:functions:domain-info:api')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')

View File

@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import io.jooby.ExecutionMode;
import io.jooby.Jooby;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.livecapture.LivecaptureModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId;
@@ -29,6 +30,7 @@ public class AssistantMain extends MainClass {
Injector injector = Guice.createInjector(
new AssistantModule(),
new LivecaptureModule(),
new DomainCoordinationModule(),
new ServiceConfigurationModule(ServiceId.Assistant),
new ServiceDiscoveryModule(),
new DatabaseModule(false)

View File

@@ -55,6 +55,7 @@ include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict'
include 'code:libraries:test-helpers'
include 'code:libraries:domain-lock'
include 'code:libraries:message-queue'