1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

17 Commits

Author SHA1 Message Date
Viktor Lofgren
d556f8ae3a (ping) Ping server should not validate certificates 2025-06-16 00:08:30 +02:00
Viktor Lofgren
e37559837b (crawler) Crawler should validate certificates 2025-06-16 00:06:57 +02:00
Viktor Lofgren
3564c4aaee (ping) Route SSLHandshakeException to ConnectionError as well
This will mean we re-try these as an unencrypted Http connection
2025-06-15 20:31:33 +02:00
Viktor Lofgren
92c54563ab (ping) Reduce retry count on connection errors 2025-06-15 18:39:54 +02:00
Viktor Lofgren
d7a5d90b07 (ping) Store redirect location in availability record 2025-06-15 18:39:33 +02:00
Viktor Lofgren
0a0e88fd6e (ping) Fix schema drift between prod and flyway migrations 2025-06-15 17:20:21 +02:00
Viktor Lofgren
b4fc0c4368 (ping) Fix schema drift between prod and flyway migrations 2025-06-15 17:17:11 +02:00
Viktor Lofgren
87ee8765b8 (ping) Ensure ProtocolError->HTTP_CLIENT_ERROR retains its error message information 2025-06-15 16:54:27 +02:00
Viktor Lofgren
1adf4835fa (ping) Add schema change information to domain security events
Particularly the HTTPS->HTTP-change event appears to be a strong indicator of domain parking.
2025-06-15 16:47:49 +02:00
Viktor Lofgren
b7b5d0bf46 (ping) More accurately detect connection errors 2025-06-15 16:47:07 +02:00
Viktor Lofgren
416059adde (ping) Avoid thread starvation scenario in job scheduling
Adjust the queueing strategy to avoid thread starvation from whale domains with many subdomains all locking on the same semaphore and gunking up all threads by implementing a mechanism that returns jobs that can't be executed to the queue.

This will lead to some queue churn, but it should be fairly manageable given the small number of threads involved, and the fairly long job execution times.
2025-06-15 11:04:34 +02:00
Viktor Lofgren
db7930016a (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
+ fix two broken tests
2025-06-14 16:20:01 +02:00
Viktor Lofgren
82456ad673 (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
The performance implication of this needs to be evaluated.  If it does not hold water. some other solution may be required instead.
2025-06-14 16:16:10 +02:00
Viktor Lofgren
0882a6d9cd (ping) Correct retry logic by handling missing Retry-After header 2025-06-14 12:54:07 +02:00
Viktor Lofgren
5020029c2d (ping) Fix startup sequence for new primary-only flow 2025-06-14 12:48:09 +02:00
Viktor Lofgren
ac44d0b093 (ping) Fix wait logic to use synchronized block 2025-06-14 12:38:16 +02:00
Viktor Lofgren
4b32b9b10e Update DomainAvailabilityRecord to use clamped integer for HTTP response time 2025-06-14 12:37:58 +02:00
42 changed files with 549 additions and 265 deletions

View File

@@ -0,0 +1,5 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_SCHEMA ENUM('NONE', 'HTTP_TO_HTTPS', 'HTTPS_TO_HTTP', 'UNKNOWN') NOT NULL DEFAULT 'UNKNOWN';
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;

View File

@@ -5,12 +5,10 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
import nu.marginalia.service.discovery.monitor.ServiceMonitorIf; import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint; import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress; import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -66,6 +64,6 @@ public interface ServiceRegistryIf {
void registerProcess(String processName, int nodeId); void registerProcess(String processName, int nodeId);
void deregisterProcess(String processName, int nodeId); void deregisterProcess(String processName, int nodeId);
void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception;
void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception; InterProcessSemaphoreV2 getSemaphore(String name, int permits) throws Exception;
} }

View File

@@ -6,6 +6,7 @@ import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceEndpoint; import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
@@ -13,10 +14,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress; import static nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
@@ -283,60 +285,12 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
} }
@Override @Override
public void watchProcess(String processName, int nodeId, Consumer<Boolean> callback) throws Exception { public InterProcessSemaphoreV2 getSemaphore(String name, int permits) {
String path = "/process-locks/" + processName + "/" + nodeId; if (stopped)
throw new IllegalStateException("Service registry is stopped, cannot get semaphore " + name);
// first check if the path exists and call the callback accordingly String path = "/semaphores/" + name;
return new InterProcessSemaphoreV2(curatorFramework, path, permits);
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true);
}
else {
callback.accept(false);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false);
}
})
.forPath(path);
}
@Override
public void watchProcessAnyNode(String processName, Collection<Integer> nodes, BiConsumer<Boolean, Integer> callback) throws Exception {
for (int node : nodes) {
String path = "/process-locks/" + processName + "/" + node;
// first check if the path exists and call the callback accordingly
if (curatorFramework.checkExists().forPath(path) != null) {
callback.accept(true, node);
}
else {
callback.accept(false, node);
}
curatorFramework.watchers().add()
.usingWatcher((Watcher) change -> {
Watcher.Event.EventType type = change.getType();
if (type == Watcher.Event.EventType.NodeCreated) {
callback.accept(true, node);
}
if (type == Watcher.Event.EventType.NodeDeleted) {
callback.accept(false, node);
}
})
.forPath(path);
}
} }
/* Exposed for tests */ /* Exposed for tests */

View File

@@ -22,6 +22,7 @@ dependencies {
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:execution:api') implementation project(':code:execution:api')
implementation project(':code:processes:crawling-process:ft-content-type') implementation project(':code:processes:crawling-process:ft-content-type')

View File

@@ -1,66 +0,0 @@
package nu.marginalia.rss.svc;
import nu.marginalia.model.EdgeDomain;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}
private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(4);
if (topDomain.equals("github.io"))
return new Semaphore(4);
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}
return new Semaphore(2);
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;
Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("fetching:" + domainName);
}
@Override
public void close() {
semaphore.release();
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
}
}
}

View File

