1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

10 Commits

Author SHA1 Message Date
Viktor Lofgren
5020029c2d (ping) Fix startup sequence for new primary-only flow 2025-06-14 12:48:09 +02:00
Viktor Lofgren
ac44d0b093 (ping) Fix wait logic to use synchronized block 2025-06-14 12:38:16 +02:00
Viktor Lofgren
4b32b9b10e Update DomainAvailabilityRecord to use clamped integer for HTTP response time 2025-06-14 12:37:58 +02:00
Viktor Lofgren
9f041d6631 (ping) Drop the concept of primary and secondary ping instances
There was an idea of having the ping service duck over to a realtime partition when the partition is crawling, but this hasn't been working out well, so the concept will be retired and all nodes will run as primary.
2025-06-14 12:32:08 +02:00
Viktor Lofgren
13fb1efce4 (ping) Populate ASN field on DomainSecurityInformation 2025-06-13 15:45:43 +02:00
Viktor Lofgren
c1225165b7 (ping) Add summary fields CHANGE_SERIAL_NUMBER and CHANGE_ISSUER to DOMAIN_SECURITY_EVENTS 2025-06-13 15:30:45 +02:00
Viktor Lofgren
67ad7a3bbc (ping) Enhance HTTP ping logic to retry GET requests for specific status codes and add sleep duration between retries 2025-06-13 12:59:56 +02:00
Viktor Lofgren
ed62ec8a35 (random) Sanitize random search results with DOMAIN_AVAILABILITY_INFORMATION join 2025-06-13 10:38:21 +02:00
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
14 changed files with 104 additions and 115 deletions

View File

@@ -0,0 +1,6 @@
-- Add additional summary columns to DOMAIN_SECURITY_EVENTS table
-- to make it easier to make sense of certificate changes.
-- Both columns are declared NOT NULL DEFAULT FALSE, so rows that predate
-- this migration read as FALSE in each new column.
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_SERIAL_NUMBER BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE DOMAIN_SECURITY_EVENTS ADD COLUMN CHANGE_CERTIFICATE_ISSUER BOOLEAN NOT NULL DEFAULT FALSE;
-- NOTE(review): presumably run to rebuild the table after the column additions -- confirm it is needed
OPTIMIZE TABLE DOMAIN_SECURITY_EVENTS;

View File

