1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

17 Commits

Author SHA1 Message Date
Viktor Lofgren
9f041d6631 (ping) Drop the concept of primary and secondary ping instances
There was an idea of having the ping service duck over to a realtime partition when the partition is crawling, but this hasn't been working out well, so the concept will be retired and all nodes will run as primary.
2025-06-14 12:32:08 +02:00
Viktor Lofgren
13fb1efce4 (ping) Populate ASN field on DomainSecurityInformation 2025-06-13 15:45:43 +02:00
Viktor Lofgren
c1225165b7 (ping) Add a summary fields CHANGE_SERIAL_NUMBER and CHANGE_ISSUER to DOMAIN_SECURITY_EVENTS 2025-06-13 15:30:45 +02:00
Viktor Lofgren
67ad7a3bbc (ping) Enhance HTTP ping logic to retry GET requests for specific status codes and add sleep duration between retries 2025-06-13 12:59:56 +02:00
Viktor Lofgren
ed62ec8a35 (random) Sanitize random search results with DOMAIN_AVAILABILITY_INFORMATION join 2025-06-13 10:38:21 +02:00
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
Viktor Lofgren
5f93c7f767 (ping) Update PROC_PING_SPAWNER to use REALTIME from SIDELOAD 2025-06-12 14:04:09 +02:00
Viktor Lofgren
4001c68c82 (ping) Update SQL query to include NODE_AFFINITY in historical availability data retrieval 2025-06-12 13:58:50 +02:00
Viktor Lofgren
6b811489c5 (actor) Make ping spawner auto-spawn the process 2025-06-12 13:46:50 +02:00
Viktor Lofgren
e9d317c65d (ping) Parameterize thread counts for availability and DNS job consumers 2025-06-12 13:34:58 +02:00
Viktor Lofgren
16b05a4737 (ping) Reduce maximum total connections in HttpClientProvider to improve resource management 2025-06-12 13:04:55 +02:00
Viktor Lofgren
021cd73cbb (ping) Reduce db contention by moving job scheduling out of the database to RAM 2025-06-12 12:56:33 +02:00
Viktor Lofgren
4253bd53b5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:18:07 +02:00
Viktor Lofgren
14c87461a5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:04:39 +02:00
Viktor Lofgren
9afed0a18e (ping) Optimize parameters
Reduce socket and connection timeouts in HttpClient and adjust thread counts for job consumers
2025-06-11 16:21:45 +02:00
Viktor Lofgren
afad4deb94 (ping) Fix DB query to prioritize DNS information updates correctly
This also reduces CPU%
2025-06-11 14:58:28 +02:00
25 changed files with 589 additions and 486 deletions

View File

@@ -0,0 +1,6 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_SERIAL_NUMBER BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_ISSUER BOOLEAN NOT NULL DEFAULT FALSE;
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;

View File

@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -3,24 +3,176 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor {
public class PingMonitorActor extends RecordActorPrototype {
@Inject
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.PING_INBOX,
ProcessService.ProcessId.PING);
private final MqPersistence persistence;
private final ProcessService processService;
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final int MAX_ATTEMPTS = 3;
private final String inboxName;
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final Gson gson;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest();
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);
yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
}
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public PingMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;
var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}
public boolean isError() {
return error.get();
}
}
}

View File

