1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

17 Commits

Author SHA1 Message Date
Viktor Lofgren
9f041d6631 (ping) Drop the concept of primary and secondary ping instances
There was an idea of having the ping service duck over to a realtime partition when the partition is crawling, but this hasn't been working out well, so the concept will be retired and all nodes will run as primary.
2025-06-14 12:32:08 +02:00
Viktor Lofgren
13fb1efce4 (ping) Populate ASN field on DomainSecurityInformation 2025-06-13 15:45:43 +02:00
Viktor Lofgren
c1225165b7 (ping) Add a summary fields CHANGE_SERIAL_NUMBER and CHANGE_ISSUER to DOMAIN_SECURITY_EVENTS 2025-06-13 15:30:45 +02:00
Viktor Lofgren
67ad7a3bbc (ping) Enhance HTTP ping logic to retry GET requests for specific status codes and add sleep duration between retries 2025-06-13 12:59:56 +02:00
Viktor Lofgren
ed62ec8a35 (random) Sanitize random search results with DOMAIN_AVAILABILITY_INFORMATION join 2025-06-13 10:38:21 +02:00
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
Viktor Lofgren
5f93c7f767 (ping) Update PROC_PING_SPAWNER to use REALTIME from SIDELOAD 2025-06-12 14:04:09 +02:00
Viktor Lofgren
4001c68c82 (ping) Update SQL query to include NODE_AFFINITY in historical availability data retrieval 2025-06-12 13:58:50 +02:00
Viktor Lofgren
6b811489c5 (actor) Make ping spawner auto-spawn the process 2025-06-12 13:46:50 +02:00
Viktor Lofgren
e9d317c65d (ping) Parameterize thread counts for availability and DNS job consumers 2025-06-12 13:34:58 +02:00
Viktor Lofgren
16b05a4737 (ping) Reduce maximum total connections in HttpClientProvider to improve resource management 2025-06-12 13:04:55 +02:00
Viktor Lofgren
021cd73cbb (ping) Reduce db contention by moving job scheduling out of the database to RAM 2025-06-12 12:56:33 +02:00
Viktor Lofgren
4253bd53b5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:18:07 +02:00
Viktor Lofgren
14c87461a5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:04:39 +02:00
Viktor Lofgren
9afed0a18e (ping) Optimize parameters
Reduce socket and connection timeouts in HttpClient and adjust thread counts for job consumers
2025-06-11 16:21:45 +02:00
Viktor Lofgren
afad4deb94 (ping) Fix DB query to prioritize DNS information updates correctly
This also reduces CPU%
2025-06-11 14:58:28 +02:00
25 changed files with 589 additions and 486 deletions

View File

@@ -0,0 +1,6 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_SERIAL_NUMBER BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_ISSUER BOOLEAN NOT NULL DEFAULT FALSE;
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;

View File

@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD), PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -3,24 +3,176 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor { public class PingMonitorActor extends RecordActorPrototype {
@Inject private final MqPersistence persistence;
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) { private final ProcessService processService;
super(gson,
configuration, private final Logger logger = LoggerFactory.getLogger(getClass());
persistence,
processService, public static final int MAX_ATTEMPTS = 3;
ProcessInboxNames.PING_INBOX, private final String inboxName;
ProcessService.ProcessId.PING); private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final Gson gson;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest();
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);
yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
} }
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public PingMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;
var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}
public boolean isError() {
return error.get();
}
}
} }

View File