@@ -5,6 +5,8 @@ import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
@@ -51,12 +53,13 @@ public class FeedFetcherService {
private final ServiceHeartbeat serviceHeartbeat; private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient; private final ExecutorClient executorClient;
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private volatile boolean updating; private volatile boolean updating;
@Inject @Inject
public FeedFetcherService(FeedDb feedDb, public FeedFetcherService(FeedDb feedDb,
DomainCoordinator domainCoordinator,
FileStorageService fileStorageService, FileStorageService fileStorageService,
NodeConfigurationService nodeConfigurationService, NodeConfigurationService nodeConfigurationService,
ServiceHeartbeat serviceHeartbeat, ServiceHeartbeat serviceHeartbeat,
@@ -67,6 +70,7 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService; this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat; this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient; this.executorClient = executorClient;
this.domainCoordinator = domainCoordinator;
} }
public enum UpdateMode { public enum UpdateMode {
@@ -132,7 +136,7 @@ public class FeedFetcherService {
}; };
FetchResult feedData; FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) { try (DomainLock domainLock = domainCoordinator.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag); feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) { } catch (Exception ex) {
feedData = new FetchResult.TransientError(); feedData = new FetchResult.TransientError();

View File

@@ -0,0 +1,32 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.bundles.slf4j
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation libs.bundles.curator
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}

View File

@@ -0,0 +1,32 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
public class DefaultDomainPermits {
public static int defaultPermits(EdgeDomain domain) {
return defaultPermits(domain.topDomain.toLowerCase());
}
public static int defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return 16;
if (topDomain.equals("blogspot.com"))
return 8;
if (topDomain.equals("tumblr.com"))
return 8;
if (topDomain.equals("neocities.org"))
return 8;
if (topDomain.equals("github.io"))
return 8;
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return 1;
}
return 2;
}
}

View File

@@ -0,0 +1,17 @@
package nu.marginalia.coordination;
import com.google.inject.AbstractModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DomainCoordinationModule extends AbstractModule {
private static final Logger logger = LoggerFactory.getLogger(DomainCoordinationModule.class);
public DomainCoordinationModule() {
}
public void configure() {
bind(DomainCoordinator.class).to(ZookeeperDomainCoordinator.class);
}
}

View File

@@ -0,0 +1,13 @@
package nu.marginalia.coordination;
import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Optional;
public interface DomainCoordinator {
DomainLock lockDomain(EdgeDomain domain) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException;
Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException;
boolean isLockableHint(EdgeDomain domain);
}

View File

@@ -0,0 +1,5 @@
package nu.marginalia.coordination;
public interface DomainLock extends AutoCloseable {
void close();
}

View File

