1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits

...

13 Commits

Author SHA1 Message Date
Viktor Lofgren
67ad7a3bbc (ping) Enhance HTTP ping logic to retry GET requests for specific status codes and add sleep duration between retries 2025-06-13 12:59:56 +02:00
Viktor Lofgren
ed62ec8a35 (random) Sanitize random search results with DOMAIN_AVAILABILITY_INFORMATION join 2025-06-13 10:38:21 +02:00
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
Viktor Lofgren
5f93c7f767 (ping) Update PROC_PING_SPAWNER to use REALTIME from SIDELOAD 2025-06-12 14:04:09 +02:00
Viktor Lofgren
4001c68c82 (ping) Update SQL query to include NODE_AFFINITY in historical availability data retrieval 2025-06-12 13:58:50 +02:00
Viktor Lofgren
6b811489c5 (actor) Make ping spawner auto-spawn the process 2025-06-12 13:46:50 +02:00
Viktor Lofgren
e9d317c65d (ping) Parameterize thread counts for availability and DNS job consumers 2025-06-12 13:34:58 +02:00
Viktor Lofgren
16b05a4737 (ping) Reduce maximum total connections in HttpClientProvider to improve resource management 2025-06-12 13:04:55 +02:00
Viktor Lofgren
021cd73cbb (ping) Reduce db contention by moving job scheduling out of the database to RAM 2025-06-12 12:56:33 +02:00
Viktor Lofgren
4253bd53b5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:18:07 +02:00
Viktor Lofgren
14c87461a5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:04:39 +02:00
Viktor Lofgren
9afed0a18e (ping) Optimize parameters
Reduce socket and connection timeouts in HttpClient and adjust thread counts for job consumers
2025-06-11 16:21:45 +02:00
19 changed files with 549 additions and 412 deletions

View File

@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD), PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -3,24 +3,184 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor { public class PingMonitorActor extends RecordActorPrototype {
@Inject private final MqPersistence persistence;
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) { private final ProcessService processService;
super(gson,
configuration, private final Logger logger = LoggerFactory.getLogger(getClass());
persistence,
processService, public static final int MAX_ATTEMPTS = 3;
ProcessInboxNames.PING_INBOX, private final String inboxName;
ProcessService.ProcessId.PING); private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final boolean isPrimaryNode;
private final Gson gson;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest(isPrimaryNode ? "primary": "secondary");
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);
yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
} }
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public PingMonitorActor(Gson gson,
NodeConfigurationService nodeConfigurationService,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
this.isPrimaryNode = Set.of(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED)
.contains(nodeConfigurationService.get(node).profile());
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;
var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}
public boolean isError() {
return error.get();
}
}
} }

View File