@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) { public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {
final String q = """ final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2 WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=? AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL AND DOMAIN_ALIAS IS NULL
ORDER BY RAND() ORDER BY RAND()

View File

@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects;
@Singleton @Singleton
public class PingDao { public class PingDao {
@@ -76,32 +77,6 @@ public class PingDao {
} }
} }
public List<DomainReference> getNewDomains(int nodeId, int cnt) throws SQLException {
List<DomainReference> domains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT domain_id, domain_name
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.domain_id = DOMAIN_AVAILABILITY_INFORMATION.domain_id
WHERE DOMAIN_AVAILABILITY_INFORMATION.server_available IS NULL
AND EC_DOMAIN.NODE_ID = ?
LIMIT ?
"""))
{
ps.setInt(1, nodeId);
ps.setInt(2, cnt);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domains.add(new DomainReference(rs.getInt("domain_id"), nodeId, rs.getString("domain_name").toLowerCase()));
}
}
return domains;
}
public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException { public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
@@ -132,7 +107,7 @@ public class PingDao {
} }
} }
public DomainDnsRecord getDomainDnsRecord(int dnsRootDomainId) throws SQLException { public DomainDnsRecord getDomainDnsRecord(long dnsRootDomainId) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) { var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) {
@@ -160,111 +135,123 @@ public class PingDao {
} }
} }
public List<HistoricalAvailabilityData> getNextDomainPingStatuses(int count, int nodeId) throws SQLException { public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
List<HistoricalAvailabilityData> domainAvailabilityRecords = new ArrayList<>(count);
var query = """ var query = """
SELECT DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*, EC_DOMAIN.DOMAIN_NAME FROM DOMAIN_AVAILABILITY_INFORMATION SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
LEFT JOIN DOMAIN_SECURITY_INFORMATION FROM EC_DOMAIN
ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = DOMAIN_SECURITY_INFORMATION.DOMAIN_ID LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
WHERE NEXT_SCHEDULED_UPDATE <= ? AND DOMAIN_AVAILABILITY_INFORMATION.NODE_ID = ? WHERE EC_DOMAIN.ID = ?
ORDER BY NEXT_SCHEDULED_UPDATE ASC
LIMIT ?
"""; """;
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) { var ps = conn.prepareStatement(query)) {
// Use Java time since this is how we generate the timestamps in the ping process
// to avoid timezone weirdness. ps.setLong(1, domainId);
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME"); String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME");
var domainAvailabilityRecord = new DomainAvailabilityRecord(rs);
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null) { DomainAvailabilityRecord dar;
var securityRecord = new DomainSecurityRecord(rs); DomainSecurityRecord dsr;
domainAvailabilityRecords.add(
new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, domainAvailabilityRecord, securityRecord) if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
); dsr = new DomainSecurityRecord(rs);
} else { else
domainAvailabilityRecords.add(new HistoricalAvailabilityData.JustAvailability(domainName, domainAvailabilityRecord)); dsr = null;
if (rs.getObject("DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dar = new DomainAvailabilityRecord(rs);
else
dar = null;
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}
else {
if (dsr != null) {
return new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, dar, dsr);
} else {
return new HistoricalAvailabilityData.JustAvailability(domainName, dar);
}
} }
} }
} }
return domainAvailabilityRecords;
return null;
} }
public List<DomainDnsRecord> getNextDnsDomainRecords(int count, int nodeId) throws SQLException { public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<DomainDnsRecord> domainDnsRecords = new ArrayList<>(count); List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
var query = """
SELECT * FROM DOMAIN_DNS_INFORMATION
WHERE TS_NEXT_DNS_CHECK <= ? AND NODE_AFFINITY = ?
ORDER BY DNS_CHECK_PRIORITY ASC, TS_NEXT_DNS_CHECK ASC
LIMIT ?
""";
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) { var ps = conn.prepareStatement("""
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now())); SELECT ID, NEXT_SCHEDULED_UPDATE
ps.setInt(2, nodeId); FROM EC_DOMAIN
ps.setInt(3, count); LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NODE_AFFINITY = ?
""")) {
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
domainDnsRecords.add(new DomainDnsRecord(rs)); long domainId = rs.getLong("ID");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
} }
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e);
} }
return domainDnsRecords;
logger.info("Found {} availability update jobs for node {}", updateJobs.size(), nodeId);
return updateJobs;
} }
public List<DomainReference> getOrphanedDomains(int nodeId) { public List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> getDnsUpdateSchedule(int nodeId) {
List<DomainReference> orphanedDomains = new ArrayList<>(); List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(""" var ps = conn.prepareStatement("""
SELECT e.DOMAIN_NAME, e.ID SELECT DISTINCT(DOMAIN_TOP),DOMAIN_DNS_INFORMATION.* FROM EC_DOMAIN
FROM EC_DOMAIN e LEFT JOIN DOMAIN_DNS_INFORMATION ON ROOT_DOMAIN_NAME = DOMAIN_TOP
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION d ON e.ID = d.DOMAIN_ID WHERE EC_DOMAIN.NODE_AFFINITY = ?
WHERE d.DOMAIN_ID IS NULL AND e.NODE_AFFINITY = ?;
""")) { """)) {
stmt.setInt(1, nodeId); ps.setFetchSize(10_000);
stmt.setFetchSize(10_000); ps.setInt(1, nodeId);
ResultSet rs = stmt.executeQuery(); ResultSet rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
String domainName = rs.getString("DOMAIN_NAME"); Long dnsRootDomainId = rs.getObject("DOMAIN_DNS_INFORMATION.DNS_ROOT_DOMAIN_ID", Long.class);
int domainId = rs.getInt("ID"); String rootDomainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(new DomainReference(domainId, nodeId, domainName)); if (dnsRootDomainId == null) {
updateJobs.add(
new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ByName(rootDomainName),
Instant.now())
);
}
else {
var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
);
}
} }
} } catch (SQLException e) {
catch (SQLException e) { throw new RuntimeException("Failed to retrieve DNS update schedule", e);
throw new RuntimeException("Failed to retrieve orphaned domains", e);
} }
return orphanedDomains; logger.info("Found {} dns update jobs for node {}", updateJobs.size(), nodeId);
}
public List<String> getOrphanedRootDomains(int nodeId) { return updateJobs;
List<String> orphanedDomains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP)
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_DNS_INFORMATION d ON e.DOMAIN_TOP = d.ROOT_DOMAIN_NAME
WHERE d.ROOT_DOMAIN_NAME IS NULL AND e.NODE_AFFINITY = ?;
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(domainName.toLowerCase());
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
}
return orphanedDomains;
} }
} }