@@ -1,16 +1,17 @@
package nu.marginalia.crawl.logic; package nu.marginalia.coordination;
import com.google.inject.Singleton;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/** Holds lock objects for each domain, to prevent multiple threads from @Singleton
* crawling the same domain at the same time. public class LocalDomainCoordinator implements DomainCoordinator {
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow // The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to // relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically. // a few hundred thousand typically.
@@ -24,13 +25,25 @@ public class DomainLocks {
sem.acquire(); sem.acquire();
return new DomainLock(sem); return new LocalDomainLock(sem);
} }
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) { public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits); var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) { if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(sem)); return Optional.of(new LocalDomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return Optional.of(new LocalDomainLock(sem));
} }
else { else {
// We don't have a lock, so we return an empty optional // We don't have a lock, so we return an empty optional
@@ -39,24 +52,7 @@ public class DomainLocks {
} }
private Semaphore defaultPermits(String topDomain) { private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com")) return new Semaphore(DefaultDomainPermits.defaultPermits(topDomain));
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("tumblr.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(8);
if (topDomain.equals("github.io"))
return new Semaphore(8);
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
return new Semaphore(2);
} }
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread. /** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
@@ -71,15 +67,15 @@ public class DomainLocks {
return sem.availablePermits() > 0; return sem.availablePermits() > 0;
} }
public static class DomainLock implements AutoCloseable { public static class LocalDomainLock implements DomainLock {
private final Semaphore semaphore; private final Semaphore semaphore;
DomainLock(Semaphore semaphore) { LocalDomainLock(Semaphore semaphore) {
this.semaphore = semaphore; this.semaphore = semaphore;
} }
@Override @Override
public void close() throws Exception { public void close() {
semaphore.release(); semaphore.release();
Thread.currentThread().setName("[idle]"); Thread.currentThread().setName("[idle]");
} }

View File

@@ -0,0 +1,116 @@
package nu.marginalia.coordination;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Singleton
public class ZookeeperDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
private final Map<String, Integer> waitCounts = new ConcurrentHashMap<>();
private final ServiceRegistryIf serviceRegistry;
private final int nodeId;
@Inject
public ZookeeperDomainCoordinator(ServiceRegistryIf serviceRegistry, @Named("wmsa-system-node") int nodeId) {
// Zookeeper-specific initialization can be done here if needed
this.serviceRegistry = serviceRegistry;
this.nodeId = nodeId;
}
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);
// Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try {
return new ZkDomainLock(sem, sem.acquire());
}
catch (Exception e) {
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
}
finally {
// Decrement or remove the wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);
// Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try {
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
if (lease != null) {
return Optional.of(new ZkDomainLock(sem, lease));
}
else {
return Optional.empty(); // If we fail to acquire the lease, we return an empty optional
}
}
catch (Exception e) {
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
}
finally {
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
}
private InterProcessSemaphoreV2 createSemapore(String topDomain){
try {
return serviceRegistry.getSemaphore(topDomain + ":" + nodeId, DefaultDomainPermits.defaultPermits(topDomain));
}
catch (Exception e) {
throw new RuntimeException("Failed to get semaphore for domain: " + topDomain, e);
}
}
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
return !waitCounts.containsKey(domain.topDomain.toLowerCase());
}
public static class ZkDomainLock implements DomainLock {
private final InterProcessSemaphoreV2 semaphore;
private final Lease lease;
ZkDomainLock(InterProcessSemaphoreV2 semaphore, Lease lease) {
this.semaphore = semaphore;
this.lease = lease;
}
@Override
public void close() {
semaphore.returnLease(lease);
}
}
}

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh') implementation project(':code:libraries:easy-lsh')
implementation project(':code:libraries:domain-lock')
implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model')

View File

@@ -10,9 +10,11 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks; import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource; import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber; import nu.marginalia.crawl.retreival.DomainProber;
@@ -68,7 +70,7 @@ public class CrawlerMain extends ProcessMainClass {
private final ServiceRegistryIf serviceRegistry; private final ServiceRegistryIf serviceRegistry;
private final SimpleBlockingThreadPool pool; private final SimpleBlockingThreadPool pool;
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>(); private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
@@ -97,6 +99,7 @@ public class CrawlerMain extends ProcessMainClass {
WarcArchiverFactory warcArchiverFactory, WarcArchiverFactory warcArchiverFactory,
HikariDataSource dataSource, HikariDataSource dataSource,
DomainBlacklist blacklist, DomainBlacklist blacklist,
DomainCoordinator domainCoordinator,
ServiceRegistryIf serviceRegistry, ServiceRegistryIf serviceRegistry,
Gson gson) throws InterruptedException { Gson gson) throws InterruptedException {
@@ -114,6 +117,7 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist; this.blacklist = blacklist;
this.node = processConfiguration.node(); this.node = processConfiguration.node();
this.serviceRegistry = serviceRegistry; this.serviceRegistry = serviceRegistry;
this.domainCoordinator = domainCoordinator;
SimpleBlockingThreadPool.ThreadType threadType; SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) { if (Boolean.getBoolean("crawler.useVirtualThreads")) {
@@ -157,6 +161,7 @@ public class CrawlerMain extends ProcessMainClass {
new CrawlerModule(), new CrawlerModule(),
new ProcessConfigurationModule("crawler"), new ProcessConfigurationModule("crawler"),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new DatabaseModule(false) new DatabaseModule(false)
); );
var crawler = injector.getInstance(CrawlerMain.class); var crawler = injector.getInstance(CrawlerMain.class);
@@ -451,7 +456,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in /** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */ * DomainLocks purgatory */
public boolean canRun() { public boolean canRun() {
return domainLocks.isLockableHint(new EdgeDomain(domain)); return domainCoordinator.isLockableHint(new EdgeDomain(domain));
} }
@Override @Override
@@ -462,7 +467,7 @@ public class CrawlerMain extends ProcessMainClass {
return; return;
} }
Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain)); Optional<DomainLock> lock = domainCoordinator.tryLockDomain(new EdgeDomain(domain));
// We don't have a lock, so we can't run this task // We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long // we return to avoid blocking the pool for too long
if (lock.isEmpty()) { if (lock.isEmpty()) {
@@ -470,7 +475,7 @@ public class CrawlerMain extends ProcessMainClass {
retryQueue.put(this); retryQueue.put(this);
return; return;
} }
DomainLocks.DomainLock domainLock = lock.get(); DomainLock domainLock = lock.get();
try (domainLock) { try (domainLock) {
Thread.currentThread().setName("crawling:" + domain); Thread.currentThread().setName("crawling:" + domain);

View File

@@ -36,7 +36,6 @@ import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.message.MessageSupport; import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.Timeout;
import org.jsoup.Jsoup; import org.jsoup.Jsoup;
@@ -49,15 +48,12 @@ import org.slf4j.MarkerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.KeyManagementException; import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.*;
@@ -99,42 +95,12 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
.setValidateAfterInactivity(TimeValue.ofSeconds(5)) .setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build(); .build();
// No-op up front validation of server certificates.
//
// We will validate certificates later, after the connection is established
// as we want to store the certificate chain and validation
// outcome to the database.
var trustMeBro = new X509TrustManager() {
private X509Certificate[] lastServerCertChain;
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
this.lastServerCertChain = chain.clone();
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
public X509Certificate[] getLastServerCertChain() {
return lastServerCertChain != null ? lastServerCertChain.clone() : null;
}
};
SSLContext sslContext = SSLContextBuilder.create().build();
sslContext.init(null, new TrustManager[]{trustMeBro}, null);
connectionManager = PoolingHttpClientConnectionManagerBuilder.create() connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2) .setMaxConnPerRoute(2)
.setMaxConnTotal(5000) .setMaxConnTotal(5000)
.setDefaultConnectionConfig(connectionConfig) .setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy(new DefaultClientTlsStrategy(sslContext)) .setTlsSocketStrategy(new DefaultClientTlsStrategy(SSLContext.getDefault()))
.build(); .build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom() connectionManager.setDefaultSocketConfig(SocketConfig.custom()

View File

@@ -32,6 +32,7 @@ dependencies {
implementation project(':code:index:api') implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api') implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh') implementation project(':code:libraries:easy-lsh')
implementation project(':code:processes:crawling-process') implementation project(':code:processes:crawling-process')

View File

@@ -10,6 +10,8 @@ import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule; import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor; import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.io.SerializableCrawlDataStream;
@@ -58,6 +60,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
private final FileStorageService fileStorageService; private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService; private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService; private final DocumentLoaderService documentLoaderService;
private final DomainCoordinator domainCoordinator;
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
@Inject @Inject
@@ -71,7 +74,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
DomainProcessor domainProcessor, DomainProcessor domainProcessor,
FileStorageService fileStorageService, FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService, KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, HikariDataSource dataSource) DocumentLoaderService documentLoaderService, DomainCoordinator domainCoordinator, HikariDataSource dataSource)
throws Exception throws Exception
{ {
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX); super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
@@ -84,6 +87,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService; this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService; this.documentLoaderService = documentLoaderService;
this.domainCoordinator = domainCoordinator;
this.dataSource = dataSource; this.dataSource = dataSource;
domainBlacklist.waitUntilLoaded(); domainBlacklist.waitUntilLoaded();
@@ -107,6 +111,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
try { try {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new LiveCrawlerModule(), new LiveCrawlerModule(),
new DomainCoordinationModule(), // 2 hours lease timeout is enough for the live crawler
new ProcessConfigurationModule("crawler"), new ProcessConfigurationModule("crawler"),
new ConverterModule(), new ConverterModule(),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
@@ -172,7 +177,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
processHeartbeat.progress(LiveCrawlState.CRAWLING); processHeartbeat.progress(LiveCrawlState.CRAWLING);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist); try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainCoordinator, domainQueries, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling")) var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{ {
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) { for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {

View File

@@ -5,8 +5,9 @@ import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.coordination.DomainLock;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
@@ -46,14 +47,16 @@ public class SimpleLinkScraper implements AutoCloseable {
private final DomainBlacklist domainBlacklist; private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10); private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10); private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainLocks domainLocks = new DomainLocks(); private final DomainCoordinator domainCoordinator;
private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024); private final static int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
public SimpleLinkScraper(LiveCrawlDataSet dataSet, public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DomainCoordinator domainCoordinator,
DbDomainQueries domainQueries, DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist) { DomainBlacklist domainBlacklist) {
this.dataSet = dataSet; this.dataSet = dataSet;
this.domainCoordinator = domainCoordinator;
this.domainQueries = domainQueries; this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist; this.domainBlacklist = domainBlacklist;
} }
@@ -98,7 +101,7 @@ public class SimpleLinkScraper implements AutoCloseable {
.version(HttpClient.Version.HTTP_2) .version(HttpClient.Version.HTTP_2)
.build(); .build();
// throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove: // throttle concurrent access per domain; IDE will complain it's not used, but it holds a semaphore -- do not remove:
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) DomainLock lock = domainCoordinator.lockDomain(domain)
) { ) {
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client); SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);