@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) { public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {
final String q = """ final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2 WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=? AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL AND DOMAIN_ALIAS IS NULL
ORDER BY RAND() ORDER BY RAND()

View File

@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects;
@Singleton @Singleton
public class PingDao { public class PingDao {
@@ -76,32 +77,6 @@ public class PingDao {
} }
} }
public List<DomainReference> getNewDomains(int nodeId, int cnt) throws SQLException {
List<DomainReference> domains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT domain_id, domain_name
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.domain_id = DOMAIN_AVAILABILITY_INFORMATION.domain_id
WHERE DOMAIN_AVAILABILITY_INFORMATION.server_available IS NULL
AND EC_DOMAIN.NODE_ID = ?
LIMIT ?
"""))
{
ps.setInt(1, nodeId);
ps.setInt(2, cnt);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domains.add(new DomainReference(rs.getInt("domain_id"), nodeId, rs.getString("domain_name").toLowerCase()));
}
}
return domains;
}
public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException { public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
@@ -132,7 +107,7 @@ public class PingDao {
} }
} }
public DomainDnsRecord getDomainDnsRecord(int dnsRootDomainId) throws SQLException { public DomainDnsRecord getDomainDnsRecord(long dnsRootDomainId) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) { var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) {
@@ -160,111 +135,123 @@ public class PingDao {
} }
} }
public List<HistoricalAvailabilityData> getNextDomainPingStatuses(int count, int nodeId) throws SQLException { public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
List<HistoricalAvailabilityData> domainAvailabilityRecords = new ArrayList<>(count);
var query = """ var query = """
SELECT DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*, EC_DOMAIN.DOMAIN_NAME FROM DOMAIN_AVAILABILITY_INFORMATION SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
LEFT JOIN DOMAIN_SECURITY_INFORMATION FROM EC_DOMAIN
ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = DOMAIN_SECURITY_INFORMATION.DOMAIN_ID LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
WHERE NEXT_SCHEDULED_UPDATE <= ? AND DOMAIN_AVAILABILITY_INFORMATION.NODE_ID = ? WHERE EC_DOMAIN.ID = ?
ORDER BY NEXT_SCHEDULED_UPDATE ASC
LIMIT ?
"""; """;
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) { var ps = conn.prepareStatement(query)) {
// Use Java time since this is how we generate the timestamps in the ping process
// to avoid timezone weirdness. ps.setLong(1, domainId);
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME"); String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME");
var domainAvailabilityRecord = new DomainAvailabilityRecord(rs);
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null) { DomainAvailabilityRecord dar;
var securityRecord = new DomainSecurityRecord(rs); DomainSecurityRecord dsr;
domainAvailabilityRecords.add(
new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, domainAvailabilityRecord, securityRecord) if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
); dsr = new DomainSecurityRecord(rs);
} else { else
domainAvailabilityRecords.add(new HistoricalAvailabilityData.JustAvailability(domainName, domainAvailabilityRecord)); dsr = null;
if (rs.getObject("DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dar = new DomainAvailabilityRecord(rs);
else
dar = null;
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}
else {
if (dsr != null) {
return new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, dar, dsr);
} else {
return new HistoricalAvailabilityData.JustAvailability(domainName, dar);
}
} }
} }
} }
return domainAvailabilityRecords;
return null;
} }
public List<DomainDnsRecord> getNextDnsDomainRecords(int count, int nodeId) throws SQLException { public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<DomainDnsRecord> domainDnsRecords = new ArrayList<>(count); List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
var query = """
SELECT * FROM DOMAIN_DNS_INFORMATION
WHERE TS_NEXT_DNS_CHECK <= ? AND NODE_AFFINITY = ?
ORDER BY DNS_CHECK_PRIORITY DESC, TS_NEXT_DNS_CHECK ASC
LIMIT ?
""";
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) { var ps = conn.prepareStatement("""
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now())); SELECT ID, NEXT_SCHEDULED_UPDATE
ps.setInt(2, nodeId); FROM EC_DOMAIN
ps.setInt(3, count); LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NODE_AFFINITY = ?
""")) {
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
domainDnsRecords.add(new DomainDnsRecord(rs)); long domainId = rs.getLong("ID");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
} }
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e);
} }
return domainDnsRecords;
logger.info("Found {} availability update jobs for node {}", updateJobs.size(), nodeId);
return updateJobs;
} }
public List<DomainReference> getOrphanedDomains(int nodeId) { public List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> getDnsUpdateSchedule(int nodeId) {
List<DomainReference> orphanedDomains = new ArrayList<>(); List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(""" var ps = conn.prepareStatement("""
SELECT e.DOMAIN_NAME, e.ID SELECT DISTINCT(DOMAIN_TOP),DOMAIN_DNS_INFORMATION.* FROM EC_DOMAIN
FROM EC_DOMAIN e LEFT JOIN DOMAIN_DNS_INFORMATION ON ROOT_DOMAIN_NAME = DOMAIN_TOP
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION d ON e.ID = d.DOMAIN_ID WHERE EC_DOMAIN.NODE_AFFINITY = ?
WHERE d.DOMAIN_ID IS NULL AND e.NODE_AFFINITY = ?;
""")) { """)) {
stmt.setInt(1, nodeId); ps.setFetchSize(10_000);
stmt.setFetchSize(10_000); ps.setInt(1, nodeId);
ResultSet rs = stmt.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
String domainName = rs.getString("DOMAIN_NAME"); Long dnsRootDomainId = rs.getObject("DOMAIN_DNS_INFORMATION.DNS_ROOT_DOMAIN_ID", Long.class);
int domainId = rs.getInt("ID"); String rootDomainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(new DomainReference(domainId, nodeId, domainName)); if (dnsRootDomainId == null) {
updateJobs.add(
new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ByName(rootDomainName),
Instant.now())
);
}
else {
var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
);
}
} }
} } catch (SQLException e) {
catch (SQLException e) { throw new RuntimeException("Failed to retrieve DNS update schedule", e);
throw new RuntimeException("Failed to retrieve orphaned domains", e);
} }
return orphanedDomains; logger.info("Found {} dns update jobs for node {}", updateJobs.size(), nodeId);
}
public List<String> getOrphanedRootDomains(int nodeId) { return updateJobs;
List<String> orphanedDomains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP)
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_DNS_INFORMATION d ON e.DOMAIN_TOP = d.ROOT_DOMAIN_NAME
WHERE d.ROOT_DOMAIN_NAME IS NULL AND e.NODE_AFFINITY = ?;
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(domainName.toLowerCase());
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
}
return orphanedDomains;
} }
} }

