1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

25 Commits

Author SHA1 Message Date
Viktor Lofgren
5020029c2d (ping) Fix startup sequence for new primary-only flow 2025-06-14 12:48:09 +02:00
Viktor Lofgren
ac44d0b093 (ping) Fix wait logic to use synchronized block 2025-06-14 12:38:16 +02:00
Viktor Lofgren
4b32b9b10e Update DomainAvailabilityRecord to use clamped integer for HTTP response time 2025-06-14 12:37:58 +02:00
Viktor Lofgren
9f041d6631 (ping) Drop the concept of primary and secondary ping instances
There was an idea of having the ping service duck over to a realtime partition when the partition is crawling, but this hasn't been working out well, so the concept will be retired and all nodes will run as primary.
2025-06-14 12:32:08 +02:00
Viktor Lofgren
13fb1efce4 (ping) Populate ASN field on DomainSecurityInformation 2025-06-13 15:45:43 +02:00
Viktor Lofgren
c1225165b7 (ping) Add a summary fields CHANGE_SERIAL_NUMBER and CHANGE_ISSUER to DOMAIN_SECURITY_EVENTS 2025-06-13 15:30:45 +02:00
Viktor Lofgren
67ad7a3bbc (ping) Enhance HTTP ping logic to retry GET requests for specific status codes and add sleep duration between retries 2025-06-13 12:59:56 +02:00
Viktor Lofgren
ed62ec8a35 (random) Sanitize random search results with DOMAIN_AVAILABILITY_INFORMATION join 2025-06-13 10:38:21 +02:00
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
Viktor Lofgren
5f93c7f767 (ping) Update PROC_PING_SPAWNER to use REALTIME from SIDELOAD 2025-06-12 14:04:09 +02:00
Viktor Lofgren
4001c68c82 (ping) Update SQL query to include NODE_AFFINITY in historical availability data retrieval 2025-06-12 13:58:50 +02:00
Viktor Lofgren
6b811489c5 (actor) Make ping spawner auto-spawn the process 2025-06-12 13:46:50 +02:00
Viktor Lofgren
e9d317c65d (ping) Parameterize thread counts for availability and DNS job consumers 2025-06-12 13:34:58 +02:00
Viktor Lofgren
16b05a4737 (ping) Reduce maximum total connections in HttpClientProvider to improve resource management 2025-06-12 13:04:55 +02:00
Viktor Lofgren
021cd73cbb (ping) Reduce db contention by moving job scheduling out of the database to RAM 2025-06-12 12:56:33 +02:00
Viktor Lofgren
4253bd53b5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:18:07 +02:00
Viktor Lofgren
14c87461a5 (ping) Fix issue where errors were not correctly labeled in availability 2025-06-12 00:04:39 +02:00
Viktor Lofgren
9afed0a18e (ping) Optimize parameters
Reduce socket and connection timeouts in HttpClient and adjust thread counts for job consumers
2025-06-11 16:21:45 +02:00
Viktor Lofgren
afad4deb94 (ping) Fix DB query to prioritize DNS information updates correctly
This also reduces CPU%
2025-06-11 14:58:28 +02:00
Viktor Lofgren
f071c947e4 (ping) Truncate data before inserting into db 2025-06-11 14:29:30 +02:00
Viktor Lofgren
79996c9348 (ping) Adjust thread counts based on observed processing times 2025-06-11 14:29:17 +02:00
Viktor Lofgren
db907ab06a (ping) Update availabilityJobQueue to use put method to block rather than blow up 2025-06-11 14:22:24 +02:00
Viktor Lofgren
c49cd9dd95 (ping) Truncate fields in the builder to give consistent comparison without blowing up the database inserts. 2025-06-11 14:20:54 +02:00
Viktor Lofgren
eec9df3b0a (ping) Truncate X-Frame-Options to 50 characters 2025-06-11 14:17:08 +02:00
26 changed files with 612 additions and 502 deletions

View File

@@ -0,0 +1,6 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_SERIAL_NUMBER BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_ISSUER BOOLEAN NOT NULL DEFAULT FALSE;
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;

View File

@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -3,24 +3,176 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor {
public class PingMonitorActor extends RecordActorPrototype {
@Inject
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.PING_INBOX,
ProcessService.ProcessId.PING);
private final MqPersistence persistence;
private final ProcessService processService;
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final int MAX_ATTEMPTS = 3;
private final String inboxName;
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final Gson gson;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest();
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);
yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
}
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public PingMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;
var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}
public boolean isError() {
return error.get();
}
}
}

View File