@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {
final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED
SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL
ORDER BY RAND()

View File

@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
@Singleton
public class PingDao {
@@ -76,32 +77,6 @@ public class PingDao {
}
}
public List<DomainReference> getNewDomains(int nodeId, int cnt) throws SQLException {
List<DomainReference> domains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT domain_id, domain_name
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.domain_id = DOMAIN_AVAILABILITY_INFORMATION.domain_id
WHERE DOMAIN_AVAILABILITY_INFORMATION.server_available IS NULL
AND EC_DOMAIN.NODE_ID = ?
LIMIT ?
"""))
{
ps.setInt(1, nodeId);
ps.setInt(2, cnt);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domains.add(new DomainReference(rs.getInt("domain_id"), nodeId, rs.getString("domain_name").toLowerCase()));
}
}
return domains;
}
public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException {
try (var conn = dataSource.getConnection();
@@ -132,7 +107,7 @@ public class PingDao {
}
}
public DomainDnsRecord getDomainDnsRecord(int dnsRootDomainId) throws SQLException {
public DomainDnsRecord getDomainDnsRecord(long dnsRootDomainId) throws SQLException {
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) {
@@ -160,111 +135,123 @@ public class PingDao {
}
}
public List<HistoricalAvailabilityData> getNextDomainPingStatuses(int count, int nodeId) throws SQLException {
List<HistoricalAvailabilityData> domainAvailabilityRecords = new ArrayList<>(count);
public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
var query = """
SELECT DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*, EC_DOMAIN.DOMAIN_NAME FROM DOMAIN_AVAILABILITY_INFORMATION
LEFT JOIN DOMAIN_SECURITY_INFORMATION
ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = DOMAIN_SECURITY_INFORMATION.DOMAIN_ID
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NEXT_SCHEDULED_UPDATE <= ? AND DOMAIN_AVAILABILITY_INFORMATION.NODE_ID = ?
ORDER BY NEXT_SCHEDULED_UPDATE ASC
LIMIT ?
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
FROM EC_DOMAIN
LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
WHERE EC_DOMAIN.ID = ?
""";
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) {
// Use Java time since this is how we generate the timestamps in the ping process
// to avoid timezone weirdness.
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
ps.setLong(1, domainId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME");
var domainAvailabilityRecord = new DomainAvailabilityRecord(rs);
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null) {
var securityRecord = new DomainSecurityRecord(rs);
domainAvailabilityRecords.add(
new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, domainAvailabilityRecord, securityRecord)
);
} else {
domainAvailabilityRecords.add(new HistoricalAvailabilityData.JustAvailability(domainName, domainAvailabilityRecord));
DomainAvailabilityRecord dar;
DomainSecurityRecord dsr;
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dsr = new DomainSecurityRecord(rs);
else
dsr = null;
if (rs.getObject("DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dar = new DomainAvailabilityRecord(rs);
else
dar = null;
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}
else {
if (dsr != null) {
return new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, dar, dsr);
} else {
return new HistoricalAvailabilityData.JustAvailability(domainName, dar);
}
}
}
}
return domainAvailabilityRecords;
return null;
}
public List<DomainDnsRecord> getNextDnsDomainRecords(int count, int nodeId) throws SQLException {
List<DomainDnsRecord> domainDnsRecords = new ArrayList<>(count);
public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
var query = """
SELECT * FROM DOMAIN_DNS_INFORMATION
WHERE TS_NEXT_DNS_CHECK <= ? AND NODE_AFFINITY = ?
ORDER BY DNS_CHECK_PRIORITY ASC, TS_NEXT_DNS_CHECK ASC
LIMIT ?
""";
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) {
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
var ps = conn.prepareStatement("""
SELECT ID, NEXT_SCHEDULED_UPDATE
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NODE_AFFINITY = ?
""")) {
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domainDnsRecords.add(new DomainDnsRecord(rs));
long domainId = rs.getLong("ID");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
}
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e);
}
return domainDnsRecords;
logger.info("Found {} availability update jobs for node {}", updateJobs.size(), nodeId);
return updateJobs;
}
public List<DomainReference> getOrphanedDomains(int nodeId) {
List<DomainReference> orphanedDomains = new ArrayList<>();
public List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> getDnsUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT e.DOMAIN_NAME, e.ID
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION d ON e.ID = d.DOMAIN_ID
WHERE d.DOMAIN_ID IS NULL AND e.NODE_AFFINITY = ?;
var ps = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP),DOMAIN_DNS_INFORMATION.* FROM EC_DOMAIN
LEFT JOIN DOMAIN_DNS_INFORMATION ON ROOT_DOMAIN_NAME = DOMAIN_TOP
WHERE EC_DOMAIN.NODE_AFFINITY = ?
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_NAME");
int domainId = rs.getInt("ID");
Long dnsRootDomainId = rs.getObject("DOMAIN_DNS_INFORMATION.DNS_ROOT_DOMAIN_ID", Long.class);
String rootDomainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(new DomainReference(domainId, nodeId, domainName));
if (dnsRootDomainId == null) {
updateJobs.add(
new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ByName(rootDomainName),
Instant.now())
);
}
else {
var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
);
}
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve DNS update schedule", e);
}
return orphanedDomains;
}
logger.info("Found {} dns update jobs for node {}", updateJobs.size(), nodeId);
public List<String> getOrphanedRootDomains(int nodeId) {
List<String> orphanedDomains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP)
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_DNS_INFORMATION d ON e.DOMAIN_TOP = d.ROOT_DOMAIN_NAME
WHERE d.ROOT_DOMAIN_NAME IS NULL AND e.NODE_AFFINITY = ?;
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(domainName.toLowerCase());
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
}
return orphanedDomains;
return updateJobs;
}
}

View File

@@ -4,15 +4,15 @@ import com.google.inject.Inject;
import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** PingJobScheduler is responsible for scheduling and processing ping jobs
@@ -27,50 +27,13 @@ public class PingJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
sealed interface DnsJob {
Object reference();
private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
= new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000);
record DnsFetch(String rootDomain) implements DnsJob {
@Override
public Object reference() {
return rootDomain;
}
}
record DnsRefresh(DomainDnsRecord oldRecord) implements DnsJob {
@Override
public Object reference() {
return oldRecord.rootDomainName();
}
}
}
sealed interface AvailabilityJob {
Object reference();
record Availability(DomainReference domainReference) implements AvailabilityJob {
@Override
public Object reference() {
return domainReference.domainName();
}
}
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
@Override
public Object reference() {
return domain;
}
}
}
// Keeps track of ongoing ping and DNS processing to avoid duplicate work,
// which is mainly a scenario that will occur when there is not a lot of data
// in the database. In real-world scenarios, the queues will be full most
// of the time, and prevent this from being an issue.
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
public volatile Instant dnsLastSync = Instant.now();
public volatile Instant availabilityLastSync = Instant.now();
public volatile Integer nodeId = null;
public volatile boolean running = false;
@@ -95,15 +58,16 @@ public class PingJobScheduler {
running = true;
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords));
allThreads.add(Thread.ofPlatform().daemon().name("new-availability").start(this::fetchNewAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-availability").start(this::updateAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
for (int i = 0; i < 8; i++) {
int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
}
for (int i = 0; i < 2; i++) {
for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
}
}
@@ -122,19 +86,33 @@ public class PingJobScheduler {
}
public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
}
this.nodeId = null;
availabilityUpdateSchedule.clear();
dnsUpdateSchedule.clear();
logger.info("PingJobScheduler paused");
}
public synchronized void resume(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected null, got {}", this.nodeId, nodeId);
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
}
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nodeId));
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nodeId));
dnsLastSync = Instant.now();
availabilityLastSync = Instant.now();
// Flag that we are running again
this.nodeId = nodeId;
notifyAll();
@@ -150,32 +128,44 @@ public class PingJobScheduler {
private void availabilityJobConsumer() {
while (running) {
try {
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS);
if (job == null) {
continue; // No job available, continue to the next iteration
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue;
}
long nextId = availabilityUpdateSchedule.next();
var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) {
logger.warn("No availability data found for ID: {}", nextId);
continue; // No data to process, skip this iteration
}
try {
switch (job) {
case AvailabilityJob.Availability(DomainReference reference) -> {
logger.info("Availability check: {}", reference.domainName());
pingDao.write(httpPingService.pingDomain(reference, null, null));
}
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with reference: {}", domain);
pingDao.write(httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
security));
List<WritableModel> objects = switch (data) {
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
-> httpPingService.pingDomain(reference, null, null);
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts);
break;
}
}
}
catch (Exception e) {
logger.error("Error processing availability job for domain: " + job.reference(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsAvailability.remove(job.reference());
logger.error("Error processing availability job for domain: " + data.domain(), e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -190,30 +180,41 @@ public class PingJobScheduler {
private void dnsJobConsumer() {
while (running) {
try {
DnsJob job = dnsJobQueue.poll(1, TimeUnit.SECONDS);
if (job == null) {
continue; // No job available, continue to the next iteration
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue;
}
RootDomainReference ref = dnsUpdateSchedule.next();
try {
switch (job) {
case DnsJob.DnsFetch(String rootDomain) -> {
logger.info("Fetching DNS records for root domain: {}", rootDomain);
pingDao.write(dnsPingService.pingDomain(rootDomain, null));
List<WritableModel> objects = switch(ref) {
case RootDomainReference.ById(long id) -> {
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
}
case DnsJob.DnsRefresh(DomainDnsRecord oldRecord) -> {
logger.info("Refreshing DNS records for domain: {}", oldRecord.rootDomainName());
pingDao.write(dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord));
case RootDomainReference.ByName(String name) -> {
@Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(name, oldRecord);
}
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
dnsUpdateSchedule.add(ref, ts);
break;
}
}
}
catch (Exception e) {
logger.error("Error processing DNS job for domain: " + job.reference(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsDns.remove(job.reference());
logger.error("Error processing DNS job for domain: " + ref, e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("DNS job consumer interrupted", e);
@@ -224,38 +225,27 @@ public class PingJobScheduler {
}
}
private void fetchNewAvailabilityJobs() {
private void syncAvailabilityJobs() {
try {
while (running) {
// If we are suspended, wait for resume
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
continue;
}
List<DomainReference> domains = pingDao.getOrphanedDomains(nid);
for (DomainReference domain : domains) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
try {
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new ping job for domain: " + domain, e);
}
// Check if we need to refresh the availability data
Instant nextRefresh = availabilityLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
Duration remaining = Duration.between(Instant.now(), nextRefresh);
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
continue;
}
// This is an incredibly expensive operation, so we only do it once a day
try {
TimeUnit.HOURS.sleep(24);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nid));
availabilityLastSync = Instant.now();
}
}
catch (Exception e) {
@@ -263,32 +253,26 @@ public class PingJobScheduler {
}
}
private void fetchNewDnsRecords() {
private void syncDnsRecords() {
try {
while (running) {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
List<String> rootDomains = pingDao.getOrphanedRootDomains(nid);
for (String rootDomain : rootDomains) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
try {
dnsJobQueue.put(new DnsJob.DnsFetch(rootDomain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new DNS job for root domain: " + rootDomain, e);
}
// Check if we need to refresh the availability data
Instant nextRefresh = dnsLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
Duration remaining = Duration.between(Instant.now(), nextRefresh);
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
continue;
}
// This is an incredibly expensive operation, so we only do it once a day
TimeUnit.HOURS.sleep(24);
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nid));
dnsLastSync = Instant.now();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -296,68 +280,5 @@ public class PingJobScheduler {
}
}
private void updateAvailabilityJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var statuses = pingDao.getNextDomainPingStatuses(100, nid);
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
for (var status : statuses) {
var job = switch (status) {
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
};
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
availabilityJobQueue.put(job);
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain ping statuses", e);
}
}
}
private void updateDnsJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var dnsRecords = pingDao.getNextDnsDomainRecords(1000, nid);
for (var record : dnsRecords) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
if (processingDomainsDns.putIfAbsent(record.rootDomainName(), true) == null) {
dnsJobQueue.put(new DnsJob.DnsRefresh(record));
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain DNS records", e);
}
}
}
}

View File

@@ -10,7 +10,6 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
@@ -21,7 +20,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.List;
public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
@@ -56,56 +54,6 @@ public class PingMain extends ProcessMainClass {
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcess("crawler", node, (running) -> {
if (running) {
log.info("Crawler process is running, suspending ping job scheduler.");
pingJobScheduler.pause(node);
} else {
log.warn("Crawler process is not running, resuming ping job scheduler.");
pingJobScheduler.resume(node);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully.");
}
public void runSecondary() {
log.info("Starting PingMain...");
List<Integer> crawlerNodes = nodeConfigurationService.getAll()
.stream()
.filter(node -> !node.disabled())
.filter(node -> node.profile().permitBatchCrawl())
.map(NodeConfiguration::node)
.toList()
;
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
if (running) {
log.info("Crawler process is running on node {} taking over ", n);
pingJobScheduler.resume(n);
} else {
log.warn("Crawler process stopped, resuming ping job scheduler.");
pingJobScheduler.pause(n);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully.");
}
@@ -144,19 +92,8 @@ public class PingMain extends ProcessMainClass {
var instructions = main.fetchInstructions(PingRequest.class);
try {
switch (instructions.value().runClass) {
case "primary":
log.info("Running as primary node");
main.runPrimary();
break;
case "secondary":
log.info("Running as secondary node");
main.runSecondary();
break;
default:
throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
}
for(;;);
main.runPrimary();
for(;;) main.wait(); // Wait on the object lock to avoid busy-looping
}
catch (Throwable ex) {
logger.error("Error running ping process", ex);

View File

@@ -0,0 +1,57 @@
package nu.marginalia.ping;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's
* timestamp index.
* */
public class UpdateSchedule<T, T2> {
private final PriorityQueue<UpdateJob<T, T2>> updateQueue;
public record UpdateJob<T, T2>(T key, Instant updateTime) {}
public UpdateSchedule(int initialCapacity) {
updateQueue = new PriorityQueue<>(initialCapacity, Comparator.comparing(UpdateJob::updateTime));
}
public synchronized void add(T key, Instant updateTime) {
updateQueue.add(new UpdateJob<>(key, updateTime));
notifyAll();
}
public synchronized T next() throws InterruptedException {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
wait(Math.max(1, toWait.toMillis()));
}
else {
updateQueue.poll(); // Remove the job from the queue since it's due
return job.key();
}
}
}
public synchronized void clear() {
updateQueue.clear();
notifyAll();
}
public synchronized void replaceQueue(Collection<UpdateJob<T,T2>> newJobs) {
updateQueue.clear();
updateQueue.addAll(newJobs);
notifyAll();
}
}

View File

@@ -3,6 +3,8 @@ package nu.marginalia.ping.fetcher;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.ping.model.SingleDnsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.ExtendedResolver;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.TextParseException;
@@ -17,6 +19,7 @@ import java.util.concurrent.*;
public class PingDnsFetcher {
private final ThreadLocal<ExtendedResolver> resolver;
private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100);
private static final Logger logger = LoggerFactory.getLogger(PingDnsFetcher.class);
private static final int[] RECORD_TYPES = {
Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT,
@@ -25,8 +28,7 @@ public class PingDnsFetcher {
@Inject
public PingDnsFetcher(@Named("ping.nameservers")
List<String> nameservers) throws UnknownHostException
{
List<String> nameservers) {
resolver = ThreadLocal.withInitial(() -> createResolver(nameservers));
}
@@ -81,13 +83,12 @@ public class PingDnsFetcher {
try {
results.addAll(future.get(1, TimeUnit.MINUTES));
} catch (Exception e) {
e.printStackTrace();
System.err.println("Error fetching DNS records: " + e.getMessage());
logger.error("Error fetching DNS records", e);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("DNS query interrupted: " + e.getMessage());
logger.error("DNS query interrupted", e);
}
return results;
}

View File

@@ -83,7 +83,7 @@ public class PingHttpFetcher {
} catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage());
} catch (IOException e) {
return new ConnectionError(e.getMessage());
return new ConnectionError(e.getClass().getSimpleName());
}
}

View File

@@ -44,14 +44,14 @@ public class HttpClientProvider implements Provider<HttpClient> {
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(30, TimeUnit.SECONDS)
.setConnectTimeout(30, TimeUnit.SECONDS)
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(5000)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy(
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE))

View File

@@ -70,6 +70,11 @@ implements WritableModel
return millis == null ? null : Duration.ofMillis(millis);
}
@Override
public Instant nextUpdateTime() {
return nextScheduledUpdate;
}
@Override
public void write(Connection connection) throws SQLException {
try (var ps = connection.prepareStatement(

View File

@@ -60,6 +60,11 @@ public record DomainDnsRecord(
return new Builder();
}
@Override
public Instant nextUpdateTime() {
return tsNextScheduledUpdate;
}
@Override
public void write(Connection connection) throws SQLException {

View File

@@ -16,6 +16,8 @@ public record DomainSecurityEvent(
boolean certificateProfileChanged,
boolean certificateSanChanged,
boolean certificatePublicKeyChanged,
boolean certificateSerialNumberChanged,
boolean certificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean securityHeadersChanged,
boolean ipChanged,
@@ -41,8 +43,10 @@ public record DomainSecurityEvent(
change_software,
old_cert_time_to_expiry,
security_signature_before,
security_signature_after
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
security_signature_after,
change_certificate_serial_number,
change_certificate_issuer
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""))
{
@@ -75,6 +79,9 @@ public record DomainSecurityEvent(
ps.setBytes(14, securitySignatureAfter().compressed());
}
ps.setBoolean(15, certificateSerialNumberChanged());
ps.setBoolean(16, certificateIssuerChanged());
ps.executeUpdate();
}
}

View File

@@ -203,7 +203,6 @@ public record DomainSecurityRecord(
if (sslCertFingerprintSha256() == null) {
ps.setNull(12, java.sql.Types.BINARY);
} else {
System.out.println(sslCertFingerprintSha256().length);
ps.setBytes(12, sslCertFingerprintSha256());
}
if (sslCertSan() == null) {

View File

@@ -1,6 +1,13 @@
package nu.marginalia.ping.model;
public sealed interface HistoricalAvailabilityData {
public String domain();
record JustDomainReference(DomainReference domainReference) implements HistoricalAvailabilityData {
@Override
public String domain() {
return domainReference.domainName();
}
}
record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {}
record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {}
}

View File

@@ -0,0 +1,6 @@
package nu.marginalia.ping.model;
public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { }
}

View File

@@ -1,8 +1,14 @@
package nu.marginalia.ping.model;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
public interface WritableModel {
void write(Connection connection) throws SQLException;
@Nullable
default Instant nextUpdateTime() {
return null;
}
}

View File

@@ -15,6 +15,8 @@ public record SecurityInformationChange(
boolean isCertificateProfileChanged,
boolean isCertificateSanChanged,
boolean isCertificatePublicKeyChanged,
boolean isCertificateSerialNumberChanged,
boolean isCertificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean isSecurityHeadersChanged,
boolean isIpAddressChanged,
@@ -30,8 +32,10 @@ public record SecurityInformationChange(
boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256());
boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash();
boolean certificateSerialNumberChanged = !Objects.equals(before.sslCertSerialNumber(), after.sslCertSerialNumber());
boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash());
boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan());
boolean certificateIssuerChanged = !Objects.equals(before.sslCertIssuer(), after.sslCertIssuer());
Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between(
Instant.now(),
@@ -50,6 +54,7 @@ public record SecurityInformationChange(
boolean isChanged = asnChanged
|| certificateFingerprintChanged
|| securityHeadersChanged
|| certificateProfileChanged
|| softwareChanged;
return new SecurityInformationChange(
@@ -59,6 +64,8 @@ public record SecurityInformationChange(
certificateProfileChanged,
certificateSanChanged,
certificatePublicKeyChanged,
certificateSerialNumberChanged,
certificateIssuerChanged,
oldCertificateTimeToExpiry,
securityHeadersChanged,
ipChanged,

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) {
case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity())

View File

@@ -71,23 +71,40 @@ public class DomainAvailabilityInformationFactory {
@Nullable DomainAvailabilityRecord previousRecord,
HttpResponse rsp) {
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = now;
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.serverAvailable(true)
.serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTP)
.httpStatus(rsp.httpStatus())
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime())
.httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now())
.tsLastAvailable(Instant.now())
.tsLastPing(now)
.tsLastAvailable(lastAvailable)
.tsLastError(lastError)
.nextScheduledUpdate(Instant.now().plus(backoffStrategy.getOkInterval()))
.nextScheduledUpdate(now.plus(backoffStrategy.getOkInterval()))
.backoffFetchInterval(backoffStrategy.getOkInterval())
.build();
@@ -117,23 +134,44 @@ public class DomainAvailabilityInformationFactory {
updateTime = Instant.now().plus(backoffStrategy.getOkInterval());
}
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (!validationResult.isValid()) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.SSL_ERROR;
} else if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = Instant.now();
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.serverAvailable(validationResult.isValid())
.serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTPS)
.httpStatus(rsp.httpStatus())
.errorClassification(!validationResult.isValid() ? ErrorClassification.SSL_ERROR : ErrorClassification.NONE)
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented
.httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now())
.tsLastPing(now)
.tsLastError(lastError)
.tsLastAvailable(Instant.now())
.tsLastAvailable(lastAvailable)
.nextScheduledUpdate(updateTime)
.backoffFetchInterval(backoffStrategy.getOkInterval())
.build();

View File

@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException;
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) {
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
int domainId, int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTP)
.httpVersion(httpResponse.version())
.headerServer(headers.getFirst("Server"))
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
}
// HTTPS response
public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) {
public DomainSecurityRecord createHttpsSecurityInformation(
HttpsResponse httpResponse,
PKIXValidationResult validationResult,
int domainId,
int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTPS)
.headerServer(headers.getFirst("Server"))
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))

View File

@@ -18,6 +18,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -75,8 +76,8 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
@@ -84,8 +85,8 @@ public class HttpPingService {
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
}
@@ -116,7 +117,7 @@ public class HttpPingService {
domainReference.nodeId(),
oldPingStatus,
ErrorClassification.CONNECTION_ERROR,
null);
rsp.errorMessage());
newSecurityInformation = null;
}
case TimeoutResponse rsp -> {
@@ -148,7 +149,8 @@ public class HttpPingService {
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
httpResponse,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
case HttpsResponse httpsResponse -> {
@@ -166,7 +168,8 @@ public class HttpPingService {
httpsResponse,
validationResult,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
}
@@ -190,6 +193,29 @@ public class HttpPingService {
return generatedRecords;
}
private boolean shouldTryGET(int statusCode) {
if (statusCode < 400) {
return false;
}
if (statusCode == 429) { // Too many requests, we should not retry with GET
return false;
}
// For all other status codes, we can try a GET request, as many severs do not
// cope with HEAD requests properly.
return statusCode < 600;
}
private void sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
logger.warn("Sleep interrupted", e);
}
}
private void comparePingStatuses(List<WritableModel> generatedRecords,
DomainAvailabilityRecord oldPingStatus,
DomainAvailabilityRecord newPingStatus) {
@@ -258,6 +284,8 @@ public class HttpPingService {
change.isCertificateProfileChanged(),
change.isCertificateSanChanged(),
change.isCertificatePublicKeyChanged(),
change.isCertificateSerialNumberChanged(),
change.isCertificateIssuerChanged(),
change.oldCertificateTimeToExpiry(),
change.isSecurityHeadersChanged(),
change.isIpAddressChanged(),

View File

@@ -318,6 +318,8 @@ class PingDaoTest {
true,
false,
true,
true,
false,
Duration.ofDays(30),
false,
false,
@@ -330,86 +332,6 @@ class PingDaoTest {
svc.write(event);
}
@Test
void getNextDomainPingStatuses() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test domain availability record
var record = new DomainAvailabilityRecord(
1,
1,
true,
new byte[]{127, 0, 0, 1},
40,
0x0F00BA32L,
0x0F00BA34L,
HttpSchema.HTTP,
"etag123",
"Wed, 21 Oct 2023 07:28:00 GMT",
200,
"http://example.com/redirect",
Duration.ofMillis(150),
ErrorClassification.NONE,
"No error",
Instant.now().minus(30, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(7200, ChronoUnit.SECONDS),
Instant.now().minus(3000, ChronoUnit.SECONDS),
2,
Duration.ofSeconds(60)
);
svc.write(record);
// Fetch the next domain ping statuses
var statuses = svc.getNextDomainPingStatuses(10, 1);
assertFalse(statuses.isEmpty());
assertEquals(1, statuses.size());
}
@Test
void getNextDnsDomainRecords() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test DNS record
var dnsRecord = new DomainDnsRecord(null, "example.com", 2,
List.of("test"),
List.of("test2"),
"test3",
List.of("test4"),
List.of("test5"),
List.of("test6"),
List.of("test7"),
"test8",
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
4);
svc.write(dnsRecord);
var nextRecords = svc.getNextDnsDomainRecords(1, 2);
assertFalse(nextRecords.isEmpty());
assertEquals(1, nextRecords.size());
}
@Test
void getOrphanedDomains(){
var svc = new PingDao(dataSource);
var orphanedDomains = svc.getOrphanedDomains(1);
System.out.println(orphanedDomains);
assertTrue(orphanedDomains.contains(new DomainReference(1, 1, "www.marginalia.nu")));
assertFalse(orphanedDomains.isEmpty());
var orphanedRootDomains = svc.getOrphanedRootDomains(1);
System.out.println(orphanedRootDomains);
assertTrue(orphanedRootDomains.contains("marginalia.nu"));
}
@Test
void write() {
var dnsEvent = new DomainDnsEvent(

View File

@@ -1,9 +1,8 @@
package nu.marginalia.mqapi.ping;
public class PingRequest {
public final String runClass;
public PingRequest(String runClass) {
this.runClass = runClass;
public PingRequest() {
}
}