View File

@@ -4,15 +4,15 @@ import com.google.inject.Inject;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService; import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService; import nu.marginalia.ping.svc.HttpPingService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** PingJobScheduler is responsible for scheduling and processing ping jobs /** PingJobScheduler is responsible for scheduling and processing ping jobs
@@ -27,50 +27,13 @@ public class PingJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class); private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
sealed interface DnsJob { private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
Object reference(); = new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000);
record DnsFetch(String rootDomain) implements DnsJob { public volatile Instant dnsLastSync = Instant.now();
@Override public volatile Instant availabilityLastSync = Instant.now();
public Object reference() {
return rootDomain;
}
}
record DnsRefresh(DomainDnsRecord oldRecord) implements DnsJob {
@Override
public Object reference() {
return oldRecord.rootDomainName();
}
}
}
sealed interface AvailabilityJob {
Object reference();
record Availability(DomainReference domainReference) implements AvailabilityJob {
@Override
public Object reference() {
return domainReference.domainName();
}
}
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
@Override
public Object reference() {
return domain;
}
}
}
// Keeps track of ongoing ping and DNS processing to avoid duplicate work,
// which is mainly a scenario that will occur when there is not a lot of data
// in the database. In real-world scenarios, the queues will be full most
// of the time, and prevent this from being an issue.
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
public volatile Integer nodeId = null; public volatile Integer nodeId = null;
public volatile boolean running = false; public volatile boolean running = false;
@@ -95,15 +58,16 @@ public class PingJobScheduler {
running = true; running = true;
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords)); allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("new-availability").start(this::fetchNewAvailabilityJobs)); allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
allThreads.add(Thread.ofPlatform().daemon().name("update-availability").start(this::updateAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
for (int i = 0; i < 8; i++) { int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer)); allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
} }
for (int i = 0; i < 2; i++) { for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer)); allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
} }
} }
@@ -122,19 +86,33 @@ public class PingJobScheduler {
} }
public void pause(int nodeId) { public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) { if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId); logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return; return;
} }
this.nodeId = null; this.nodeId = null;
availabilityUpdateSchedule.clear();
dnsUpdateSchedule.clear();
logger.info("PingJobScheduler paused"); logger.info("PingJobScheduler paused");
} }
public synchronized void resume(int nodeId) { public synchronized void resume(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) { if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected null, got {}", this.nodeId, nodeId); logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return; return;
} }
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nodeId));
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nodeId));
dnsLastSync = Instant.now();
availabilityLastSync = Instant.now();
// Flag that we are running again
this.nodeId = nodeId; this.nodeId = nodeId;
notifyAll(); notifyAll();
@@ -150,32 +128,44 @@ public class PingJobScheduler {
private void availabilityJobConsumer() { private void availabilityJobConsumer() {
while (running) { while (running) {
try { try {
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS); Integer nid = nodeId;
if (job == null) { if (nid == null) {
continue; // No job available, continue to the next iteration waitForResume();
continue;
}
long nextId = availabilityUpdateSchedule.next();
var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) {
logger.warn("No availability data found for ID: {}", nextId);
continue; // No data to process, skip this iteration
} }
try { try {
switch (job) { List<WritableModel> objects = switch (data) {
case AvailabilityJob.Availability(DomainReference reference) -> { case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
logger.info("Availability check: {}", reference.domainName()); -> httpPingService.pingDomain(reference, null, null);
pingDao.write(httpPingService.pingDomain(reference, null, null)); case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
} -> httpPingService.pingDomain(
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> { new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
logger.info("Availability check with reference: {}", domain); case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
pingDao.write(httpPingService.pingDomain( -> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
availability, };
security));
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts);
break;
} }
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error("Error processing availability job for domain: " + job.reference(), e); logger.error("Error processing availability job for domain: " + data.domain(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsAvailability.remove(job.reference());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -190,30 +180,41 @@ public class PingJobScheduler {
private void dnsJobConsumer() { private void dnsJobConsumer() {
while (running) { while (running) {
try { try {
DnsJob job = dnsJobQueue.poll(1, TimeUnit.SECONDS); Integer nid = nodeId;
if (job == null) { if (nid == null) {
continue; // No job available, continue to the next iteration waitForResume();
continue;
} }
RootDomainReference ref = dnsUpdateSchedule.next();
try { try {
switch (job) { List<WritableModel> objects = switch(ref) {
case DnsJob.DnsFetch(String rootDomain) -> { case RootDomainReference.ById(long id) -> {
logger.info("Fetching DNS records for root domain: {}", rootDomain); var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
pingDao.write(dnsPingService.pingDomain(rootDomain, null)); yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
} }
case DnsJob.DnsRefresh(DomainDnsRecord oldRecord) -> { case RootDomainReference.ByName(String name) -> {
logger.info("Refreshing DNS records for domain: {}", oldRecord.rootDomainName()); @Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
pingDao.write(dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord)); yield dnsPingService.pingDomain(name, oldRecord);
}
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
dnsUpdateSchedule.add(ref, ts);
break;
} }
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error("Error processing DNS job for domain: " + job.reference(), e); logger.error("Error processing DNS job for domain: " + ref, e);
}
finally {
// Remove the domain from the processing map
processingDomainsDns.remove(job.reference());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.error("DNS job consumer interrupted", e); logger.error("DNS job consumer interrupted", e);
@@ -224,38 +225,27 @@ public class PingJobScheduler {
} }
} }
private void fetchNewAvailabilityJobs() { private void syncAvailabilityJobs() {
try { try {
while (running) { while (running) {
// If we are suspended, wait for resume
Integer nid = nodeId; Integer nid = nodeId;
if (nid == null) { if (nid == null) {
waitForResume(); waitForResume();
continue; // re-fetch the records after resuming continue;
} }
List<DomainReference> domains = pingDao.getOrphanedDomains(nid); // Check if we need to refresh the availability data
for (DomainReference domain : domains) { Instant nextRefresh = availabilityLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
if (nodeId == null) { Duration remaining = Duration.between(Instant.now(), nextRefresh);
waitForResume(); TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
break; // re-fetch the records after resuming continue;
}
try {
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new ping job for domain: " + domain, e);
}
} }
// This is an incredibly expensive operation, so we only do it once a day availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nid));
try { availabilityLastSync = Instant.now();
TimeUnit.HOURS.sleep(24);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }
catch (Exception e) { catch (Exception e) {
@@ -263,32 +253,26 @@ public class PingJobScheduler {
} }
} }
private void fetchNewDnsRecords() { private void syncDnsRecords() {
try { try {
while (running) { while (running) {
Integer nid = nodeId; Integer nid = nodeId;
if (nid == null) { if (nid == null) {
waitForResume(); waitForResume();
continue; // re-fetch the records after resuming continue; // re-fetch the records after resuming
} }
List<String> rootDomains = pingDao.getOrphanedRootDomains(nid); // Check if we need to refresh the availability data
for (String rootDomain : rootDomains) { Instant nextRefresh = dnsLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
if (nodeId == null) { Duration remaining = Duration.between(Instant.now(), nextRefresh);
waitForResume(); TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
break; // re-fetch the records after resuming continue;
}
try {
dnsJobQueue.put(new DnsJob.DnsFetch(rootDomain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new DNS job for root domain: " + rootDomain, e);
}
} }
// This is an incredibly expensive operation, so we only do it once a day
TimeUnit.HOURS.sleep(24); dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nid));
dnsLastSync = Instant.now();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -296,68 +280,5 @@ public class PingJobScheduler {
} }
} }
private void updateAvailabilityJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var statuses = pingDao.getNextDomainPingStatuses(100, nid);
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
for (var status : statuses) {
var job = switch (status) {
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
};
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
availabilityJobQueue.put(job);
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain ping statuses", e);
}
}
}
private void updateDnsJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var dnsRecords = pingDao.getNextDnsDomainRecords(1000, nid);
for (var record : dnsRecords) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
if (processingDomainsDns.putIfAbsent(record.rootDomainName(), true) == null) {
dnsJobQueue.put(new DnsJob.DnsRefresh(record));
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain DNS records", e);
}
}
}
} }

View File

@@ -0,0 +1,57 @@
package nu.marginalia.ping;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's
* timestamp index.
* */
public class UpdateSchedule<T, T2> {
private final PriorityQueue<UpdateJob<T, T2>> updateQueue;
public record UpdateJob<T, T2>(T key, Instant updateTime) {}
public UpdateSchedule(int initialCapacity) {
updateQueue = new PriorityQueue<>(initialCapacity, Comparator.comparing(UpdateJob::updateTime));
}
public synchronized void add(T key, Instant updateTime) {
updateQueue.add(new UpdateJob<>(key, updateTime));
notifyAll();
}
public synchronized T next() throws InterruptedException {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
wait(Math.max(1, toWait.toMillis()));
}
else {
updateQueue.poll(); // Remove the job from the queue since it's due
return job.key();
}
}
}
public synchronized void clear() {
updateQueue.clear();
notifyAll();
}
public synchronized void replaceQueue(Collection<UpdateJob<T,T2>> newJobs) {
updateQueue.clear();
updateQueue.addAll(newJobs);
notifyAll();
}
}

