1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Viktor Lofgren
416059adde (ping) Avoid thread starvation scenario in job scheduling
Adjust the queueing strategy to avoid thread starvation from whale domains with many subdomains all locking on the same semaphore and gunking up all threads by implementing a mechanism that returns jobs that can't be executed to the queue.

This will lead to some queue churn, but it should be fairly manageable given the small number of threads involved, and the fairly long job execution times.
2025-06-15 11:04:34 +02:00
Viktor Lofgren
db7930016a (coordination) Trial the use of zookeeper for coordinating semaphores across multiple crawler-like processes
+ fix two broken tests
2025-06-14 16:20:01 +02:00
8 changed files with 106 additions and 22 deletions

View File

@@ -1,5 +1,6 @@
package nu.marginalia.coordination; package nu.marginalia.coordination;
import com.google.inject.Singleton;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import java.time.Duration; import java.time.Duration;
@@ -9,6 +10,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton
public class LocalDomainCoordinator implements DomainCoordinator { public class LocalDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow // The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to // relatively big, but should be manageable since the number of domains is limited to

View File

@@ -1,6 +1,7 @@
package nu.marginalia.coordination; package nu.marginalia.coordination;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.discovery.ServiceRegistryIf; import nu.marginalia.service.discovery.ServiceRegistryIf;
@@ -13,11 +14,14 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton
public class ZookeeperDomainCoordinator implements DomainCoordinator { public class ZookeeperDomainCoordinator implements DomainCoordinator {
// The locks are stored in a map, with the domain name as the key. This map will grow // The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to // relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically. // a few hundred thousand typically.
private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>(); private final Map<String, InterProcessSemaphoreV2> locks = new ConcurrentHashMap<>();
private final Map<String, Integer> waitCounts = new ConcurrentHashMap<>();
private final ServiceRegistryIf serviceRegistry; private final ServiceRegistryIf serviceRegistry;
private final int nodeId; private final int nodeId;
@@ -32,27 +36,35 @@ public class ZookeeperDomainCoordinator implements DomainCoordinator {
* and may be held by another thread. The caller is responsible for locking and releasing the lock. * and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/ */
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException { public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore); final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);
// Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try { try {
var lease = sem.acquire(); return new ZkDomainLock(sem, sem.acquire());
return new ZkDomainLock(sem, lease);
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e); throw new RuntimeException("Failed to acquire lock for domain: " + domain.topDomain, e);
} }
finally {
// Decrement or remove the wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
} }
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException { public Optional<DomainLock> tryLockDomain(EdgeDomain domain) throws InterruptedException {
return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout return tryLockDomain(domain, Duration.ofSeconds(1)); // Underlying semaphore doesn't have a tryLock method, so we use a short timeout
} }
public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException { public Optional<DomainLock> tryLockDomain(EdgeDomain domain, Duration timeout) throws InterruptedException {
final String key = domain.topDomain.toLowerCase();
var sem = locks.computeIfAbsent(key, this::createSemapore);
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::createSemapore); // Increment or add a wait count for the domain
waitCounts.compute(key, (k,value) -> (value == null ? 1 : value + 1));
try { try {
var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout var lease = sem.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); // Acquire with timeout
if (lease != null) { if (lease != null) {
@@ -65,6 +77,9 @@ public class ZookeeperDomainCoordinator implements DomainCoordinator {
catch (Exception e) { catch (Exception e) {
return Optional.empty(); // If we fail to acquire the lock, we return an empty optional return Optional.empty(); // If we fail to acquire the lock, we return an empty optional
} }
finally {
waitCounts.compute(key, (k,value) -> (value == null || value <= 1) ? null : value - 1);
}
} }
private InterProcessSemaphoreV2 createSemapore(String topDomain){ private InterProcessSemaphoreV2 createSemapore(String topDomain){
@@ -81,7 +96,7 @@ public class ZookeeperDomainCoordinator implements DomainCoordinator {
* after this method returns true) * after this method returns true)
*/ */
public boolean isLockableHint(EdgeDomain domain) { public boolean isLockableHint(EdgeDomain domain) {
return true; // Curator does not provide a way to check if a lock is available without acquiring it return !waitCounts.containsKey(domain.topDomain.toLowerCase());
} }
public static class ZkDomainLock implements DomainLock { public static class ZkDomainLock implements DomainLock {

View File

@@ -185,12 +185,12 @@ public class PingDao {
return null; return null;
} }
public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) { public List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>(); List<UpdateSchedule.UpdateJob<DomainReference, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(""" var ps = conn.prepareStatement("""
SELECT ID, NEXT_SCHEDULED_UPDATE SELECT ID, DOMAIN_NAME, NEXT_SCHEDULED_UPDATE
FROM EC_DOMAIN FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
@@ -200,11 +200,13 @@ public class PingDao {
ps.setInt(1, nodeId); ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
long domainId = rs.getLong("ID"); int domainId = rs.getInt("ID");
String domainName = rs.getString("DOMAIN_NAME");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE"); var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant(); Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate)); var ref = new DomainReference(domainId, nodeId, domainName.toLowerCase());
updateJobs.add(new UpdateSchedule.UpdateJob<>(ref, nextUpdate));
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e); throw new RuntimeException("Failed to retrieve domain update schedule", e);
@@ -241,7 +243,7 @@ public class PingDao {
else { else {
var record = new DomainDnsRecord(rs); var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>( updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId), new RootDomainReference.ByIdAndName(dnsRootDomainId, rootDomainName),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now)) Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
); );
} }

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.coordination.DomainCoordinator; import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService; import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService; import nu.marginalia.ping.svc.HttpPingService;
@@ -31,7 +32,7 @@ public class PingJobScheduler {
private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
= new UpdateSchedule<>(250_000); = new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule private static final UpdateSchedule<DomainReference, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000); = new UpdateSchedule<>(250_000);
public volatile Instant dnsLastSync = Instant.now(); public volatile Instant dnsLastSync = Instant.now();
@@ -138,7 +139,15 @@ public class PingJobScheduler {
continue; continue;
} }
long nextId = availabilityUpdateSchedule.next(); DomainReference ref = availabilityUpdateSchedule.nextIf(domain -> {
EdgeDomain domainObj = new EdgeDomain(domain.domainName());
if (!domainCoordinator.isLockableHint(domainObj)) {
return false; // Skip locked domains
}
return true; // Process this domain
});
long nextId = ref.domainId();
var data = pingDao.getHistoricalAvailabilityData(nextId); var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) { if (data == null) {
logger.warn("No availability data found for ID: {}", nextId); logger.warn("No availability data found for ID: {}", nextId);
@@ -163,7 +172,7 @@ public class PingJobScheduler {
for (var object : objects) { for (var object : objects) {
var ts = object.nextUpdateTime(); var ts = object.nextUpdateTime();
if (ts != null) { if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts); availabilityUpdateSchedule.add(ref, ts);
break; break;
} }
} }
@@ -194,7 +203,7 @@ public class PingJobScheduler {
try { try {
List<WritableModel> objects = switch(ref) { List<WritableModel> objects = switch(ref) {
case RootDomainReference.ById(long id) -> { case RootDomainReference.ByIdAndName(long id, String name) -> {
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id)); var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord); yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
} }

View File

@@ -2,9 +2,8 @@ package nu.marginalia.ping;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.*;
import java.util.Comparator; import java.util.function.Predicate;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time. /** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's * This is not a particularly high-performance implementation, but exists to take contention off the database's
@@ -23,6 +22,9 @@ public class UpdateSchedule<T, T2> {
notifyAll(); notifyAll();
} }
/** Returns the next job in the queue that is due to be processed.
* If no jobs are due, it will block until a job is added or a job becomes due.
* */
public synchronized T next() throws InterruptedException { public synchronized T next() throws InterruptedException {
while (true) { while (true) {
if (updateQueue.isEmpty()) { if (updateQueue.isEmpty()) {
@@ -44,6 +46,56 @@ public class UpdateSchedule<T, T2> {
} }
} }
/** Returns the first job in the queue matching the predicate that is not scheduled into the future,
* blocking until a job is added or a job becomes due.
*/
public synchronized T nextIf(Predicate<T> predicate) throws InterruptedException {
List<UpdateJob<T, T2>> rejectedJobs = new ArrayList<>();
try {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
// Return the rejected jobs to the queue for other threads to process
updateQueue.addAll(rejectedJobs);
if (!rejectedJobs.isEmpty())
notifyAll();
rejectedJobs.clear();
wait(Math.max(1, toWait.toMillis()));
} else {
var candidate = updateQueue.poll(); // Remove the job from the queue since it's due
assert candidate != null : "Update job should not be null at this point, since we just peeked it in a synchronized block";
if (!predicate.test(candidate.key())) {
rejectedJobs.add(candidate);
}
else {
return candidate.key();
}
}
}
}
finally {
// Return the rejected jobs to the queue for other threads to process
updateQueue.addAll(rejectedJobs);
if (!rejectedJobs.isEmpty())
notifyAll();
}
}
public synchronized void clear() { public synchronized void clear() {
updateQueue.clear(); updateQueue.clear();
notifyAll(); notifyAll();

View File

@@ -1,6 +1,6 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public sealed interface RootDomainReference { public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { } record ByIdAndName(long id, String name) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { } record ByName(String name) implements RootDomainReference { }
} }

View File

@@ -86,7 +86,9 @@ class AvailabilityJobSchedulerTest {
DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic); DomainDnsInformationFactory dnsDomainInformationFactory = new DomainDnsInformationFactory(processConfig, pic);
PingJobScheduler pingJobScheduler = new PingJobScheduler( PingJobScheduler pingJobScheduler = new PingJobScheduler(
new HttpPingService(pingHttpFetcher, new HttpPingService(
new LocalDomainCoordinator(),
pingHttpFetcher,
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)), new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new BackoffStrategy(pic)),
new DomainSecurityInformationFactory()), new DomainSecurityInformationFactory()),
new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")), new DnsPingService(new PingDnsFetcher(List.of("8.8.8.8", "8.8.4.4")),

View File

@@ -2,6 +2,7 @@ package nu.marginalia.ping;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.coordination.LocalDomainCoordinator;
import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.ping.fetcher.PingHttpFetcher; import nu.marginalia.ping.fetcher.PingHttpFetcher;
import nu.marginalia.ping.io.HttpClientProvider; import nu.marginalia.ping.io.HttpClientProvider;
@@ -63,6 +64,7 @@ class PingHttpServiceTest {
public void testGetSslInfo() throws Exception { public void testGetSslInfo() throws Exception {
var provider = new HttpClientProvider(); var provider = new HttpClientProvider();
var pingService = new HttpPingService( var pingService = new HttpPingService(
new LocalDomainCoordinator(),
new PingHttpFetcher(provider.get()), new PingHttpFetcher(provider.get()),
new DomainAvailabilityInformationFactory(new GeoIpDictionary(), new DomainAvailabilityInformationFactory(new GeoIpDictionary(),
new BackoffStrategy(PingModule.createPingIntervalsConfiguration()) new BackoffStrategy(PingModule.createPingIntervalsConfiguration())