@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {
final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED
SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL
ORDER BY RAND()

View File

@@ -47,6 +47,7 @@ dependencies {
implementation libs.bundles.curator
implementation libs.bundles.mariadb
implementation libs.bundles.httpcomponents
implementation libs.commons.lang3
implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
implementation 'org.bouncycastle:bcpkix-jdk18on:1.80'

View File

@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
@Singleton
public class PingDao {
@@ -76,32 +77,6 @@ public class PingDao {
}
}
public List<DomainReference> getNewDomains(int nodeId, int cnt) throws SQLException {
List<DomainReference> domains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT domain_id, domain_name
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.domain_id = DOMAIN_AVAILABILITY_INFORMATION.domain_id
WHERE DOMAIN_AVAILABILITY_INFORMATION.server_available IS NULL
AND EC_DOMAIN.NODE_ID = ?
LIMIT ?
"""))
{
ps.setInt(1, nodeId);
ps.setInt(2, cnt);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domains.add(new DomainReference(rs.getInt("domain_id"), nodeId, rs.getString("domain_name").toLowerCase()));
}
}
return domains;
}
public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException {
try (var conn = dataSource.getConnection();
@@ -132,7 +107,7 @@ public class PingDao {
}
}
public DomainDnsRecord getDomainDnsRecord(int dnsRootDomainId) throws SQLException {
public DomainDnsRecord getDomainDnsRecord(long dnsRootDomainId) throws SQLException {
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) {
@@ -160,111 +135,123 @@ public class PingDao {
}
}
public List<HistoricalAvailabilityData> getNextDomainPingStatuses(int count, int nodeId) throws SQLException {
List<HistoricalAvailabilityData> domainAvailabilityRecords = new ArrayList<>(count);
public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
var query = """
SELECT DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*, EC_DOMAIN.DOMAIN_NAME FROM DOMAIN_AVAILABILITY_INFORMATION
LEFT JOIN DOMAIN_SECURITY_INFORMATION
ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = DOMAIN_SECURITY_INFORMATION.DOMAIN_ID
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NEXT_SCHEDULED_UPDATE <= ? AND DOMAIN_AVAILABILITY_INFORMATION.NODE_ID = ?
ORDER BY NEXT_SCHEDULED_UPDATE ASC
LIMIT ?
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
FROM EC_DOMAIN
LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
WHERE EC_DOMAIN.ID = ?
""";
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) {
// Use Java time since this is how we generate the timestamps in the ping process
// to avoid timezone weirdness.
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
ps.setLong(1, domainId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME");
var domainAvailabilityRecord = new DomainAvailabilityRecord(rs);
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null) {
var securityRecord = new DomainSecurityRecord(rs);
domainAvailabilityRecords.add(
new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, domainAvailabilityRecord, securityRecord)
);
} else {
domainAvailabilityRecords.add(new HistoricalAvailabilityData.JustAvailability(domainName, domainAvailabilityRecord));
DomainAvailabilityRecord dar;
DomainSecurityRecord dsr;
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dsr = new DomainSecurityRecord(rs);
else
dsr = null;
if (rs.getObject("DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
dar = new DomainAvailabilityRecord(rs);
else
dar = null;
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}
else {
if (dsr != null) {
return new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, dar, dsr);
} else {
return new HistoricalAvailabilityData.JustAvailability(domainName, dar);
}
}
}
}
return domainAvailabilityRecords;
return null;
}
public List<DomainDnsRecord> getNextDnsDomainRecords(int count, int nodeId) throws SQLException {
List<DomainDnsRecord> domainDnsRecords = new ArrayList<>(count);
public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
var query = """
SELECT * FROM DOMAIN_DNS_INFORMATION
WHERE TS_NEXT_DNS_CHECK <= ? AND NODE_AFFINITY = ?
ORDER BY DNS_CHECK_PRIORITY ASC, TS_NEXT_DNS_CHECK ASC
LIMIT ?
""";
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement(query)) {
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
ps.setInt(2, nodeId);
ps.setInt(3, count);
var ps = conn.prepareStatement("""
SELECT ID, NEXT_SCHEDULED_UPDATE
FROM EC_DOMAIN
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
WHERE NODE_AFFINITY = ?
""")) {
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
domainDnsRecords.add(new DomainDnsRecord(rs));
long domainId = rs.getLong("ID");
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
}
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve domain update schedule", e);
}
return domainDnsRecords;
logger.info("Found {} availability update jobs for node {}", updateJobs.size(), nodeId);
return updateJobs;
}
public List<DomainReference> getOrphanedDomains(int nodeId) {
List<DomainReference> orphanedDomains = new ArrayList<>();
public List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> getDnsUpdateSchedule(int nodeId) {
List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> updateJobs = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT e.DOMAIN_NAME, e.ID
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION d ON e.ID = d.DOMAIN_ID
WHERE d.DOMAIN_ID IS NULL AND e.NODE_AFFINITY = ?;
var ps = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP),DOMAIN_DNS_INFORMATION.* FROM EC_DOMAIN
LEFT JOIN DOMAIN_DNS_INFORMATION ON ROOT_DOMAIN_NAME = DOMAIN_TOP
WHERE EC_DOMAIN.NODE_AFFINITY = ?
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
ps.setFetchSize(10_000);
ps.setInt(1, nodeId);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_NAME");
int domainId = rs.getInt("ID");
Long dnsRootDomainId = rs.getObject("DOMAIN_DNS_INFORMATION.DNS_ROOT_DOMAIN_ID", Long.class);
String rootDomainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(new DomainReference(domainId, nodeId, domainName));
if (dnsRootDomainId == null) {
updateJobs.add(
new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ByName(rootDomainName),
Instant.now())
);
}
else {
var record = new DomainDnsRecord(rs);
updateJobs.add(new UpdateSchedule.UpdateJob<>(
new RootDomainReference.ById(dnsRootDomainId),
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
);
}
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
} catch (SQLException e) {
throw new RuntimeException("Failed to retrieve DNS update schedule", e);
}
return orphanedDomains;
}
logger.info("Found {} dns update jobs for node {}", updateJobs.size(), nodeId);
public List<String> getOrphanedRootDomains(int nodeId) {
List<String> orphanedDomains = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT DISTINCT(DOMAIN_TOP)
FROM EC_DOMAIN e
LEFT JOIN DOMAIN_DNS_INFORMATION d ON e.DOMAIN_TOP = d.ROOT_DOMAIN_NAME
WHERE d.ROOT_DOMAIN_NAME IS NULL AND e.NODE_AFFINITY = ?;
""")) {
stmt.setInt(1, nodeId);
stmt.setFetchSize(10_000);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String domainName = rs.getString("DOMAIN_TOP");
orphanedDomains.add(domainName.toLowerCase());
}
}
catch (SQLException e) {
throw new RuntimeException("Failed to retrieve orphaned domains", e);
}
return orphanedDomains;
return updateJobs;
}
}