View File

@@ -1,5 +1,6 @@
package nu.marginalia.livecrawler; package nu.marginalia.livecrawler;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.db.DomainBlacklistImpl; import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
@@ -37,7 +38,7 @@ class SimpleLinkScraperTest {
@Test @Test
public void testRetrieveNow() throws Exception { public void testRetrieveNow() throws Exception {
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class)); var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(), null, Mockito.mock(DomainBlacklistImpl.class));
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/")); int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));
Assertions.assertEquals(1, fetched); Assertions.assertEquals(1, fetched);
@@ -57,7 +58,7 @@ class SimpleLinkScraperTest {
@Test @Test
public void testRetrieveNow_Redundant() throws Exception { public void testRetrieveNow_Redundant() throws Exception {
dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1"); dataSet.saveDocument(1, new EdgeUrl("https://www.marginalia.nu/"), "<html>", "", "127.0.0.1");
var scraper = new SimpleLinkScraper(dataSet, null, Mockito.mock(DomainBlacklistImpl.class)); var scraper = new SimpleLinkScraper(dataSet, new LocalDomainCoordinator(),null, Mockito.mock(DomainBlacklistImpl.class));
// If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything // If the requested URL is already in the dataSet, we retrieveNow should shortcircuit and not fetch anything
int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/")); int fetched = scraper.retrieveNow(new EdgeDomain("www.marginalia.nu"), 1, List.of("https://www.marginalia.nu/"));

View File

@@ -27,6 +27,7 @@ dependencies {
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip') implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:message-queue') implementation project(':code:libraries:message-queue')

View File

@@ -185,12 +185,12 @@ public class PingDao {
return null; return null;
} }
public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) { public List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>(); List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(""" var ps = conn.prepareStatement("""
SELECT ID, NEXT_SCHEDULED_UPDATE SELECT ID, DOMAIN_NAME, NEXT_SCHEDULED_UPDATE
FROM EC_DOMAIN FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
@@ -200,11 +200,13 @@ public class PingDao {
ps.setInt(1, nodeId); ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
long domainId = rs.getLong("ID"); int domainId = rs.getInt("ID");
String domainName = rs.getString("DOMAIN_NAME");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE"); var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant(); Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate)); var ref = new DomainReference(domainId, nodeId, domainName.toLowerCase());
updateJobs.add(new UpdateSchedule.UpdateJob<>(ref, nextUpdate));
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e); throw new RuntimeException("Failed to retrieve domain update schedule", e);
@@ -241,7 +243,7 @@ public class PingDao {
else { else {
var record = new DomainDnsRecord(rs); var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>( updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId), new RootDomainReference.ByIdAndName(dnsRootDomainId, rootDomainName),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now)) Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
); );
} }

View File