View File

@@ -4,15 +4,15 @@ import com.google.inject.Inject;
import nu.marginalia.ping.model.*; import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService; import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService; import nu.marginalia.ping.svc.HttpPingService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** PingJobScheduler is responsible for scheduling and processing ping jobs /** PingJobScheduler is responsible for scheduling and processing ping jobs
@@ -27,50 +27,13 @@ public class PingJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class); private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
sealed interface DnsJob { private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
Object reference(); = new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000);
record DnsFetch(String rootDomain) implements DnsJob { public volatile Instant dnsLastSync = Instant.now();
@Override public volatile Instant availabilityLastSync = Instant.now();
public Object reference() {
return rootDomain;
}
}
record DnsRefresh(DomainDnsRecord oldRecord) implements DnsJob {
@Override
public Object reference() {
return oldRecord.rootDomainName();
}
}
}
sealed interface AvailabilityJob {
Object reference();
record Availability(DomainReference domainReference) implements AvailabilityJob {
@Override
public Object reference() {
return domainReference.domainName();
}
}
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
@Override
public Object reference() {
return domain;
}
}
}
// Keeps track of ongoing ping and DNS processing to avoid duplicate work,
// which is mainly a scenario that will occur when there is not a lot of data
// in the database. In real-world scenarios, the queues will be full most
// of the time, and prevent this from being an issue.
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
public volatile Integer nodeId = null; public volatile Integer nodeId = null;
public volatile boolean running = false; public volatile boolean running = false;
@@ -95,15 +58,16 @@ public class PingJobScheduler {
running = true; running = true;
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords)); allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("new-availability").start(this::fetchNewAvailabilityJobs)); allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
allThreads.add(Thread.ofPlatform().daemon().name("update-availability").start(this::updateAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
for (int i = 0; i < 8; i++) { int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer)); allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
} }
for (int i = 0; i < 2; i++) { for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer)); allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
} }
} }
@@ -122,19 +86,33 @@ public class PingJobScheduler {
} }
public void pause(int nodeId) { public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) { if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId); logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return; return;
} }
this.nodeId = null; this.nodeId = null;
availabilityUpdateSchedule.clear();
dnsUpdateSchedule.clear();
logger.info("PingJobScheduler paused"); logger.info("PingJobScheduler paused");
} }
public synchronized void resume(int nodeId) { public synchronized void resume(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) { if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected null, got {}", this.nodeId, nodeId); logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return; return;
} }
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nodeId));
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nodeId));
dnsLastSync = Instant.now();
availabilityLastSync = Instant.now();
// Flag that we are running again
this.nodeId = nodeId; this.nodeId = nodeId;
notifyAll(); notifyAll();
@@ -150,32 +128,44 @@ public class PingJobScheduler {
private void availabilityJobConsumer() { private void availabilityJobConsumer() {
while (running) { while (running) {
try { try {
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS); Integer nid = nodeId;
if (job == null) { if (nid == null) {
continue; // No job available, continue to the next iteration waitForResume();
continue;
}
long nextId = availabilityUpdateSchedule.next();
var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) {
logger.warn("No availability data found for ID: {}", nextId);
continue; // No data to process, skip this iteration
} }
try { try {
switch (job) { List<WritableModel> objects = switch (data) {
case AvailabilityJob.Availability(DomainReference reference) -> { case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
logger.info("Availability check: {}", reference.domainName()); -> httpPingService.pingDomain(reference, null, null);
pingDao.write(httpPingService.pingDomain(reference, null, null)); case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
} -> httpPingService.pingDomain(
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> { new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
logger.info("Availability check with reference: {}", domain); case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
pingDao.write(httpPingService.pingDomain( -> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
availability, };
security));
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts);
break;
} }
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error("Error processing availability job for domain: " + job.reference(), e); logger.error("Error processing availability job for domain: " + data.domain(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsAvailability.remove(job.reference());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -190,30 +180,41 @@ public class PingJobScheduler {
private void dnsJobConsumer() { private void dnsJobConsumer() {
while (running) { while (running) {
try { try {
DnsJob job = dnsJobQueue.poll(1, TimeUnit.SECONDS); Integer nid = nodeId;
if (job == null) { if (nid == null) {
continue; // No job available, continue to the next iteration waitForResume();
continue;
} }
RootDomainReference ref = dnsUpdateSchedule.next();
try { try {
switch (job) { List<WritableModel> objects = switch(ref) {
case DnsJob.DnsFetch(String rootDomain) -> { case RootDomainReference.ById(long id) -> {
logger.info("Fetching DNS records for root domain: {}", rootDomain); var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
pingDao.write(dnsPingService.pingDomain(rootDomain, null)); yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
} }
case DnsJob.DnsRefresh(DomainDnsRecord oldRecord) -> { case RootDomainReference.ByName(String name) -> {
logger.info("Refreshing DNS records for domain: {}", oldRecord.rootDomainName()); @Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
pingDao.write(dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord)); yield dnsPingService.pingDomain(name, oldRecord);
}
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
dnsUpdateSchedule.add(ref, ts);
break;
} }
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error("Error processing DNS job for domain: " + job.reference(), e); logger.error("Error processing DNS job for domain: " + ref, e);
}
finally {
// Remove the domain from the processing map
processingDomainsDns.remove(job.reference());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.error("DNS job consumer interrupted", e); logger.error("DNS job consumer interrupted", e);
@@ -224,38 +225,27 @@ public class PingJobScheduler {
} }
} }
private void fetchNewAvailabilityJobs() { private void syncAvailabilityJobs() {
try { try {
while (running) { while (running) {
// If we are suspended, wait for resume
Integer nid = nodeId; Integer nid = nodeId;
if (nid == null) { if (nid == null) {
waitForResume(); waitForResume();
continue; // re-fetch the records after resuming continue;
} }
List<DomainReference> domains = pingDao.getOrphanedDomains(nid); // Check if we need to refresh the availability data
for (DomainReference domain : domains) { Instant nextRefresh = availabilityLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
if (nodeId == null) { Duration remaining = Duration.between(Instant.now(), nextRefresh);
waitForResume(); TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
break; // re-fetch the records after resuming continue;
}
try {
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new ping job for domain: " + domain, e);
}
} }
// This is an incredibly expensive operation, so we only do it once a day availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nid));
try { availabilityLastSync = Instant.now();
TimeUnit.HOURS.sleep(24);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }
catch (Exception e) { catch (Exception e) {
@@ -263,32 +253,26 @@ public class PingJobScheduler {
} }
} }
private void fetchNewDnsRecords() { private void syncDnsRecords() {
try { try {
while (running) { while (running) {
Integer nid = nodeId; Integer nid = nodeId;
if (nid == null) { if (nid == null) {
waitForResume(); waitForResume();
continue; // re-fetch the records after resuming continue; // re-fetch the records after resuming
} }
List<String> rootDomains = pingDao.getOrphanedRootDomains(nid); // Check if we need to refresh the availability data
for (String rootDomain : rootDomains) { Instant nextRefresh = dnsLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
if (nodeId == null) { Duration remaining = Duration.between(Instant.now(), nextRefresh);
waitForResume(); TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
break; // re-fetch the records after resuming continue;
}
try {
dnsJobQueue.put(new DnsJob.DnsFetch(rootDomain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new DNS job for root domain: " + rootDomain, e);
}
} }
// This is an incredibly expensive operation, so we only do it once a day
TimeUnit.HOURS.sleep(24); dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nid));
dnsLastSync = Instant.now();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -296,68 +280,5 @@ public class PingJobScheduler {
} }
} }
private void updateAvailabilityJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var statuses = pingDao.getNextDomainPingStatuses(100, nid);
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
for (var status : statuses) {
var job = switch (status) {
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
};
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
availabilityJobQueue.put(job);
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain ping statuses", e);
}
}
}
private void updateDnsJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var dnsRecords = pingDao.getNextDnsDomainRecords(1000, nid);
for (var record : dnsRecords) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
if (processingDomainsDns.putIfAbsent(record.rootDomainName(), true) == null) {
dnsJobQueue.put(new DnsJob.DnsRefresh(record));
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain DNS records", e);
}
}
}
} }

View File

@@ -10,7 +10,6 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest; import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.ProcessMainClass;
@@ -21,7 +20,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.security.Security; import java.security.Security;
import java.util.List;
public class PingMain extends ProcessMainClass { public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class); private static final Logger log = LoggerFactory.getLogger(PingMain.class);
@@ -56,56 +54,6 @@ public class PingMain extends ProcessMainClass {
// Start the ping job scheduler // Start the ping job scheduler
pingJobScheduler.start(true); pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcess("crawler", node, (running) -> {
if (running) {
log.info("Crawler process is running, suspending ping job scheduler.");
pingJobScheduler.pause(node);
} else {
log.warn("Crawler process is not running, resuming ping job scheduler.");
pingJobScheduler.resume(node);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully.");
}
public void runSecondary() {
log.info("Starting PingMain...");
List<Integer> crawlerNodes = nodeConfigurationService.getAll()
.stream()
.filter(node -> !node.disabled())
.filter(node -> node.profile().permitBatchCrawl())
.map(NodeConfiguration::node)
.toList()
;
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
if (running) {
log.info("Crawler process is running on node {} taking over ", n);
pingJobScheduler.resume(n);
} else {
log.warn("Crawler process stopped, resuming ping job scheduler.");
pingJobScheduler.pause(n);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully."); log.info("PingMain started successfully.");
} }
@@ -144,19 +92,8 @@ public class PingMain extends ProcessMainClass {
var instructions = main.fetchInstructions(PingRequest.class); var instructions = main.fetchInstructions(PingRequest.class);
try { try {
switch (instructions.value().runClass) { main.runPrimary();
case "primary": for(;;) main.wait(); // Wait on the object lock to avoid busy-looping
log.info("Running as primary node");
main.runPrimary();
break;
case "secondary":
log.info("Running as secondary node");
main.runSecondary();
break;
default:
throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
}
for(;;);
} }
catch (Throwable ex) { catch (Throwable ex) {
logger.error("Error running ping process", ex); logger.error("Error running ping process", ex);

View File

@@ -0,0 +1,57 @@
package nu.marginalia.ping;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's
* timestamp index.
* */
public class UpdateSchedule<T, T2> {
private final PriorityQueue<UpdateJob<T, T2>> updateQueue;
public record UpdateJob<T, T2>(T key, Instant updateTime) {}
public UpdateSchedule(int initialCapacity) {
updateQueue = new PriorityQueue<>(initialCapacity, Comparator.comparing(UpdateJob::updateTime));
}
public synchronized void add(T key, Instant updateTime) {
updateQueue.add(new UpdateJob<>(key, updateTime));
notifyAll();
}
public synchronized T next() throws InterruptedException {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
wait(Math.max(1, toWait.toMillis()));
}
else {
updateQueue.poll(); // Remove the job from the queue since it's due
return job.key();
}
}
}
public synchronized void clear() {
updateQueue.clear();
notifyAll();
}
public synchronized void replaceQueue(Collection<UpdateJob<T,T2>> newJobs) {
updateQueue.clear();
updateQueue.addAll(newJobs);
notifyAll();
}
}