View File

@@ -3,6 +3,8 @@ package nu.marginalia.ping.fetcher;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import nu.marginalia.ping.model.SingleDnsRecord; import nu.marginalia.ping.model.SingleDnsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.ExtendedResolver; import org.xbill.DNS.ExtendedResolver;
import org.xbill.DNS.Lookup; import org.xbill.DNS.Lookup;
import org.xbill.DNS.TextParseException; import org.xbill.DNS.TextParseException;
@@ -17,6 +19,7 @@ import java.util.concurrent.*;
public class PingDnsFetcher { public class PingDnsFetcher {
private final ThreadLocal<ExtendedResolver> resolver; private final ThreadLocal<ExtendedResolver> resolver;
private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100); private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100);
private static final Logger logger = LoggerFactory.getLogger(PingDnsFetcher.class);
private static final int[] RECORD_TYPES = { private static final int[] RECORD_TYPES = {
Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT, Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT,
@@ -25,8 +28,7 @@ public class PingDnsFetcher {
@Inject @Inject
public PingDnsFetcher(@Named("ping.nameservers") public PingDnsFetcher(@Named("ping.nameservers")
List<String> nameservers) throws UnknownHostException List<String> nameservers) {
{
resolver = ThreadLocal.withInitial(() -> createResolver(nameservers)); resolver = ThreadLocal.withInitial(() -> createResolver(nameservers));
} }
@@ -81,13 +83,12 @@ public class PingDnsFetcher {
try { try {
results.addAll(future.get(1, TimeUnit.MINUTES)); results.addAll(future.get(1, TimeUnit.MINUTES));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error("Error fetching DNS records", e);
System.err.println("Error fetching DNS records: " + e.getMessage());
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
System.err.println("DNS query interrupted: " + e.getMessage()); logger.error("DNS query interrupted", e);
} }
return results; return results;
} }

View File

@@ -83,7 +83,7 @@ public class PingHttpFetcher {
} catch (SocketTimeoutException ex) { } catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage()); return new TimeoutResponse(ex.getMessage());
} catch (IOException e) { } catch (IOException e) {
return new ConnectionError(e.getMessage()); return new ConnectionError(e.getClass().getSimpleName());
} }
} }