@@ -1,6 +1,8 @@
package nu.marginalia.ping; package nu.marginalia.ping;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService; import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService; import nu.marginalia.ping.svc.HttpPingService;
@@ -23,13 +25,14 @@ import java.util.concurrent.TimeUnit;
public class PingJobScheduler { public class PingJobScheduler {
private final HttpPingService httpPingService; private final HttpPingService httpPingService;
private final DnsPingService dnsPingService; private final DnsPingService dnsPingService;
private final DomainCoordinator domainCoordinator;
private final PingDao pingDao; private final PingDao pingDao;
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class); private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
= new UpdateSchedule<>(250_000); = new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule private static final UpdateSchedule<DomainReference, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000); = new UpdateSchedule<>(250_000);
public volatile Instant dnsLastSync = Instant.now(); public volatile Instant dnsLastSync = Instant.now();
@@ -43,14 +46,16 @@ public class PingJobScheduler {
@Inject @Inject
public PingJobScheduler(HttpPingService httpPingService, public PingJobScheduler(HttpPingService httpPingService,
DnsPingService dnsPingService, DnsPingService dnsPingService,
DomainCoordinator domainCoordinator,
PingDao pingDao) PingDao pingDao)
{ {
this.httpPingService = httpPingService; this.httpPingService = httpPingService;
this.dnsPingService = dnsPingService; this.dnsPingService = dnsPingService;
this.domainCoordinator = domainCoordinator;
this.pingDao = pingDao; this.pingDao = pingDao;
} }
public synchronized void start(boolean startPaused) { public synchronized void start() {
if (running) if (running)
return; return;
@@ -100,7 +105,7 @@ public class PingJobScheduler {
logger.info("PingJobScheduler paused"); logger.info("PingJobScheduler paused");
} }
public synchronized void resume(int nodeId) { public synchronized void enableForNode(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId); logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) { if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId); logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
@@ -134,7 +139,15 @@ public class PingJobScheduler {
continue; continue;
} }
long nextId = availabilityUpdateSchedule.next(); DomainReference ref = availabilityUpdateSchedule.nextIf(domain -> {
EdgeDomain domainObj = new EdgeDomain(domain.domainName());
if (!domainCoordinator.isLockableHint(domainObj)) {
return false; // Skip locked domains
}
return true; // Process this domain
});
long nextId = ref.domainId();
var data = pingDao.getHistoricalAvailabilityData(nextId); var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) { if (data == null) {
logger.warn("No availability data found for ID: {}", nextId); logger.warn("No availability data found for ID: {}", nextId);
@@ -159,7 +172,7 @@ public class PingJobScheduler {
for (var object : objects) { for (var object : objects) {
var ts = object.nextUpdateTime(); var ts = object.nextUpdateTime();
if (ts != null) { if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts); availabilityUpdateSchedule.add(ref, ts);
break; break;
} }
} }
@@ -190,7 +203,7 @@ public class PingJobScheduler {
try { try {
List<WritableModel> objects = switch(ref) { List<WritableModel> objects = switch(ref) {
case RootDomainReference.ById(long id) -> { case RootDomainReference.ByIdAndName(long id, String name) -> {
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id)); var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord); yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
} }

View File

@@ -5,15 +5,14 @@ import com.google.inject.Guice;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Injector; import com.google.inject.Injector;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest; import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -25,8 +24,6 @@ public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class); private static final Logger log = LoggerFactory.getLogger(PingMain.class);
private final PingJobScheduler pingJobScheduler; private final PingJobScheduler pingJobScheduler;
private final ServiceRegistryIf serviceRegistry;
private final NodeConfigurationService nodeConfigurationService;
private final int node; private final int node;
private static final Logger logger = LoggerFactory.getLogger(PingMain.class); private static final Logger logger = LoggerFactory.getLogger(PingMain.class);
@@ -36,15 +33,11 @@ public class PingMain extends ProcessMainClass {
ProcessConfiguration config, ProcessConfiguration config,
Gson gson, Gson gson,
PingJobScheduler pingJobScheduler, PingJobScheduler pingJobScheduler,
ServiceRegistryIf serviceRegistry,
NodeConfigurationService nodeConfigurationService,
ProcessConfiguration processConfiguration ProcessConfiguration processConfiguration
) { ) {
super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX); super(messageQueueFactory, config, gson, ProcessInboxNames.PING_INBOX);
this.pingJobScheduler = pingJobScheduler; this.pingJobScheduler = pingJobScheduler;
this.serviceRegistry = serviceRegistry;
this.nodeConfigurationService = nodeConfigurationService;
this.node = processConfiguration.node(); this.node = processConfiguration.node();
} }
@@ -52,7 +45,8 @@ public class PingMain extends ProcessMainClass {
log.info("Starting PingMain..."); log.info("Starting PingMain...");
// Start the ping job scheduler // Start the ping job scheduler
pingJobScheduler.start(true); pingJobScheduler.start();
pingJobScheduler.enableForNode(node);
log.info("PingMain started successfully."); log.info("PingMain started successfully.");
} }
@@ -79,6 +73,7 @@ public class PingMain extends ProcessMainClass {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new PingModule(), new PingModule(),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DomainCoordinationModule(),
new ProcessConfigurationModule("ping"), new ProcessConfigurationModule("ping"),
new DatabaseModule(false) new DatabaseModule(false)
); );
@@ -93,7 +88,10 @@ public class PingMain extends ProcessMainClass {
try { try {
main.runPrimary(); main.runPrimary();
for(;;) main.wait(); // Wait on the object lock to avoid busy-looping for(;;)
synchronized (main) { // Wait on the object lock to avoid busy-looping
main.wait();
}
} }
catch (Throwable ex) { catch (Throwable ex) {
logger.error("Error running ping process", ex); logger.error("Error running ping process", ex);

View File

@@ -2,9 +2,8 @@ package nu.marginalia.ping;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.*;
import java.util.Comparator; import java.util.function.Predicate;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time. /** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's * This is not a particularly high-performance implementation, but exists to take contention off the database's
@@ -23,6 +22,9 @@ public class UpdateSchedule<T, T2> {
notifyAll(); notifyAll();
} }
/** Returns the next job in the queue that is due to be processed.
* If no jobs are due, it will block until a job is added or a job becomes due.
* */
public synchronized T next() throws InterruptedException { public synchronized T next() throws InterruptedException {
while (true) { while (true) {
if (updateQueue.isEmpty()) { if (updateQueue.isEmpty()) {
@@ -44,6 +46,56 @@ public class UpdateSchedule<T, T2> {
} }
} }
/** Returns the first job in the queue matching the predicate that is not scheduled into the future,
* blocking until a job is added or a job becomes due.
*/
public synchronized T nextIf(Predicate<T> predicate) throws InterruptedException {
List<UpdateJob<T, T2>> rejectedJobs = new ArrayList<>();
try {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
// Return the rejected jobs to the queue for other threads to process
updateQueue.addAll(rejectedJobs);
if (!rejectedJobs.isEmpty())
notifyAll();
rejectedJobs.clear();
wait(Math.max(1, toWait.toMillis()));
} else {
var candidate = updateQueue.poll(); // Remove the job from the queue since it's due
assert candidate != null : "Update job should not be null at this point, since we just peeked it in a synchronized block";
if (!predicate.test(candidate.key())) {
rejectedJobs.add(candidate);
}
else {
return candidate.key();
}
}
}
}
finally {
// Return the rejected jobs to the queue for other threads to process
updateQueue.addAll(rejectedJobs);
if (!rejectedJobs.isEmpty())
notifyAll();
}
}
public synchronized void clear() { public synchronized void clear() {
updateQueue.clear(); updateQueue.clear();
notifyAll(); notifyAll();

View File

@@ -4,12 +4,14 @@ import com.google.inject.Inject;
import nu.marginalia.UserAgent; import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.ping.fetcher.response.*; import nu.marginalia.ping.fetcher.response.*;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.time.Duration; import java.time.Duration;
@@ -82,9 +84,12 @@ public class PingHttpFetcher {
}); });
} catch (SocketTimeoutException ex) { } catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage()); return new TimeoutResponse(ex.getMessage());
} catch (IOException e) { } catch (HttpHostConnectException | SSLHandshakeException e) {
return new ConnectionError(e.getClass().getSimpleName()); return new ConnectionError(e.getClass().getSimpleName());
} catch (IOException e) {
return new ProtocolError(e.getClass().getSimpleName());
} }
} }
} }