View File

@@ -3,6 +3,8 @@ package nu.marginalia.ping.fetcher;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import nu.marginalia.ping.model.SingleDnsRecord; import nu.marginalia.ping.model.SingleDnsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.ExtendedResolver; import org.xbill.DNS.ExtendedResolver;
import org.xbill.DNS.Lookup; import org.xbill.DNS.Lookup;
import org.xbill.DNS.TextParseException; import org.xbill.DNS.TextParseException;
@@ -17,6 +19,7 @@ import java.util.concurrent.*;
public class PingDnsFetcher { public class PingDnsFetcher {
private final ThreadLocal<ExtendedResolver> resolver; private final ThreadLocal<ExtendedResolver> resolver;
private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100); private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100);
private static final Logger logger = LoggerFactory.getLogger(PingDnsFetcher.class);
private static final int[] RECORD_TYPES = { private static final int[] RECORD_TYPES = {
Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT, Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT,
@@ -25,8 +28,7 @@ public class PingDnsFetcher {
@Inject @Inject
public PingDnsFetcher(@Named("ping.nameservers") public PingDnsFetcher(@Named("ping.nameservers")
List<String> nameservers) throws UnknownHostException List<String> nameservers) {
{
resolver = ThreadLocal.withInitial(() -> createResolver(nameservers)); resolver = ThreadLocal.withInitial(() -> createResolver(nameservers));
} }
@@ -81,13 +83,12 @@ public class PingDnsFetcher {
try { try {
results.addAll(future.get(1, TimeUnit.MINUTES)); results.addAll(future.get(1, TimeUnit.MINUTES));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error("Error fetching DNS records", e);
System.err.println("Error fetching DNS records: " + e.getMessage());
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
System.err.println("DNS query interrupted: " + e.getMessage()); logger.error("DNS query interrupted", e);
} }
return results; return results;
} }

View File

@@ -83,7 +83,7 @@ public class PingHttpFetcher {
} catch (SocketTimeoutException ex) { } catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage()); return new TimeoutResponse(ex.getMessage());
} catch (IOException e) { } catch (IOException e) {
return new ConnectionError(e.getMessage()); return new ConnectionError(e.getClass().getSimpleName());
} }
} }