@@ -13,15 +13,12 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,7 +38,6 @@ public class PingMonitorActor extends RecordActorPrototype {
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final boolean isPrimaryNode;
private final Gson gson;
public record Initial() implements ActorStep {}
@@ -56,7 +52,7 @@ public class PingMonitorActor extends RecordActorPrototype {
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest(isPrimaryNode ? "primary": "secondary");
PingRequest request = new PingRequest();
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
@@ -129,7 +125,6 @@ public class PingMonitorActor extends RecordActorPrototype {
@Inject
public PingMonitorActor(Gson gson,
NodeConfigurationService nodeConfigurationService,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
@@ -140,9 +135,6 @@ public class PingMonitorActor extends RecordActorPrototype {
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
this.isPrimaryNode = Set.of(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED)
.contains(nodeConfigurationService.get(node).profile());
}
/** Sets the message to dead in the database to avoid

View File

@@ -27,10 +27,12 @@ public class DbBrowseDomainsRandom {
public List<BrowseResult> getRandomDomains(int count, DomainBlacklist blacklist, int set) {
final String q = """
SELECT DOMAIN_ID, DOMAIN_NAME, INDEXED
SELECT EC_RANDOM_DOMAINS.DOMAIN_ID, DOMAIN_NAME, INDEXED
FROM EC_RANDOM_DOMAINS
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID=DOMAIN_ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION DAI ON DAI.DOMAIN_ID=EC_RANDOM_DOMAINS.DOMAIN_ID
WHERE STATE<2
AND SERVER_AVAILABLE
AND DOMAIN_SET=?
AND DOMAIN_ALIAS IS NULL
ORDER BY RAND()

View File

@@ -7,6 +7,7 @@ import nu.marginalia.ping.svc.HttpPingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -49,7 +50,7 @@ public class PingJobScheduler {
this.pingDao = pingDao;
}
public synchronized void start(boolean startPaused) {
public synchronized void start() {
if (running)
return;
@@ -85,6 +86,8 @@ public class PingJobScheduler {
}
public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -97,7 +100,8 @@ public class PingJobScheduler {
logger.info("PingJobScheduler paused");
}
public synchronized void resume(int nodeId) {
public synchronized void enableForNode(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -139,24 +143,14 @@ public class PingJobScheduler {
try {
List<WritableModel> objects = switch (data) {
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference) -> {
logger.info("Processing availability job for domain: {}", reference.domainName());
yield httpPingService.pingDomain(reference, null, null);
}
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record) -> {
logger.info("Availability check with no security info: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain),
record,
null);
}
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with full historical data: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
security);
}
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
-> httpPingService.pingDomain(reference, null, null);
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
};
pingDao.write(objects);
@@ -201,8 +195,8 @@ public class PingJobScheduler {
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
}
case RootDomainReference.ByName(String name) -> {
var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
@Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(name, oldRecord);
}
};

View File

@@ -10,7 +10,6 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
@@ -21,7 +20,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.List;
public class PingMain extends ProcessMainClass {
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
@@ -54,57 +52,8 @@ public class PingMain extends ProcessMainClass {
log.info("Starting PingMain...");
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcess("crawler", node, (running) -> {
if (running) {
log.info("Crawler process is running, suspending ping job scheduler.");
pingJobScheduler.pause(node);
} else {
log.warn("Crawler process is not running, resuming ping job scheduler.");
pingJobScheduler.resume(node);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
log.info("PingMain started successfully.");
}
public void runSecondary() {
log.info("Starting PingMain...");
List<Integer> crawlerNodes = nodeConfigurationService.getAll()
.stream()
.filter(node -> !node.disabled())
.filter(node -> node.profile().permitBatchCrawl())
.map(NodeConfiguration::node)
.toList()
;
// Start the ping job scheduler
pingJobScheduler.start(true);
// Watch the crawler process to suspend/resume the ping job scheduler
try {
serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
if (running) {
log.info("Crawler process is running on node {} taking over ", n);
pingJobScheduler.resume(n);
} else {
log.warn("Crawler process stopped, resuming ping job scheduler.");
pingJobScheduler.pause(n);
}
});
}
catch (Exception e) {
throw new RuntimeException("Failed to watch crawler process", e);
}
pingJobScheduler.start();
pingJobScheduler.enableForNode(node);
log.info("PingMain started successfully.");
}
@@ -144,19 +93,11 @@ public class PingMain extends ProcessMainClass {
var instructions = main.fetchInstructions(PingRequest.class);
try {
switch (instructions.value().runClass) {
case "primary":
log.info("Running as primary node");
main.runPrimary();
break;
case "secondary":
log.info("Running as secondary node");
main.runSecondary();
break;
default:
throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
}
for(;;);
main.runPrimary();
for(;;)
synchronized (main) { // Wait on the object lock to avoid busy-looping
main.wait();
}
}
catch (Throwable ex) {
logger.error("Error running ping process", ex);

View File

@@ -83,7 +83,7 @@ public class PingHttpFetcher {
} catch (SocketTimeoutException ex) {
return new TimeoutResponse(ex.getMessage());
} catch (IOException e) {
return new ConnectionError(e.getMessage());
return new ConnectionError(e.getClass().getSimpleName());
}
}

View File

@@ -154,7 +154,7 @@ implements WritableModel
ps.setNull(12, java.sql.Types.SMALLINT);
}
else {
ps.setShort(12, (short) httpResponseTime().toMillis());
ps.setInt(12, Math.clamp(httpResponseTime().toMillis(), 0, 0xFFFF)); // "unsigned short" in SQL
}
if (errorClassification() == null) {

View File

@@ -16,6 +16,8 @@ public record DomainSecurityEvent(
boolean certificateProfileChanged,
boolean certificateSanChanged,
boolean certificatePublicKeyChanged,
boolean certificateSerialNumberChanged,
boolean certificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean securityHeadersChanged,
boolean ipChanged,
@@ -41,8 +43,10 @@ public record DomainSecurityEvent(
change_software,
old_cert_time_to_expiry,
security_signature_before,
security_signature_after
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
security_signature_after,
change_certificate_serial_number,
change_certificate_issuer
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""))
{
@@ -75,6 +79,9 @@ public record DomainSecurityEvent(
ps.setBytes(14, securitySignatureAfter().compressed());
}
ps.setBoolean(15, certificateSerialNumberChanged());
ps.setBoolean(16, certificateIssuerChanged());
ps.executeUpdate();
}
}

View File

@@ -15,6 +15,8 @@ public record SecurityInformationChange(
boolean isCertificateProfileChanged,
boolean isCertificateSanChanged,
boolean isCertificatePublicKeyChanged,
boolean isCertificateSerialNumberChanged,
boolean isCertificateIssuerChanged,
Duration oldCertificateTimeToExpiry,
boolean isSecurityHeadersChanged,
boolean isIpAddressChanged,
@@ -30,8 +32,10 @@ public record SecurityInformationChange(
boolean certificateFingerprintChanged = 0 != Arrays.compare(before.sslCertFingerprintSha256(), after.sslCertFingerprintSha256());
boolean certificateProfileChanged = before.certificateProfileHash() != after.certificateProfileHash();
boolean certificateSerialNumberChanged = !Objects.equals(before.sslCertSerialNumber(), after.sslCertSerialNumber());
boolean certificatePublicKeyChanged = 0 != Arrays.compare(before.sslCertPublicKeyHash(), after.sslCertPublicKeyHash());
boolean certificateSanChanged = !Objects.equals(before.sslCertSan(), after.sslCertSan());
boolean certificateIssuerChanged = !Objects.equals(before.sslCertIssuer(), after.sslCertIssuer());
Duration oldCertificateTimeToExpiry = before.sslCertNotAfter() == null ? null : Duration.between(
Instant.now(),
@@ -50,6 +54,7 @@ public record SecurityInformationChange(
boolean isChanged = asnChanged
|| certificateFingerprintChanged
|| securityHeadersChanged
|| certificateProfileChanged
|| softwareChanged;
return new SecurityInformationChange(
@@ -59,6 +64,8 @@ public record SecurityInformationChange(
certificateProfileChanged,
certificateSanChanged,
certificatePublicKeyChanged,
certificateSerialNumberChanged,
certificateIssuerChanged,
oldCertificateTimeToExpiry,
securityHeadersChanged,
ipChanged,

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) {
case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity())

View File

@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException;
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) {
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
int domainId, int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTP)
.httpVersion(httpResponse.version())
.headerServer(headers.getFirst("Server"))
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
}
// HTTPS response
public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) {
public DomainSecurityRecord createHttpsSecurityInformation(
HttpsResponse httpResponse,
PKIXValidationResult validationResult,
int domainId,
int nodeId,
@Nullable Integer asn
) {
var headers = httpResponse.headers();
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
return DomainSecurityRecord.builder()
.domainId(domainId)
.nodeId(nodeId)
.asn(asn)
.httpSchema(HttpSchema.HTTPS)
.headerServer(headers.getFirst("Server"))
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))

View File

@@ -18,6 +18,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -75,8 +76,8 @@ public class HttpPingService {
result = pingHttpFetcher.fetchUrl(url, Method.HEAD, null, null);
if (result instanceof HttpsResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpsResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(url, Method.GET, null, null);
}
else if (result instanceof ConnectionError) {
@@ -84,8 +85,8 @@ public class HttpPingService {
if (!(result2 instanceof ConnectionError)) {
result = result2;
}
if (result instanceof HttpResponse response && response.httpStatus() == 405) {
// If we get a 405, we try the GET method instead as not all servers support HEAD requests
if (result instanceof HttpResponse response && shouldTryGET(response.httpStatus())) {
sleep(Duration.ofSeconds(2));
result = pingHttpFetcher.fetchUrl(alternateUrl, Method.GET, null, null);
}
}
@@ -116,7 +117,7 @@ public class HttpPingService {
domainReference.nodeId(),
oldPingStatus,
ErrorClassification.CONNECTION_ERROR,
null);
rsp.errorMessage());
newSecurityInformation = null;
}
case TimeoutResponse rsp -> {
@@ -148,7 +149,8 @@ public class HttpPingService {
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
httpResponse,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
case HttpsResponse httpsResponse -> {
@@ -166,7 +168,8 @@ public class HttpPingService {
httpsResponse,
validationResult,
domainReference.domainId(),
domainReference.nodeId()
domainReference.nodeId(),
newPingStatus.asn()
);
}
}
@@ -190,6 +193,29 @@ public class HttpPingService {
return generatedRecords;
}
/**
 * Decides whether a response with the given status code should be
 * followed up with a GET request.
 *
 * @param statusCode the HTTP status code of the response
 * @return {@code true} for status codes in the range 400..599 except 429,
 *         {@code false} for everything else
 */
private boolean shouldTryGET(int statusCode) {
    // Only error responses qualify for a GET attempt, as many servers
    // do not cope with HEAD requests properly.
    final boolean isErrorStatus = statusCode >= 400 && statusCode < 600;

    // Too many requests, we should not retry with GET
    final boolean isTooManyRequests = statusCode == 429;

    return isErrorStatus && !isTooManyRequests;
}
/**
 * Puts the calling thread to sleep for the given duration.
 * <p>
 * If the thread is interrupted while sleeping, the interrupt flag is set
 * again, a warning is logged, and the method returns normally.
 *
 * @param duration how long to sleep
 */
private void sleep(Duration duration) {
    final long millis = duration.toMillis();

    try {
        Thread.sleep(millis);
    }
    catch (InterruptedException interrupted) {
        // Re-assert the interrupt flag, since catching the exception clears it
        Thread.currentThread().interrupt();
        logger.warn("Sleep interrupted", interrupted);
    }
}
private void comparePingStatuses(List<WritableModel> generatedRecords,
DomainAvailabilityRecord oldPingStatus,
DomainAvailabilityRecord newPingStatus) {
@@ -258,6 +284,8 @@ public class HttpPingService {
change.isCertificateProfileChanged(),
change.isCertificateSanChanged(),
change.isCertificatePublicKeyChanged(),
change.isCertificateSerialNumberChanged(),
change.isCertificateIssuerChanged(),
change.oldCertificateTimeToExpiry(),
change.isSecurityHeadersChanged(),
change.isIpAddressChanged(),

View File

@@ -318,6 +318,8 @@ class PingDaoTest {
true,
false,
true,
true,
false,
Duration.ofDays(30),
false,
false,

View File

@@ -1,9 +1,8 @@
package nu.marginalia.mqapi.ping;
public class PingRequest {
public final String runClass;
public PingRequest(String runClass) {
this.runClass = runClass;
public PingRequest() {
}
}