View File

@@ -18,13 +18,18 @@ import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.message.MessageSupport; import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -37,24 +42,55 @@ public class HttpClientProvider implements Provider<HttpClient> {
static { static {
try { try {
client = createClient(); client = createClient();
} catch (NoSuchAlgorithmException e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException { private static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom() final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(15, TimeUnit.SECONDS) .setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS) .setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5)) .setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build(); .build();
// No-op up front validation of server certificates.
//
// We will validate certificates later, after the connection is established
// as we want to store the certificate chain and validation
// outcome to the database.
var trustMeBro = new X509TrustManager() {
private X509Certificate[] lastServerCertChain;
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
this.lastServerCertChain = chain.clone();
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
public X509Certificate[] getLastServerCertChain() {
return lastServerCertChain != null ? lastServerCertChain.clone() : null;
}
};
SSLContext sslContext = SSLContextBuilder.create().build();
sslContext.init(null, new TrustManager[]{trustMeBro}, null);
connectionManager = PoolingHttpClientConnectionManagerBuilder.create() connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2) .setMaxConnPerRoute(2)
.setMaxConnTotal(50) .setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig) .setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy( .setTlsSocketStrategy(
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE)) new DefaultClientTlsStrategy(sslContext, NoopHostnameVerifier.INSTANCE))
.build(); .build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom() connectionManager.setDefaultSocketConfig(SocketConfig.custom()

View File

@@ -1,5 +1,6 @@
package nu.marginalia.ping.io; package nu.marginalia.ping.io;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy; import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.HttpResponse;
@@ -22,6 +23,7 @@ public class RetryStrategy implements HttpRequestRetryStrategy {
case SocketTimeoutException ste -> false; case SocketTimeoutException ste -> false;
case SSLException ssle -> false; case SSLException ssle -> false;
case UnknownHostException uhe -> false; case UnknownHostException uhe -> false;
case HttpHostConnectException ex -> executionCount <= 2; // Only retry once for connection errors
default -> executionCount <= 3; default -> executionCount <= 3;
}; };
} }
@@ -50,7 +52,12 @@ public class RetryStrategy implements HttpRequestRetryStrategy {
if (statusCode == 429) { if (statusCode == 429) {
// get the Retry-After header // get the Retry-After header
String retryAfter = response.getFirstHeader("Retry-After").getValue(); var retryAfterHeader = response.getFirstHeader("Retry-After");
if (retryAfterHeader == null) {
return TimeValue.ofSeconds(3);
}
String retryAfter = retryAfterHeader.getValue();
if (retryAfter == null) { if (retryAfter == null) {
return TimeValue.ofSeconds(2); return TimeValue.ofSeconds(2);
} }

View File

@@ -1,5 +1,7 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
@@ -154,7 +156,7 @@ implements WritableModel
ps.setNull(12, java.sql.Types.SMALLINT); ps.setNull(12, java.sql.Types.SMALLINT);
} }
else { else {
ps.setShort(12, (short) httpResponseTime().toMillis()); ps.setInt(12, Math.clamp(httpResponseTime().toMillis(), 0, 0xFFFF)); // "unsigned short" in SQL
} }
if (errorClassification() == null) { if (errorClassification() == null) {
@@ -279,7 +281,7 @@ implements WritableModel
} }
public Builder httpLocation(String httpLocation) { public Builder httpLocation(String httpLocation) {
this.httpLocation = httpLocation; this.httpLocation = StringUtils.abbreviate(httpLocation, "...",255);
return this; return this;
} }

View File

@@ -1,3 +1,10 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public record DomainReference(int domainId, int nodeId, String domainName) { } import nu.marginalia.model.EdgeDomain;
public record DomainReference(int domainId, int nodeId, String domainName) {
public EdgeDomain asEdgeDomain() {
return new EdgeDomain(domainName);
}
}

View File

@@ -18,6 +18,7 @@ public record DomainSecurityEvent(
boolean certificatePublicKeyChanged, boolean certificatePublicKeyChanged,
boolean certificateSerialNumberChanged, boolean certificateSerialNumberChanged,
boolean certificateIssuerChanged, boolean certificateIssuerChanged,
SchemaChange schemaChange,
Duration oldCertificateTimeToExpiry, Duration oldCertificateTimeToExpiry,
boolean securityHeadersChanged, boolean securityHeadersChanged,
boolean ipChanged, boolean ipChanged,
@@ -45,8 +46,9 @@ public record DomainSecurityEvent(
security_signature_before, security_signature_before,
security_signature_after, security_signature_after,
change_certificate_serial_number, change_certificate_serial_number,
change_certificate_issuer change_certificate_issuer,
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) change_schema
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")) """))
{ {
@@ -81,6 +83,7 @@ public record DomainSecurityEvent(
ps.setBoolean(15, certificateSerialNumberChanged()); ps.setBoolean(15, certificateSerialNumberChanged());
ps.setBoolean(16, certificateIssuerChanged()); ps.setBoolean(16, certificateIssuerChanged());
ps.setString(17, schemaChange.name());
ps.executeUpdate(); ps.executeUpdate();
} }

View File

@@ -1,6 +1,6 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public sealed interface RootDomainReference { public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { } record ByIdAndName(long id, String name) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { } record ByName(String name) implements RootDomainReference { }
} }

View File

@@ -0,0 +1,12 @@
package nu.marginalia.ping.model;
public enum SchemaChange {
UNKNOWN,
NONE,
HTTP_TO_HTTPS,
HTTPS_TO_HTTP;
public boolean isSignificant() {
return this != NONE && this != UNKNOWN;
}
}

View File