View File

@@ -44,14 +44,14 @@ public class HttpClientProvider implements Provider<HttpClient> {
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException { private static CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom() final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(30, TimeUnit.SECONDS) .setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(30, TimeUnit.SECONDS) .setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5)) .setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build(); .build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create() connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2) .setMaxConnPerRoute(2)
.setMaxConnTotal(5000) .setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig) .setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy( .setTlsSocketStrategy(
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE)) new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE))

View File

@@ -70,6 +70,11 @@ implements WritableModel
return millis == null ? null : Duration.ofMillis(millis); return millis == null ? null : Duration.ofMillis(millis);
} }
@Override
public Instant nextUpdateTime() {
return nextScheduledUpdate;
}
@Override @Override
public void write(Connection connection) throws SQLException { public void write(Connection connection) throws SQLException {
try (var ps = connection.prepareStatement( try (var ps = connection.prepareStatement(

View File

@@ -60,6 +60,11 @@ public record DomainDnsRecord(
return new Builder(); return new Builder();
} }
@Override
public Instant nextUpdateTime() {
return tsNextScheduledUpdate;
}
@Override @Override
public void write(Connection connection) throws SQLException { public void write(Connection connection) throws SQLException {

View File

@@ -16,6 +16,8 @@ public record DomainSecurityEvent(
boolean certificateProfileChanged, boolean certificateProfileChanged,
boolean certificateSanChanged, boolean certificateSanChanged,
boolean certificatePublicKeyChanged, boolean certificatePublicKeyChanged,
boolean certificateSerialNumberChanged,
boolean certificateIssuerChanged,
Duration oldCertificateTimeToExpiry, Duration oldCertificateTimeToExpiry,
boolean securityHeadersChanged, boolean securityHeadersChanged,
boolean ipChanged, boolean ipChanged,
@@ -41,8 +43,10 @@ public record DomainSecurityEvent(
change_software, change_software,
old_cert_time_to_expiry, old_cert_time_to_expiry,
security_signature_before, security_signature_before,
security_signature_after security_signature_after,
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) change_certificate_serial_number,
change_certificate_issuer
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")) """))
{ {
@@ -75,6 +79,9 @@ public record DomainSecurityEvent(
ps.setBytes(14, securitySignatureAfter().compressed()); ps.setBytes(14, securitySignatureAfter().compressed());
} }
ps.setBoolean(15, certificateSerialNumberChanged());
ps.setBoolean(16, certificateIssuerChanged());
ps.executeUpdate(); ps.executeUpdate();
} }
} }

View File

@@ -203,7 +203,6 @@ public record DomainSecurityRecord(
if (sslCertFingerprintSha256() == null) { if (sslCertFingerprintSha256() == null) {
ps.setNull(12, java.sql.Types.BINARY); ps.setNull(12, java.sql.Types.BINARY);
} else { } else {
System.out.println(sslCertFingerprintSha256().length);
ps.setBytes(12, sslCertFingerprintSha256()); ps.setBytes(12, sslCertFingerprintSha256());
} }
if (sslCertSan() == null) { if (sslCertSan() == null) {

View File

@@ -1,6 +1,13 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
public sealed interface HistoricalAvailabilityData { public sealed interface HistoricalAvailabilityData {
public String domain();
record JustDomainReference(DomainReference domainReference) implements HistoricalAvailabilityData {
@Override
public String domain() {
return domainReference.domainName();
}
}
record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {} record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {}
record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {} record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {}
} }

View File

@@ -0,0 +1,6 @@
package nu.marginalia.ping.model;
public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { }
}

View File

@@ -1,8 +1,14 @@
package nu.marginalia.ping.model; package nu.marginalia.ping.model;
import javax.annotation.Nullable;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Instant;
public interface WritableModel { public interface WritableModel {
void write(Connection connection) throws SQLException; void write(Connection connection) throws SQLException;
@Nullable
default Instant nextUpdateTime() {
return null;
}
} }

View File

@@ -15,6 +15,8 @@ public record SecurityInformationChange(
boolean isCertificateProfileChanged, boolean isCertificateProfileChanged,
boolean isCertificateSanChanged, boolean isCertificateSanChanged,
boolean isCertificatePublicKeyChanged, boolean isCertificatePublicKeyChanged,
boolean isCertificateSerialNumberChanged,
boolean isCertificateIssuerChanged,
Duration oldCertificateTimeToExpiry, Duration oldCertificateTimeToExpiry,
boolean isSecurityHeadersChanged, boolean isSecurityHeadersChanged,
boolean isIpAddressChanged, boolean isIpAddressChanged,
@@ -30,8 +32,10 @@ public record SecurityInformationChange(
boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256()); boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256());
boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash(); boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash();
boolean certificateSerialNumberChanged = !Objects.equals(before.sslCertSerialNumber(), after.sslCertSerialNumber());
boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash()); boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash());
boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan()); boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan());
boolean certificateIssuerChanged = !Objects.equals(before.sslCertIssuer(), after.sslCertIssuer());
Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between( Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between(
Instant.now(), Instant.now(),
@@ -50,6 +54,7 @@ public record SecurityInformationChange(
boolean isChanged = asnChanged boolean isChanged = asnChanged
|| certificateFingerprintChanged || certificateFingerprintChanged
|| securityHeadersChanged || securityHeadersChanged
|| certificateProfileChanged
|| softwareChanged; || softwareChanged;
return new SecurityInformationChange( return new SecurityInformationChange(
@@ -59,6 +64,8 @@ public record SecurityInformationChange(
certificateProfileChanged, certificateProfileChanged,
certificateSanChanged, certificateSanChanged,
certificatePublicKeyChanged, certificatePublicKeyChanged,
certificateSerialNumberChanged,
certificateIssuerChanged,
oldCertificateTimeToExpiry, oldCertificateTimeToExpiry,
securityHeadersChanged, securityHeadersChanged,
ipChanged, ipChanged,

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) { switch (changes) {
case DnsRecordChange.None _ -> {} case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> { case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder() generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId()) .rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity()) .nodeId(newRecord.nodeAffinity())

View File

@@ -71,23 +71,40 @@ public class DomainAvailabilityInformationFactory {
@Nullable DomainAvailabilityRecord previousRecord, @Nullable DomainAvailabilityRecord previousRecord,
HttpResponse rsp) { HttpResponse rsp) {
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null; final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = now;
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder() return DomainAvailabilityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.serverAvailable(true) .serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTP) .httpSchema(HttpSchema.HTTP)
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) .httpResponseTime(rsp.httpResponseTime())
.httpEtag(rsp.headers().getFirst("ETag")) .httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified")) .httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now()) .tsLastPing(now)
.tsLastAvailable(Instant.now()) .tsLastAvailable(lastAvailable)
.tsLastError(lastError) .tsLastError(lastError)
.nextScheduledUpdate(Instant.now().plus(backoffStrategy.getOkInterval())) .nextScheduledUpdate(now.plus(backoffStrategy.getOkInterval()))
.backoffFetchInterval(backoffStrategy.getOkInterval()) .backoffFetchInterval(backoffStrategy.getOkInterval())
.build(); .build();
@@ -117,23 +134,44 @@ public class DomainAvailabilityInformationFactory {
updateTime = Instant.now().plus(backoffStrategy.getOkInterval()); updateTime = Instant.now().plus(backoffStrategy.getOkInterval());
} }
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null; final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (!validationResult.isValid()) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.SSL_ERROR;
} else if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = Instant.now();
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder() return DomainAvailabilityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.serverAvailable(validationResult.isValid()) .serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null) .serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address)) .serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTPS) .httpSchema(HttpSchema.HTTPS)
.httpStatus(rsp.httpStatus()) .httpStatus(rsp.httpStatus())
.errorClassification(!validationResult.isValid() ? ErrorClassification.SSL_ERROR : ErrorClassification.NONE) .errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented .httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented
.httpEtag(rsp.headers().getFirst("ETag")) .httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified")) .httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now()) .tsLastPing(now)
.tsLastError(lastError) .tsLastError(lastError)
.tsLastAvailable(Instant.now()) .tsLastAvailable(lastAvailable)
.nextScheduledUpdate(updateTime) .nextScheduledUpdate(updateTime)
.backoffFetchInterval(backoffStrategy.getOkInterval()) .backoffFetchInterval(backoffStrategy.getOkInterval())
.build(); .build();