View File

@@ -44,14 +44,14 @@ public class HttpClientProvider implements Provider<HttpClient> {
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException { private static CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom() final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(30, TimeUnit.SECONDS) .setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(30, TimeUnit.SECONDS) .setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5)) .setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build(); .build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create() connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2) .setMaxConnPerRoute(2)
.setMaxConnTotal(5000) .setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig) .setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy( .setTlsSocketStrategy(
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE)) new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE))

View File

@@ -70,6 +70,11 @@ implements WritableModel
return millis == null ? null : Duration.ofMillis(millis); return millis == null ? null : Duration.ofMillis(millis);
} }
@Override
public Instant nextUpdateTime() {
return nextScheduledUpdate;
}
@Override @Override
public void write(Connection connection) throws SQLException { public void write(Connection connection) throws SQLException {
try (var ps = connection.prepareStatement( try (var ps = connection.prepareStatement(

View File

@@ -60,6 +60,11 @@ public record DomainDnsRecord(
return new Builder(); return new Builder();
} }
@Override
public Instant nextUpdateTime() {
return tsNextScheduledUpdate;
}
@Override @Override
public void write(Connection connection) throws SQLException { public void write(Connection connection) throws SQLException {

View File

@@ -203,7 +203,6 @@ public record DomainSecurityRecord(
if (sslCertFingerprintSha256() == null) { if (sslCertFingerprintSha256() == null) {
ps.setNull(12, java.sql.Types.BINARY); ps.setNull(12, java.sql.Types.BINARY);
} else { } else {
System.out.println(sslCertFingerprintSha256().length);
ps.setBytes(12, sslCertFingerprintSha256()); ps.setBytes(12, sslCertFingerprintSha256());
} }
if (sslCertSan() == null) { if (sslCertSan() == null) {

View File

@@ -1,6 +1,13 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public sealed interface HistoricalAvailabilityData { public sealed interface HistoricalAvailabilityData {
public String domain();
record JustDomainReference(DomainReference domainReference) implements HistoricalAvailabilityData {
@Override
public String domain() {
return domainReference.domainName();
}
}
record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {} record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {}
record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {} record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {}
} }

View File

@@ -0,0 +1,6 @@
package nu.marginalia.ping.model;
public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { }
}

View File

@@ -1,8 +1,14 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
import javax.annotation.Nullable;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Instant;
public interface WritableModel { public interface WritableModel {
void write(Connection connection) throws SQLException; void write(Connection connection) throws SQLException;
@Nullable
default Instant nextUpdateTime() {
return null;
}
} }

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) { switch (changes) {
case DnsRecordChange.None _ -> {} case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> { case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder() generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId()) .rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity()) .nodeId(newRecord.nodeAffinity())

View File

@@ -71,23 +71,40 @@ public class DomainAvailabilityInformationFactory {
@Nullable DomainAvailabilityRecord previousRecord, @Nullable DomainAvailabilityRecord previousRecord,
HttpResponse rsp) { HttpResponse rsp) {
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null; final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = now;
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder() return DomainAvailabilityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.serverAvailable(true) .serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTP) .httpSchema(HttpSchema.HTTP)
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) .httpResponseTime(rsp.httpResponseTime())
.httpEtag(rsp.headers().getFirst("ETag")) .httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified")) .httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now()) .tsLastPing(now)
.tsLastAvailable(Instant.now()) .tsLastAvailable(lastAvailable)
.tsLastError(lastError) .tsLastError(lastError)
.nextScheduledUpdate(Instant.now().plus(backoffStrategy.getOkInterval())) .nextScheduledUpdate(now.plus(backoffStrategy.getOkInterval()))
.backoffFetchInterval(backoffStrategy.getOkInterval()) .backoffFetchInterval(backoffStrategy.getOkInterval())
.build(); .build();
@@ -117,23 +134,44 @@ public class DomainAvailabilityInformationFactory {
updateTime = Instant.now().plus(backoffStrategy.getOkInterval()); updateTime = Instant.now().plus(backoffStrategy.getOkInterval());
} }
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null; final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (!validationResult.isValid()) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.SSL_ERROR;
} else if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = Instant.now();
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder() return DomainAvailabilityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.serverAvailable(validationResult.isValid()) .serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTPS) .httpSchema(HttpSchema.HTTPS)
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(!validationResult.isValid() ? ErrorClassification.SSL_ERROR : ErrorClassification.NONE) .errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented .httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented
.httpEtag(rsp.headers().getFirst("ETag")) .httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified")) .httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now()) .tsLastPing(now)
.tsLastError(lastError) .tsLastError(lastError)
.tsLastAvailable(Instant.now()) .tsLastAvailable(lastAvailable)
.nextScheduledUpdate(updateTime) .nextScheduledUpdate(updateTime)
.backoffFetchInterval(backoffStrategy.getOkInterval()) .backoffFetchInterval(backoffStrategy.getOkInterval())
.build(); .build();

View File

@@ -18,6 +18,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@@ -75,8 +76,8 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null); result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && response.httpStatus() == 405) { if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
} }
else if (result instanceof ConnectionError) { else if (result instanceof ConnectionError) {
@@ -84,8 +85,8 @@ public class HttpPingService {
if (!(result2 instanceof ConnectionError)) { if (!(result2 instanceof ConnectionError)) {
result = result2; result = result2;
} }
if (result instanceof HttpResponse response && response.httpStatus() == 405) { if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
} }
} }
@@ -116,7 +117,7 @@ public class HttpPingService {
domainReference.nodeId(), domainReference.nodeId(),
oldPingStatus, oldPingStatus,
ErrorClassification.CONNECTION_ERROR, ErrorClassification.CONNECTION_ERROR,
null); rsp.errorMessage());
newSecurityInformation = null; newSecurityInformation = null;
} }
case TimeoutResponse rsp -> { case TimeoutResponse rsp -> {
@@ -190,6 +191,29 @@ public class HttpPingService {
return generatedRecords; return generatedRecords;
} }
private boolean shouldTryGET(int statusCode) {
if (statusCode < 400) {
return false;
}
if (statusCode == 429) { // Too many requests, we should not retry with GET
return false;
}
// For all other status codes, we can try a GET request, as many severs do not
// cope with HEAD requests properly.
return statusCode < 600;
}
private void sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
logger.warn("Sleep interrupted", e);
}
}
private void comparePingStatuses(List<WritableModel> generatedRecords, private void comparePingStatuses(List<WritableModel> generatedRecords,
DomainAvailabilityRecord oldPingStatus, DomainAvailabilityRecord oldPingStatus,
DomainAvailabilityRecord newPingStatus) { DomainAvailabilityRecord newPingStatus) {

View File

@@ -330,86 +330,6 @@ class PingDaoTest {
svc.write(event); svc.write(event);
} }
@Test
void getNextDomainPingStatuses() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test domain availability record
var record = new DomainAvailabilityRecord(
1,
1,
true,
new byte[]{127, 0, 0, 1},
40,
0x0F00BA32L,
0x0F00BA34L,
HttpSchema.HTTP,
"etag123",
"Wed, 21 Oct 2023 07:28:00 GMT",
200,
"http://example.com/redirect",
Duration.ofMillis(150),
ErrorClassification.NONE,
"No error",
Instant.now().minus(30, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(7200, ChronoUnit.SECONDS),
Instant.now().minus(3000, ChronoUnit.SECONDS),
2,
Duration.ofSeconds(60)
);
svc.write(record);
// Fetch the next domain ping statuses
var statuses = svc.getNextDomainPingStatuses(10, 1);
assertFalse(statuses.isEmpty());
assertEquals(1, statuses.size());
}
@Test
void getNextDnsDomainRecords() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test DNS record
var dnsRecord = new DomainDnsRecord(null, "example.com", 2,
List.of("test"),
List.of("test2"),
"test3",
List.of("test4"),
List.of("test5"),
List.of("test6"),
List.of("test7"),
"test8",
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
4);
svc.write(dnsRecord);
var nextRecords = svc.getNextDnsDomainRecords(1, 2);
assertFalse(nextRecords.isEmpty());
assertEquals(1, nextRecords.size());
}
@Test
void getOrphanedDomains(){
var svc = new PingDao(dataSource);
var orphanedDomains = svc.getOrphanedDomains(1);
System.out.println(orphanedDomains);
assertTrue(orphanedDomains.contains(new DomainReference(1, 1, "www.marginalia.nu")));
assertFalse(orphanedDomains.isEmpty());
var orphanedRootDomains = svc.getOrphanedRootDomains(1);
System.out.println(orphanedRootDomains);
assertTrue(orphanedRootDomains.contains("marginalia.nu"));
}
@Test @Test
void write() { void write() {
var dnsEvent = new DomainDnsEvent( var dnsEvent = new DomainDnsEvent(