@@ -2,6 +2,9 @@ package nu.marginalia.ping.model.comparison;
import nu.marginalia.ping.model.DomainAvailabilityRecord; import nu.marginalia.ping.model.DomainAvailabilityRecord;
import nu.marginalia.ping.model.DomainSecurityRecord; import nu.marginalia.ping.model.DomainSecurityRecord;
import nu.marginalia.ping.model.HttpSchema;
import nu.marginalia.ping.model.SchemaChange;
import org.jetbrains.annotations.NotNull;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
@@ -20,7 +23,8 @@ public record SecurityInformationChange(
Duration oldCertificateTimeToExpiry, Duration oldCertificateTimeToExpiry,
boolean isSecurityHeadersChanged, boolean isSecurityHeadersChanged,
boolean isIpAddressChanged, boolean isIpAddressChanged,
boolean isSoftwareHeaderChanged boolean isSoftwareHeaderChanged,
SchemaChange schemaChange
) { ) {
public static SecurityInformationChange between( public static SecurityInformationChange between(
DomainSecurityRecord before, DomainAvailabilityRecord availabilityBefore, DomainSecurityRecord before, DomainAvailabilityRecord availabilityBefore,
@@ -43,9 +47,10 @@ public record SecurityInformationChange(
); );
boolean securityHeadersChanged = before.securityHeadersHash() != after.securityHeadersHash(); boolean securityHeadersChanged = before.securityHeadersHash() != after.securityHeadersHash();
boolean softwareChanged = !Objects.equals(before.headerServer(), after.headerServer()); boolean softwareChanged = !Objects.equals(before.headerServer(), after.headerServer());
SchemaChange schemaChange = getSchemaChange(before, after);
// Note we don't include IP address changes in the overall change status, // Note we don't include IP address changes in the overall change status,
// as this is not alone considered a change in security information; we may have // as this is not alone considered a change in security information; we may have
// multiple IP addresses for a domain, and the IP address may change frequently // multiple IP addresses for a domain, and the IP address may change frequently
@@ -55,7 +60,8 @@ public record SecurityInformationChange(
|| certificateFingerprintChanged || certificateFingerprintChanged
|| securityHeadersChanged || securityHeadersChanged
|| certificateProfileChanged || certificateProfileChanged
|| softwareChanged; || softwareChanged
|| schemaChange.isSignificant();
return new SecurityInformationChange( return new SecurityInformationChange(
isChanged, isChanged,
@@ -69,9 +75,36 @@ public record SecurityInformationChange(
oldCertificateTimeToExpiry, oldCertificateTimeToExpiry,
securityHeadersChanged, securityHeadersChanged,
ipChanged, ipChanged,
softwareChanged softwareChanged,
schemaChange
); );
} }
private static @NotNull SchemaChange getSchemaChange(DomainSecurityRecord before, DomainSecurityRecord after) {
if (before.httpSchema() == null || after.httpSchema() == null) {
return SchemaChange.UNKNOWN;
}
boolean beforeIsHttp = before.httpSchema() == HttpSchema.HTTP;
boolean afterIsHttp = after.httpSchema() == HttpSchema.HTTP;
boolean beforeIsHttps = before.httpSchema() == HttpSchema.HTTPS;
boolean afterIsHttps = after.httpSchema() == HttpSchema.HTTPS;
SchemaChange schemaChange;
if (beforeIsHttp && afterIsHttp) {
schemaChange = SchemaChange.NONE;
} else if (beforeIsHttps && afterIsHttps) {
schemaChange = SchemaChange.NONE;
} else if (beforeIsHttp && afterIsHttps) {
schemaChange = SchemaChange.HTTP_TO_HTTPS;
} else if (beforeIsHttps && afterIsHttp) {
schemaChange = SchemaChange.HTTPS_TO_HTTP;
} else {
schemaChange = SchemaChange.UNKNOWN;
}
return schemaChange;
}
} }

View File