View File

@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException; import java.security.cert.CertificateEncodingException;
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class); private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null // Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) { public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
int domainId, int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers(); var headers = httpResponse.headers();
return DomainSecurityRecord.builder() return DomainSecurityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTP) .httpSchema(HttpSchema.HTTP)
.httpVersion(httpResponse.version()) .httpVersion(httpResponse.version())
.headerServer(headers.getFirst("Server")) .headerServer(headers.getFirst("Server"))
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
} }
// HTTPS response // HTTPS response
public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) { public DomainSecurityRecord createHttpsSecurityInformation(
HttpsResponse httpResponse,
PKIXValidationResult validationResult,
int domainId,
int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers(); var headers = httpResponse.headers();
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
return DomainSecurityRecord.builder() return DomainSecurityRecord.builder()
.domainId(domainId) .domainId(domainId)
.nodeId(nodeId) .nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTPS) .httpSchema(HttpSchema.HTTPS)
.headerServer(headers.getFirst("Server")) .headerServer(headers.getFirst("Server"))
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin")) .headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))

View File

@@ -18,6 +18,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@@ -75,8 +76,8 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null); result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && response.httpStatus() == 405) { if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
} }
else if (result instanceof ConnectionError) { else if (result instanceof ConnectionError) {
@@ -84,8 +85,8 @@ public class HttpPingService {
if (!(result2 instanceof ConnectionError)) { if (!(result2 instanceof ConnectionError)) {
result = result2; result = result2;
} }
if (result instanceof HttpResponse response && response.httpStatus() == 405) { if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null); result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
} }
} }
@@ -116,7 +117,7 @@ public class HttpPingService {
domainReference.nodeId(), domainReference.nodeId(),
oldPingStatus, oldPingStatus,
ErrorClassification.CONNECTION_ERROR, ErrorClassification.CONNECTION_ERROR,
null); rsp.errorMessage());
newSecurityInformation = null; newSecurityInformation = null;
} }
case TimeoutResponse rsp -> { case TimeoutResponse rsp -> {
@@ -148,7 +149,8 @@ public class HttpPingService {
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation( newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
httpResponse, httpResponse,
domainReference.domainId(), domainReference.domainId(),
domainReference.nodeId() domainReference.nodeId(),
newPingStatus.asn()
); );
} }
case HttpsResponse httpsResponse -> { case HttpsResponse httpsResponse -> {
@@ -166,7 +168,8 @@ public class HttpPingService {
httpsResponse, httpsResponse,
validationResult, validationResult,
domainReference.domainId(), domainReference.domainId(),
domainReference.nodeId() domainReference.nodeId(),
newPingStatus.asn()
); );
} }
} }
@@ -190,6 +193,29 @@ public class HttpPingService {
return generatedRecords; return generatedRecords;
} }
private boolean shouldTryGET(int statusCode) {
if (statusCode < 400) {
return false;
}
if (statusCode == 429) { // Too many requests, we should not retry with GET
return false;
}
// For all other status codes, we can try a GET request, as many severs do not
// cope with HEAD requests properly.
return statusCode < 600;
}
private void sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
logger.warn("Sleep interrupted", e);
}
}
private void comparePingStatuses(List<WritableModel> generatedRecords, private void comparePingStatuses(List<WritableModel> generatedRecords,
DomainAvailabilityRecord oldPingStatus, DomainAvailabilityRecord oldPingStatus,
DomainAvailabilityRecord newPingStatus) { DomainAvailabilityRecord newPingStatus) {
@@ -258,6 +284,8 @@ public class HttpPingService {
change.isCertificateProfileChanged(), change.isCertificateProfileChanged(),
change.isCertificateSanChanged(), change.isCertificateSanChanged(),
change.isCertificatePublicKeyChanged(), change.isCertificatePublicKeyChanged(),
change.isCertificateSerialNumberChanged(),
change.isCertificateIssuerChanged(),
change.oldCertificateTimeToExpiry(), change.oldCertificateTimeToExpiry(),
change.isSecurityHeadersChanged(), change.isSecurityHeadersChanged(),
change.isIpAddressChanged(), change.isIpAddressChanged(),

View File

@@ -318,6 +318,8 @@ class PingDaoTest {
true, true,
false, false,
true, true,
true,
false,
Duration.ofDays(30), Duration.ofDays(30),
false, false,
false, false,
@@ -330,86 +332,6 @@ class PingDaoTest {
svc.write(event); svc.write(event);
} }
@Test
void getNextDomainPingStatuses() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test domain availability record
var record = new DomainAvailabilityRecord(
1,
1,
true,
new byte[]{127, 0, 0, 1},
40,
0x0F00BA32L,
0x0F00BA34L,
HttpSchema.HTTP,
"etag123",
"Wed, 21 Oct 2023 07:28:00 GMT",
200,
"http://example.com/redirect",
Duration.ofMillis(150),
ErrorClassification.NONE,
"No error",
Instant.now().minus(30, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(7200, ChronoUnit.SECONDS),
Instant.now().minus(3000, ChronoUnit.SECONDS),
2,
Duration.ofSeconds(60)
);
svc.write(record);
// Fetch the next domain ping statuses
var statuses = svc.getNextDomainPingStatuses(10, 1);
assertFalse(statuses.isEmpty());
assertEquals(1, statuses.size());
}
@Test
void getNextDnsDomainRecords() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test DNS record
var dnsRecord = new DomainDnsRecord(null, "example.com", 2,
List.of("test"),
List.of("test2"),
"test3",
List.of("test4"),
List.of("test5"),
List.of("test6"),
List.of("test7"),
"test8",
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
4);
svc.write(dnsRecord);
var nextRecords = svc.getNextDnsDomainRecords(1, 2);
assertFalse(nextRecords.isEmpty());
assertEquals(1, nextRecords.size());
}
@Test
void getOrphanedDomains(){
var svc = new PingDao(dataSource);
var orphanedDomains = svc.getOrphanedDomains(1);
System.out.println(orphanedDomains);
assertTrue(orphanedDomains.contains(new DomainReference(1, 1, "www.marginalia.nu")));
assertFalse(orphanedDomains.isEmpty());
var orphanedRootDomains = svc.getOrphanedRootDomains(1);
System.out.println(orphanedRootDomains);
assertTrue(orphanedRootDomains.contains("marginalia.nu"));
}
@Test @Test
void write() { void write() {
var dnsEvent = new DomainDnsEvent( var dnsEvent = new DomainDnsEvent(

View File

@@ -1,9 +1,8 @@
package nu.marginalia.mqapi.ping; package nu.marginalia.mqapi.ping;
public class PingRequest { public class PingRequest {
public final String runClass;
public PingRequest(String runClass) { public PingRequest() {
this.runClass = runClass;
} }
} }