View File

@@ -4,15 +4,15 @@ import com.google.inject.Inject;
import nu.marginalia.ping.model.*;
import nu.marginalia.ping.svc.DnsPingService;
import nu.marginalia.ping.svc.HttpPingService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** PingJobScheduler is responsible for scheduling and processing ping jobs
@@ -27,50 +27,13 @@ public class PingJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
sealed interface DnsJob {
Object reference();
private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
= new UpdateSchedule<>(250_000);
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
= new UpdateSchedule<>(250_000);
record DnsFetch(String rootDomain) implements DnsJob {
@Override
public Object reference() {
return rootDomain;
}
}
record DnsRefresh(DomainDnsRecord oldRecord) implements DnsJob {
@Override
public Object reference() {
return oldRecord.rootDomainName();
}
}
}
sealed interface AvailabilityJob {
Object reference();
record Availability(DomainReference domainReference) implements AvailabilityJob {
@Override
public Object reference() {
return domainReference.domainName();
}
}
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
@Override
public Object reference() {
return domain;
}
}
}
// Keeps track of ongoing ping and DNS processing to avoid duplicate work,
// which is mainly a scenario that will occur when there is not a lot of data
// in the database. In real-world scenarios, the queues will be full most
// of the time, and prevent this from being an issue.
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
public volatile Instant dnsLastSync = Instant.now();
public volatile Instant availabilityLastSync = Instant.now();
public volatile Integer nodeId = null;
public volatile boolean running = false;
@@ -87,7 +50,7 @@ public class PingJobScheduler {
this.pingDao = pingDao;
}
public synchronized void start(boolean startPaused) {
public synchronized void start() {
if (running)
return;
@@ -95,15 +58,16 @@ public class PingJobScheduler {
running = true;
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords));
allThreads.add(Thread.ofPlatform().daemon().name("new-pings").start(this::fetchNewAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-pings").start(this::updateAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
for (int i = 0; i < 4; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("ping-job-consumer-" + i).start(this::availabilityJobConsumer));
int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
}
for (int i = 0; i < 4; i++) {
for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
}
}
@@ -122,19 +86,33 @@ public class PingJobScheduler {
}
public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
}
this.nodeId = null;
availabilityUpdateSchedule.clear();
dnsUpdateSchedule.clear();
logger.info("PingJobScheduler paused");
}
public synchronized void resume(int nodeId) {
public synchronized void enableForNode(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected null, got {}", this.nodeId, nodeId);
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
}
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nodeId));
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nodeId));
dnsLastSync = Instant.now();
availabilityLastSync = Instant.now();
// Flag that we are running again
this.nodeId = nodeId;
notifyAll();
@@ -150,32 +128,44 @@ public class PingJobScheduler {
private void availabilityJobConsumer() {
while (running) {
try {
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS);
if (job == null) {
continue; // No job available, continue to the next iteration
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue;
}
long nextId = availabilityUpdateSchedule.next();
var data = pingDao.getHistoricalAvailabilityData(nextId);
if (data == null) {
logger.warn("No availability data found for ID: {}", nextId);
continue; // No data to process, skip this iteration
}
try {
switch (job) {
case AvailabilityJob.Availability(DomainReference reference) -> {
logger.info("Availability check: {}", reference.domainName());
pingDao.write(httpPingService.pingDomain(reference, null, null));
}
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with reference: {}", domain);
pingDao.write(httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
security));
List<WritableModel> objects = switch (data) {
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
-> httpPingService.pingDomain(reference, null, null);
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
availabilityUpdateSchedule.add(nextId, ts);
break;
}
}
}
catch (Exception e) {
logger.error("Error processing availability job for domain: " + job.reference(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsAvailability.remove(job.reference());
logger.error("Error processing availability job for domain: " + data.domain(), e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -190,30 +180,41 @@ public class PingJobScheduler {
private void dnsJobConsumer() {
while (running) {
try {
DnsJob job = dnsJobQueue.poll(1, TimeUnit.SECONDS);
if (job == null) {
continue; // No job available, continue to the next iteration
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue;
}
RootDomainReference ref = dnsUpdateSchedule.next();
try {
switch (job) {
case DnsJob.DnsFetch(String rootDomain) -> {
logger.info("Fetching DNS records for root domain: {}", rootDomain);
pingDao.write(dnsPingService.pingDomain(rootDomain, null));
List<WritableModel> objects = switch(ref) {
case RootDomainReference.ById(long id) -> {
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
}
case DnsJob.DnsRefresh(DomainDnsRecord oldRecord) -> {
logger.info("Refreshing DNS records for domain: {}", oldRecord.rootDomainName());
pingDao.write(dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord));
case RootDomainReference.ByName(String name) -> {
@Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(name, oldRecord);
}
};
pingDao.write(objects);
// Re-schedule the next update time for the domain
for (var object : objects) {
var ts = object.nextUpdateTime();
if (ts != null) {
dnsUpdateSchedule.add(ref, ts);
break;
}
}
}
catch (Exception e) {
logger.error("Error processing DNS job for domain: " + job.reference(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsDns.remove(job.reference());
logger.error("Error processing DNS job for domain: " + ref, e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("DNS job consumer interrupted", e);
@@ -224,38 +225,27 @@ public class PingJobScheduler {
}
}
private void fetchNewAvailabilityJobs() {
private void syncAvailabilityJobs() {
try {
while (running) {
// If we are suspended, wait for resume
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
continue;
}
List<DomainReference> domains = pingDao.getOrphanedDomains(nid);
for (DomainReference domain : domains) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
try {
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new ping job for domain: " + domain, e);
}
// Check if we need to refresh the availability data
Instant nextRefresh = availabilityLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
Duration remaining = Duration.between(Instant.now(), nextRefresh);
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
continue;
}
// This is an incredibly expensive operation, so we only do it once a day
try {
TimeUnit.HOURS.sleep(24);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nid));
availabilityLastSync = Instant.now();
}
}
catch (Exception e) {
@@ -263,32 +253,26 @@ public class PingJobScheduler {
}
}
private void fetchNewDnsRecords() {
private void syncDnsRecords() {
try {
while (running) {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
List<String> rootDomains = pingDao.getOrphanedRootDomains(nid);
for (String rootDomain : rootDomains) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
try {
dnsJobQueue.put(new DnsJob.DnsFetch(rootDomain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new DNS job for root domain: " + rootDomain, e);
}
// Check if we need to refresh the availability data
Instant nextRefresh = dnsLastSync.plus(Duration.ofHours(24));
if (Instant.now().isBefore(nextRefresh)) {
Duration remaining = Duration.between(Instant.now(), nextRefresh);
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
continue;
}
// This is an incredibly expensive operation, so we only do it once a day
TimeUnit.HOURS.sleep(24);
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nid));
dnsLastSync = Instant.now();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -296,68 +280,5 @@ public class PingJobScheduler {
}
}
private void updateAvailabilityJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var statuses = pingDao.getNextDomainPingStatuses(100, nid);
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
for (var status : statuses) {
var job = switch (status) {
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
};
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
availabilityJobQueue.add(job);
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain ping statuses", e);
}
}
}
private void updateDnsJobs() {
while (running) {
try {
Integer nid = nodeId;
if (nid == null) {
waitForResume();
continue; // re-fetch the records after resuming
}
var dnsRecords = pingDao.getNextDnsDomainRecords(1000, nid);
for (var record : dnsRecords) {
if (nodeId == null) {
waitForResume();
break; // re-fetch the records after resuming
}
if (processingDomainsDns.putIfAbsent(record.rootDomainName(), true) == null) {
dnsJobQueue.put(new DnsJob.DnsRefresh(record));
}
}
}
catch (Exception e) {
logger.error("Error fetching next domain DNS records", e);
}
}
}
}

View File

@@ -10,7 +10,6 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
@@ -21,7 +20,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.List;
public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
@@ -54,57 +52,8 @@ public class PingMain extends ProcessMainClass {
log.info("Starting PingMain...");
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcess("crawler", node, (running) -> {
if (running) {
log.info("Crawler process is running, suspending ping job scheduler.");
pingJobScheduler.pause(node);
} else {
log.warn("Crawler process is not running, resuming ping job scheduler.");
pingJobScheduler.resume(node);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully.");
}
public void runSecondary() {
log.info("Starting PingMain...");
List<Integer> crawlerNodes = nodeConfigurationService.getAll()
.stream()
.filter(node -> !node.disabled())
.filter(node -> node.profile().permitBatchCrawl())
.map(NodeConfiguration::node)
.toList()
;
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
if (running) {
log.info("Crawler process is running on node {} taking over ", n);
pingJobScheduler.resume(n);
} else {
log.warn("Crawler process stopped, resuming ping job scheduler.");
pingJobScheduler.pause(n);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
pingJobScheduler.start();
pingJobScheduler.enableForNode(node);
log.info("PingMain started successfully.");
}
@@ -144,19 +93,11 @@ public class PingMain extends ProcessMainClass {
var instructions = main.fetchInstructions(PingRequest.class);
try {
switch (instructions.value().runClass) {
case "primary":
log.info("Running as primary node");
main.runPrimary();
break;
case "secondary":
log.info("Running as secondary node");
main.runSecondary();
break;
default:
throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
}
for(;;);
main.runPrimary();
for(;;)
synchronized (main) { // Wait on the object lock to avoid busy-looping
main.wait();
}
}
catch (Throwable ex) {
logger.error("Error running ping process", ex);

View File

@@ -0,0 +1,57 @@
package nu.marginalia.ping;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
* This is not a particularly high-performance implementation, but exists to take contention off the database's
* timestamp index.
* */
public class UpdateSchedule<T, T2> {
private final PriorityQueue<UpdateJob<T, T2>> updateQueue;
public record UpdateJob<T, T2>(T key, Instant updateTime) {}
public UpdateSchedule(int initialCapacity) {
updateQueue = new PriorityQueue<>(initialCapacity, Comparator.comparing(UpdateJob::updateTime));
}
public synchronized void add(T key, Instant updateTime) {
updateQueue.add(new UpdateJob<>(key, updateTime));
notifyAll();
}
public synchronized T next() throws InterruptedException {
while (true) {
if (updateQueue.isEmpty()) {
wait(); // Wait for a new job to be added
continue;
}
UpdateJob<T, T2> job = updateQueue.peek();
Instant now = Instant.now();
if (job.updateTime.isAfter(now)) {
Duration toWait = Duration.between(now, job.updateTime);
wait(Math.max(1, toWait.toMillis()));
}
else {
updateQueue.poll(); // Remove the job from the queue since it's due
return job.key();
}
}
}
public synchronized void clear() {
updateQueue.clear();
notifyAll();
}
public synchronized void replaceQueue(Collection<UpdateJob<T,T2>> newJobs) {
updateQueue.clear();
updateQueue.addAll(newJobs);
notifyAll();
}
}

View File

@@ -3,6 +3,8 @@ package nu.marginalia.ping.fetcher;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.ping.model.SingleDnsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.ExtendedResolver;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.TextParseException;
@@ -17,6 +19,7 @@ import java.util.concurrent.*;
public class PingDnsFetcher {
private final ThreadLocal<ExtendedResolver> resolver;
private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100);
private static final Logger logger = LoggerFactory.getLogger(PingDnsFetcher.class);
private static final int[] RECORD_TYPES = {
Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT,
@@ -25,8 +28,7 @@ public class PingDnsFetcher {
@Inject
public PingDnsFetcher(@Named("ping.nameservers")
List<String> nameservers) throws UnknownHostException
{
List<String> nameservers) {
resolver = ThreadLocal.withInitial(() -> createResolver(nameservers));
}
@@ -81,13 +83,12 @@ public class PingDnsFetcher {
try {
results.addAll(future.get(1, TimeUnit.MINUTES));
} catch (Exception e) {
e.printStackTrace();
System.err.println("Error fetching DNS records: " + e.getMessage());
logger.error("Error fetching DNS records", e);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("DNS query interrupted: " + e.getMessage());
logger.error("DNS query interrupted", e);
}
return results;
}

View File

@@ -83,7 +83,7 @@ public class PingHttpFetcher {
} catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage());
} catch (IOException e) {
return new ConnectionError(e.getMessage());
return new ConnectionError(e.getClass().getSimpleName());
}
}

View File

@@ -44,14 +44,14 @@ public class HttpClientProvider implements Provider<HttpClient> {
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(30, TimeUnit.SECONDS)
.setConnectTimeout(30, TimeUnit.SECONDS)
.setSocketTimeout(15, TimeUnit.SECONDS)
.setConnectTimeout(15, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build();
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(2)
.setMaxConnTotal(5000)
.setMaxConnTotal(50)
.setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy(
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE))

View File

@@ -70,6 +70,11 @@ implements WritableModel
return millis == null ? null : Duration.ofMillis(millis);
}
@Override
public Instant nextUpdateTime() {
return nextScheduledUpdate;
}
@Override
public void write(Connection connection) throws SQLException {
try (var ps = connection.prepareStatement(
@@ -149,7 +154,7 @@ implements WritableModel
ps.setNull(12, java.sql.Types.SMALLINT);
}
else {
ps.setShort(12, (short) httpResponseTime().toMillis());
ps.setInt(12, Math.clamp(httpResponseTime().toMillis(), 0, 0xFFFF)); // "unsigned short" in SQL
}
if (errorClassification() == null) {

View File

@@ -60,6 +60,11 @@ public record DomainDnsRecord(
return new Builder();
}
@Override
public Instant nextUpdateTime() {
return tsNextScheduledUpdate;
}
@Override
public void write(Connection connection) throws SQLException {

View File

@@ -16,6 +16,8 @@ public record DomainSecurityEvent(
boolean certificateProfileChanged,
boolean certificateSanChanged,
boolean certificatePublicKeyChanged,
boolean certificateSerialNumberChanged,
boolean certificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean securityHeadersChanged,
boolean ipChanged,
@@ -41,8 +43,10 @@ public record DomainSecurityEvent(
change_software,
old_cert_time_to_expiry,
security_signature_before,
security_signature_after
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
security_signature_after,
change_certificate_serial_number,
change_certificate_issuer
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""))
{
@@ -75,6 +79,9 @@ public record DomainSecurityEvent(
ps.setBytes(14, securitySignatureAfter().compressed());
}
ps.setBoolean(15, certificateSerialNumberChanged());
ps.setBoolean(16, certificateIssuerChanged());
ps.executeUpdate();
}
}

View File

@@ -1,5 +1,7 @@
package nu.marginalia.ping.model;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -201,7 +203,6 @@ public record DomainSecurityRecord(
if (sslCertFingerprintSha256() == null) {
ps.setNull(12, java.sql.Types.BINARY);
} else {
System.out.println(sslCertFingerprintSha256().length);
ps.setBytes(12, sslCertFingerprintSha256());
}
if (sslCertSan() == null) {
@@ -359,12 +360,12 @@ public record DomainSecurityRecord(
}
public Builder httpVersion(String httpVersion) {
this.httpVersion = httpVersion;
this.httpVersion = StringUtils.truncate(httpVersion, 10);
return this;
}
public Builder httpCompression(String httpCompression) {
this.httpCompression = httpCompression;
this.httpCompression = StringUtils.truncate(httpCompression, 50);
return this;
}
@@ -384,12 +385,12 @@ public record DomainSecurityRecord(
}
public Builder sslCertIssuer(String sslCertIssuer) {
this.sslCertIssuer = sslCertIssuer;
this.sslCertIssuer = StringUtils.truncate(sslCertIssuer, 255);
return this;
}
public Builder sslCertSubject(String sslCertSubject) {
this.sslCertSubject = sslCertSubject;
this.sslCertSubject = StringUtils.truncate(sslCertSubject, 255);
return this;
}
@@ -459,37 +460,37 @@ public record DomainSecurityRecord(
}
public Builder headerStrictTransportSecurity(String headerStrictTransportSecurity) {
this.headerStrictTransportSecurity = headerStrictTransportSecurity;
this.headerStrictTransportSecurity = StringUtils.truncate(headerStrictTransportSecurity, 255);
return this;
}
public Builder headerReferrerPolicy(String headerReferrerPolicy) {
this.headerReferrerPolicy = headerReferrerPolicy;
this.headerReferrerPolicy = StringUtils.truncate(headerReferrerPolicy, 50);
return this;
}
public Builder headerXFrameOptions(String headerXFrameOptions) {
this.headerXFrameOptions = headerXFrameOptions;
this.headerXFrameOptions = StringUtils.truncate(headerXFrameOptions, 50);
return this;
}
public Builder headerXContentTypeOptions(String headerXContentTypeOptions) {
this.headerXContentTypeOptions = headerXContentTypeOptions;
this.headerXContentTypeOptions = StringUtils.truncate(headerXContentTypeOptions, 50);
return this;
}
public Builder headerXXssProtection(String headerXXssProtection) {
this.headerXXssProtection = headerXXssProtection;
this.headerXXssProtection = StringUtils.truncate(headerXXssProtection, 50);
return this;
}
public Builder headerServer(String headerServer) {
this.headerServer = headerServer;
this.headerServer = StringUtils.truncate(headerServer, 255);
return this;
}
public Builder headerXPoweredBy(String headerXPoweredBy) {
this.headerXPoweredBy = headerXPoweredBy;
this.headerXPoweredBy = StringUtils.truncate(headerXPoweredBy, 255);
return this;
}

View File

@@ -1,6 +1,13 @@
package nu.marginalia.ping.model;
public sealed interface HistoricalAvailabilityData {
public String domain();
record JustDomainReference(DomainReference domainReference) implements HistoricalAvailabilityData {
@Override
public String domain() {
return domainReference.domainName();
}
}
record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {}
record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {}
}

View File

@@ -0,0 +1,6 @@
package nu.marginalia.ping.model;
public sealed interface RootDomainReference {
record ById(long id) implements RootDomainReference { }
record ByName(String name) implements RootDomainReference { }
}

View File

@@ -1,8 +1,14 @@
package nu.marginalia.ping.model;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
public interface WritableModel {
void write(Connection connection) throws SQLException;
@Nullable
default Instant nextUpdateTime() {
return null;
}
}

View File

@@ -15,6 +15,8 @@ public record SecurityInformationChange(
boolean isCertificateProfileChanged,
boolean isCertificateSanChanged,
boolean isCertificatePublicKeyChanged,
boolean isCertificateSerialNumberChanged,
boolean isCertificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean isSecurityHeadersChanged,
boolean isIpAddressChanged,
@@ -30,8 +32,10 @@ public record SecurityInformationChange(
boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256());
boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash();
boolean certificateSerialNumberChanged = !Objects.equals(before.sslCertSerialNumber(), after.sslCertSerialNumber());
boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash());
boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan());
boolean certificateIssuerChanged = !Objects.equals(before.sslCertIssuer(), after.sslCertIssuer());
Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between(
Instant.now(),
@@ -50,6 +54,7 @@ public record SecurityInformationChange(
boolean isChanged = asnChanged
|| certificateFingerprintChanged
|| securityHeadersChanged
|| certificateProfileChanged
|| softwareChanged;
return new SecurityInformationChange(
@@ -59,6 +64,8 @@ public record SecurityInformationChange(
certificateProfileChanged,
certificateSanChanged,
certificatePublicKeyChanged,
certificateSerialNumberChanged,
certificateIssuerChanged,
oldCertificateTimeToExpiry,
securityHeadersChanged,
ipChanged,

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) {
case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity())

View File

@@ -71,23 +71,40 @@ public class DomainAvailabilityInformationFactory {
@Nullable DomainAvailabilityRecord previousRecord,
HttpResponse rsp) {
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = now;
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.serverAvailable(true)
.serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTP)
.httpStatus(rsp.httpStatus())
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime())
.httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now())
.tsLastAvailable(Instant.now())
.tsLastPing(now)
.tsLastAvailable(lastAvailable)
.tsLastError(lastError)
.nextScheduledUpdate(Instant.now().plus(backoffStrategy.getOkInterval()))
.nextScheduledUpdate(now.plus(backoffStrategy.getOkInterval()))
.backoffFetchInterval(backoffStrategy.getOkInterval())
.build();
@@ -117,23 +134,44 @@ public class DomainAvailabilityInformationFactory {
updateTime = Instant.now().plus(backoffStrategy.getOkInterval());
}
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
final boolean isAvailable;
final Instant now = Instant.now();
final Instant lastAvailable;
final Instant lastError;
final ErrorClassification errorClassification;
if (!validationResult.isValid()) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.SSL_ERROR;
} else if (rsp.httpStatus() >= 400) {
isAvailable = false;
lastError = now;
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
} else {
isAvailable = true;
lastAvailable = Instant.now();
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
errorClassification = ErrorClassification.NONE;
}
return DomainAvailabilityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.serverAvailable(validationResult.isValid())
.serverAvailable(isAvailable)
.serverIp(address != null ? address.getAddress() : null)
.serverIpAsn(getAsn(address))
.httpSchema(HttpSchema.HTTPS)
.httpStatus(rsp.httpStatus())
.errorClassification(!validationResult.isValid() ? ErrorClassification.SSL_ERROR : ErrorClassification.NONE)
.errorClassification(errorClassification)
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented
.httpEtag(rsp.headers().getFirst("ETag"))
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
.tsLastPing(Instant.now())
.tsLastPing(now)
.tsLastError(lastError)
.tsLastAvailable(Instant.now())
.tsLastAvailable(lastAvailable)
.nextScheduledUpdate(updateTime)
.backoffFetchInterval(backoffStrategy.getOkInterval())
.build();

View File

@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException;
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) {
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
int domainId, int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTP)
.httpVersion(httpResponse.version())
.headerServer(headers.getFirst("Server"))
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
}
// HTTPS response
public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) {
public DomainSecurityRecord createHttpsSecurityInformation(
HttpsResponse httpResponse,
PKIXValidationResult validationResult,
int domainId,
int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTPS)
.headerServer(headers.getFirst("Server"))
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))

View File

@@ -18,6 +18,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -75,8 +76,8 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
@@ -84,8 +85,8 @@ public class HttpPingService {
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
}
@@ -116,7 +117,7 @@ public class HttpPingService {
domainReference.nodeId(),
oldPingStatus,
ErrorClassification.CONNECTION_ERROR,
null);
rsp.errorMessage());
newSecurityInformation = null;
}
case TimeoutResponse rsp -> {
@@ -148,7 +149,8 @@ public class HttpPingService {
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
httpResponse,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
case HttpsResponse httpsResponse -> {
@@ -166,7 +168,8 @@ public class HttpPingService {
httpsResponse,
validationResult,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
}
@@ -190,6 +193,29 @@ public class HttpPingService {
return generatedRecords;
}
private boolean shouldTryGET(int statusCode) {
if (statusCode < 400) {
return false;
}
if (statusCode == 429) { // Too many requests, we should not retry with GET
return false;
}
// For all other status codes, we can try a GET request, as many severs do not
// cope with HEAD requests properly.
return statusCode < 600;
}
private void sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
logger.warn("Sleep interrupted", e);
}
}
private void comparePingStatuses(List<WritableModel> generatedRecords,
DomainAvailabilityRecord oldPingStatus,
DomainAvailabilityRecord newPingStatus) {
@@ -258,6 +284,8 @@ public class HttpPingService {
change.isCertificateProfileChanged(),
change.isCertificateSanChanged(),
change.isCertificatePublicKeyChanged(),
change.isCertificateSerialNumberChanged(),
change.isCertificateIssuerChanged(),
change.oldCertificateTimeToExpiry(),
change.isSecurityHeadersChanged(),
change.isIpAddressChanged(),

View File

@@ -318,6 +318,8 @@ class PingDaoTest {
true,
false,
true,
true,
false,
Duration.ofDays(30),
false,
false,
@@ -330,86 +332,6 @@ class PingDaoTest {
svc.write(event);
}
@Test
void getNextDomainPingStatuses() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test domain availability record
var record = new DomainAvailabilityRecord(
1,
1,
true,
new byte[]{127, 0, 0, 1},
40,
0x0F00BA32L,
0x0F00BA34L,
HttpSchema.HTTP,
"etag123",
"Wed, 21 Oct 2023 07:28:00 GMT",
200,
"http://example.com/redirect",
Duration.ofMillis(150),
ErrorClassification.NONE,
"No error",
Instant.now().minus(30, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(7200, ChronoUnit.SECONDS),
Instant.now().minus(3000, ChronoUnit.SECONDS),
2,
Duration.ofSeconds(60)
);
svc.write(record);
// Fetch the next domain ping statuses
var statuses = svc.getNextDomainPingStatuses(10, 1);
assertFalse(statuses.isEmpty());
assertEquals(1, statuses.size());
}
@Test
void getNextDnsDomainRecords() throws SQLException {
var svc = new PingDao(dataSource);
// Create a test DNS record
var dnsRecord = new DomainDnsRecord(null, "example.com", 2,
List.of("test"),
List.of("test2"),
"test3",
List.of("test4"),
List.of("test5"),
List.of("test6"),
List.of("test7"),
"test8",
Instant.now().minus(3600, ChronoUnit.SECONDS),
Instant.now().minus(3600, ChronoUnit.SECONDS),
4);
svc.write(dnsRecord);
var nextRecords = svc.getNextDnsDomainRecords(1, 2);
assertFalse(nextRecords.isEmpty());
assertEquals(1, nextRecords.size());
}
@Test
void getOrphanedDomains(){
var svc = new PingDao(dataSource);
var orphanedDomains = svc.getOrphanedDomains(1);
System.out.println(orphanedDomains);
assertTrue(orphanedDomains.contains(new DomainReference(1, 1, "www.marginalia.nu")));
assertFalse(orphanedDomains.isEmpty());
var orphanedRootDomains = svc.getOrphanedRootDomains(1);
System.out.println(orphanedRootDomains);
assertTrue(orphanedRootDomains.contains("marginalia.nu"));
}
@Test
void write() {
var dnsEvent = new DomainDnsEvent(

View File

@@ -1,9 +1,8 @@
package nu.marginalia.mqapi.ping;
public class PingRequest {
public final String runClass;
public PingRequest(String runClass) {
this.runClass = runClass;
public PingRequest() {
}
}