@@ -96,6 +96,7 @@ public class DomainAvailabilityInformationFactory {
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTP) .httpSchema(HttpSchema.HTTP)
.httpLocation(rsp.headers().getFirst("Location"))
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(errorClassification) .errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) .httpResponseTime(rsp.httpResponseTime())
@@ -164,6 +165,7 @@ public class DomainAvailabilityInformationFactory {
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTPS) .httpSchema(HttpSchema.HTTPS)
.httpLocation(rsp.headers().getFirst("Location"))
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(errorClassification) .errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented .httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.fetcher.response.*; import nu.marginalia.ping.fetcher.response.*;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
@@ -26,6 +27,7 @@ import java.util.List;
@Singleton @Singleton
public class HttpPingService { public class HttpPingService {
private final DomainCoordinator domainCoordinator;
private final PingHttpFetcher pingHttpFetcher; private final PingHttpFetcher pingHttpFetcher;
private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory; private final DomainAvailabilityInformationFactory domainAvailabilityInformationFactory;
@@ -36,9 +38,11 @@ public class HttpPingService {
@Inject @Inject
public HttpPingService( public HttpPingService(
DomainCoordinator domainCoordinator,
PingHttpFetcher pingHttpFetcher, PingHttpFetcher pingHttpFetcher,
DomainAvailabilityInformationFactory domainAvailabilityInformationFactory, DomainAvailabilityInformationFactory domainAvailabilityInformationFactory,
DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception { DomainSecurityInformationFactory domainSecurityInformationFactory) throws Exception {
this.domainCoordinator = domainCoordinator;
this.pingHttpFetcher = pingHttpFetcher; this.pingHttpFetcher = pingHttpFetcher;
this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory; this.domainAvailabilityInformationFactory = domainAvailabilityInformationFactory;
this.domainSecurityInformationFactory = domainSecurityInformationFactory; this.domainSecurityInformationFactory = domainSecurityInformationFactory;
@@ -59,7 +63,8 @@ public class HttpPingService {
public List<WritableModel> pingDomain(DomainReference domainReference, public List<WritableModel> pingDomain(DomainReference domainReference,
@Nullable DomainAvailabilityRecord oldPingStatus, @Nullable DomainAvailabilityRecord oldPingStatus,
@Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException { @Nullable DomainSecurityRecord oldSecurityInformation) throws SQLException, InterruptedException {
// First we figure out if the domain maps to an IP address // First we figure out if the domain maps to an IP address
List<WritableModel> generatedRecords = new ArrayList<>(); List<WritableModel> generatedRecords = new ArrayList<>();
@@ -69,26 +74,31 @@ public class HttpPingService {
if (ipAddress.isEmpty()) { if (ipAddress.isEmpty()) {
result = new UnknownHostError(); result = new UnknownHostError();
} } else {
else { // lock the domain to prevent concurrent pings
String url = "https://" + domainReference.domainName() + "/"; try (var _ = domainCoordinator.lockDomain(domainReference.asEdgeDomain())) {
String alternateUrl = "http://" + domainReference.domainName() + "/"; String url = "https://" + domainReference.domainName() + "/";
String alternateUrl = "http://" + domainReference.domainName() + "/";
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null); result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) { if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2)); sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
} else if (result instanceof ConnectionError) {
var result2 = pingHttpFetcher.fetchUrl(alternateUrl, Method.HEAD, null, null);
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
} }
// Add a grace sleep before we yield the semaphore, so that another thread doesn't
// immediately hammer the same domain after it's released.
sleep(Duration.ofSeconds(1));
} }
} }
@@ -135,7 +145,7 @@ public class HttpPingService {
domainReference.nodeId(), domainReference.nodeId(),
oldPingStatus, oldPingStatus,
ErrorClassification.HTTP_CLIENT_ERROR, ErrorClassification.HTTP_CLIENT_ERROR,
null); rsp.errorMessage());
newSecurityInformation = null; newSecurityInformation = null;
} }
case HttpResponse httpResponse -> { case HttpResponse httpResponse -> {
@@ -186,8 +196,8 @@ public class HttpPingService {
} }
if (oldSecurityInformation != null && newSecurityInformation != null) { if (oldSecurityInformation != null && newSecurityInformation != null) {
compareSecurityInformation(generatedRecords, compareSecurityInformation(generatedRecords,
oldSecurityInformation, oldPingStatus, oldSecurityInformation, oldPingStatus,
newSecurityInformation, newPingStatus); newSecurityInformation, newPingStatus);
} }
return generatedRecords; return generatedRecords;
@@ -286,6 +296,7 @@ public class HttpPingService {
change.isCertificatePublicKeyChanged(), change.isCertificatePublicKeyChanged(),
change.isCertificateSerialNumberChanged(), change.isCertificateSerialNumberChanged(),
change.isCertificateIssuerChanged(), change.isCertificateIssuerChanged(),
change.schemaChange(),
change.oldCertificateTimeToExpiry(), change.oldCertificateTimeToExpiry(),
change.isSecurityHeadersChanged(), change.isSecurityHeadersChanged(),
change.isIpAddressChanged(), change.isIpAddressChanged(),

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingDnsFetcher; import nu.marginalia.ping.fetcher.PingDnsFetcher;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
@@ -85,11 +86,14 @@ class AvailabilityJobSchedulerTest {
DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic); DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);
PingJobScheduler pingJobScheduler = new PingJobScheduler( PingJobScheduler pingJobScheduler = new PingJobScheduler(
new HttpPingService(pingHttpFetcher, new HttpPingService(
new LocalDomainCoordinator(),
pingHttpFetcher,
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)), new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
new DomainSecurityInformationFactory()), new DomainSecurityInformationFactory()),
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")), new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),
dnsDomainInformationFactory), dnsDomainInformationFactory),
new LocalDomainCoordinator(),
pingDao pingDao
); );

View File

@@ -320,6 +320,7 @@ class PingDaoTest {
true, true,
true, true,
false, false,
SchemaChange.NONE,
Duration.ofDays(30), Duration.ofDays(30),
false, false,
false, false,

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.io.HttpClientProvider; import nu.marginalia.ping.io.HttpClientProvider;
@@ -63,6 +64,7 @@ class PingHttpServiceTest {
public void testGetSslInfo() throws Exception { public void testGetSslInfo() throws Exception {
var provider = new HttpClientProvider(); var provider = new HttpClientProvider();
var pingService = new HttpPingService( var pingService = new HttpPingService(
new LocalDomainCoordinator(),
new PingHttpFetcher(provider.get()), new PingHttpFetcher(provider.get()),
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
new BackoffStrategy(PingModule.createPingIntervalsConfiguration()) new BackoffStrategy(PingModule.createPingIntervalsConfiguration())

View File

@@ -37,6 +37,7 @@ dependencies {
implementation project(':code:functions:domain-info') implementation project(':code:functions:domain-info')
implementation project(':code:functions:domain-info:api') implementation project(':code:functions:domain-info:api')
implementation project(':code:libraries:domain-lock')
implementation project(':code:libraries:geo-ip') implementation project(':code:libraries:geo-ip')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict') implementation project(':code:libraries:term-frequency-dict')

View File

@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector; import com.google.inject.Injector;
import io.jooby.ExecutionMode; import io.jooby.ExecutionMode;
import io.jooby.Jooby; import io.jooby.Jooby;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.livecapture.LivecaptureModule; import nu.marginalia.livecapture.LivecaptureModule;
import nu.marginalia.service.MainClass; import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId; import nu.marginalia.service.ServiceId;
@@ -29,6 +30,7 @@ public class AssistantMain extends MainClass {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new AssistantModule(), new AssistantModule(),
new LivecaptureModule(), new LivecaptureModule(),
new DomainCoordinationModule(),
new ServiceConfigurationModule(ServiceId.Assistant), new ServiceConfigurationModule(ServiceId.Assistant),
new ServiceDiscoveryModule(), new ServiceDiscoveryModule(),
new DatabaseModule(false) new DatabaseModule(false)

View File

@@ -55,6 +55,7 @@ include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing' include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict' include 'code:libraries:term-frequency-dict'
include 'code:libraries:test-helpers' include 'code:libraries:test-helpers'
include 'code:libraries:domain-lock'
include 'code:libraries:message-queue' include 'code:libraries:message-queue'