mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
2 Commits
deploy-023
...
deploy-023
Author | SHA1 | Date | |
---|---|---|---|
|
9f041d6631 | ||
|
13fb1efce4 |
@@ -13,15 +13,12 @@ import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
|
|||||||
import nu.marginalia.mq.persistence.MqPersistence;
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.mqapi.ping.PingRequest;
|
import nu.marginalia.mqapi.ping.PingRequest;
|
||||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
|
||||||
import nu.marginalia.nodecfg.model.NodeProfile;
|
|
||||||
import nu.marginalia.process.ProcessService;
|
import nu.marginalia.process.ProcessService;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@@ -41,7 +38,6 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
private final ProcessService.ProcessId processId;
|
private final ProcessService.ProcessId processId;
|
||||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||||
private final int node;
|
private final int node;
|
||||||
private final boolean isPrimaryNode;
|
|
||||||
private final Gson gson;
|
private final Gson gson;
|
||||||
|
|
||||||
public record Initial() implements ActorStep {}
|
public record Initial() implements ActorStep {}
|
||||||
@@ -56,7 +52,7 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
return switch (self) {
|
return switch (self) {
|
||||||
case Initial i -> {
|
case Initial i -> {
|
||||||
PingRequest request = new PingRequest(isPrimaryNode ? "primary": "secondary");
|
PingRequest request = new PingRequest();
|
||||||
|
|
||||||
persistence.sendNewMessage(inboxName, null, null,
|
persistence.sendNewMessage(inboxName, null, null,
|
||||||
"PingRequest",
|
"PingRequest",
|
||||||
@@ -129,7 +125,6 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public PingMonitorActor(Gson gson,
|
public PingMonitorActor(Gson gson,
|
||||||
NodeConfigurationService nodeConfigurationService,
|
|
||||||
ServiceConfiguration configuration,
|
ServiceConfiguration configuration,
|
||||||
MqPersistence persistence,
|
MqPersistence persistence,
|
||||||
ProcessService processService) throws SQLException {
|
ProcessService processService) throws SQLException {
|
||||||
@@ -140,9 +135,6 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
this.processService = processService;
|
this.processService = processService;
|
||||||
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
|
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
|
||||||
this.processId = ProcessService.ProcessId.PING;
|
this.processId = ProcessService.ProcessId.PING;
|
||||||
|
|
||||||
this.isPrimaryNode = Set.of(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED)
|
|
||||||
.contains(nodeConfigurationService.get(node).profile());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Sets the message to dead in the database to avoid
|
/** Sets the message to dead in the database to avoid
|
||||||
|
@@ -10,7 +10,6 @@ import nu.marginalia.mq.MessageQueueFactory;
|
|||||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||||
import nu.marginalia.mqapi.ping.PingRequest;
|
import nu.marginalia.mqapi.ping.PingRequest;
|
||||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||||
import nu.marginalia.nodecfg.model.NodeConfiguration;
|
|
||||||
import nu.marginalia.process.ProcessConfiguration;
|
import nu.marginalia.process.ProcessConfiguration;
|
||||||
import nu.marginalia.process.ProcessConfigurationModule;
|
import nu.marginalia.process.ProcessConfigurationModule;
|
||||||
import nu.marginalia.process.ProcessMainClass;
|
import nu.marginalia.process.ProcessMainClass;
|
||||||
@@ -21,7 +20,6 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.security.Security;
|
import java.security.Security;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class PingMain extends ProcessMainClass {
|
public class PingMain extends ProcessMainClass {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
|
private static final Logger log = LoggerFactory.getLogger(PingMain.class);
|
||||||
@@ -56,56 +54,6 @@ public class PingMain extends ProcessMainClass {
|
|||||||
// Start the ping job scheduler
|
// Start the ping job scheduler
|
||||||
pingJobScheduler.start(true);
|
pingJobScheduler.start(true);
|
||||||
|
|
||||||
// Watch the crawler process to suspend/resume the ping job scheduler
|
|
||||||
try {
|
|
||||||
serviceRegistry.watchProcess("crawler", node, (running) -> {
|
|
||||||
if (running) {
|
|
||||||
log.info("Crawler process is running, suspending ping job scheduler.");
|
|
||||||
pingJobScheduler.pause(node);
|
|
||||||
} else {
|
|
||||||
log.warn("Crawler process is not running, resuming ping job scheduler.");
|
|
||||||
pingJobScheduler.resume(node);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException("Failed to watch crawler process", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("PingMain started successfully.");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void runSecondary() {
|
|
||||||
log.info("Starting PingMain...");
|
|
||||||
|
|
||||||
List<Integer> crawlerNodes = nodeConfigurationService.getAll()
|
|
||||||
.stream()
|
|
||||||
.filter(node -> !node.disabled())
|
|
||||||
.filter(node -> node.profile().permitBatchCrawl())
|
|
||||||
.map(NodeConfiguration::node)
|
|
||||||
.toList()
|
|
||||||
;
|
|
||||||
|
|
||||||
// Start the ping job scheduler
|
|
||||||
pingJobScheduler.start(true);
|
|
||||||
|
|
||||||
// Watch the crawler process to suspend/resume the ping job scheduler
|
|
||||||
try {
|
|
||||||
serviceRegistry.watchProcessAnyNode("crawler", crawlerNodes, (running, n) -> {
|
|
||||||
if (running) {
|
|
||||||
log.info("Crawler process is running on node {} taking over ", n);
|
|
||||||
pingJobScheduler.resume(n);
|
|
||||||
} else {
|
|
||||||
log.warn("Crawler process stopped, resuming ping job scheduler.");
|
|
||||||
pingJobScheduler.pause(n);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException("Failed to watch crawler process", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("PingMain started successfully.");
|
log.info("PingMain started successfully.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -144,19 +92,8 @@ public class PingMain extends ProcessMainClass {
|
|||||||
var instructions = main.fetchInstructions(PingRequest.class);
|
var instructions = main.fetchInstructions(PingRequest.class);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
switch (instructions.value().runClass) {
|
|
||||||
case "primary":
|
|
||||||
log.info("Running as primary node");
|
|
||||||
main.runPrimary();
|
main.runPrimary();
|
||||||
break;
|
for(;;) main.wait(); // Wait on the object lock to avoid busy-looping
|
||||||
case "secondary":
|
|
||||||
log.info("Running as secondary node");
|
|
||||||
main.runSecondary();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException("Invalid runClass: " + instructions.value().runClass);
|
|
||||||
}
|
|
||||||
for(;;);
|
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
logger.error("Error running ping process", ex);
|
logger.error("Error running ping process", ex);
|
||||||
|
@@ -8,6 +8,7 @@ import nu.marginalia.ping.ssl.PKIXValidationResult;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.security.cert.CertificateEncodingException;
|
import java.security.cert.CertificateEncodingException;
|
||||||
@@ -21,13 +22,17 @@ public class DomainSecurityInformationFactory {
|
|||||||
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
|
private static final Logger logger = LoggerFactory.getLogger(DomainSecurityInformationFactory.class);
|
||||||
|
|
||||||
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
|
// Vanilla HTTP (not HTTPS) response does not have SSL session information, so we return null
|
||||||
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse, int domainId, int nodeId) {
|
public DomainSecurityRecord createHttpSecurityInformation(HttpResponse httpResponse,
|
||||||
|
int domainId, int nodeId,
|
||||||
|
@Nullable Integer asn
|
||||||
|
) {
|
||||||
|
|
||||||
var headers = httpResponse.headers();
|
var headers = httpResponse.headers();
|
||||||
|
|
||||||
return DomainSecurityRecord.builder()
|
return DomainSecurityRecord.builder()
|
||||||
.domainId(domainId)
|
.domainId(domainId)
|
||||||
.nodeId(nodeId)
|
.nodeId(nodeId)
|
||||||
|
.asn(asn)
|
||||||
.httpSchema(HttpSchema.HTTP)
|
.httpSchema(HttpSchema.HTTP)
|
||||||
.httpVersion(httpResponse.version())
|
.httpVersion(httpResponse.version())
|
||||||
.headerServer(headers.getFirst("Server"))
|
.headerServer(headers.getFirst("Server"))
|
||||||
@@ -47,7 +52,13 @@ public class DomainSecurityInformationFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// HTTPS response
|
// HTTPS response
|
||||||
public DomainSecurityRecord createHttpsSecurityInformation(HttpsResponse httpResponse, PKIXValidationResult validationResult, int domainId, int nodeId) {
|
public DomainSecurityRecord createHttpsSecurityInformation(
|
||||||
|
HttpsResponse httpResponse,
|
||||||
|
PKIXValidationResult validationResult,
|
||||||
|
int domainId,
|
||||||
|
int nodeId,
|
||||||
|
@Nullable Integer asn
|
||||||
|
) {
|
||||||
|
|
||||||
|
|
||||||
var headers = httpResponse.headers();
|
var headers = httpResponse.headers();
|
||||||
@@ -86,6 +97,7 @@ public class DomainSecurityInformationFactory {
|
|||||||
return DomainSecurityRecord.builder()
|
return DomainSecurityRecord.builder()
|
||||||
.domainId(domainId)
|
.domainId(domainId)
|
||||||
.nodeId(nodeId)
|
.nodeId(nodeId)
|
||||||
|
.asn(asn)
|
||||||
.httpSchema(HttpSchema.HTTPS)
|
.httpSchema(HttpSchema.HTTPS)
|
||||||
.headerServer(headers.getFirst("Server"))
|
.headerServer(headers.getFirst("Server"))
|
||||||
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))
|
.headerCorsAllowOrigin(headers.getFirst("Access-Control-Allow-Origin"))
|
||||||
|
@@ -149,7 +149,8 @@ public class HttpPingService {
|
|||||||
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
|
newSecurityInformation = domainSecurityInformationFactory.createHttpSecurityInformation(
|
||||||
httpResponse,
|
httpResponse,
|
||||||
domainReference.domainId(),
|
domainReference.domainId(),
|
||||||
domainReference.nodeId()
|
domainReference.nodeId(),
|
||||||
|
newPingStatus.asn()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
case HttpsResponse httpsResponse -> {
|
case HttpsResponse httpsResponse -> {
|
||||||
@@ -167,7 +168,8 @@ public class HttpPingService {
|
|||||||
httpsResponse,
|
httpsResponse,
|
||||||
validationResult,
|
validationResult,
|
||||||
domainReference.domainId(),
|
domainReference.domainId(),
|
||||||
domainReference.nodeId()
|
domainReference.nodeId(),
|
||||||
|
newPingStatus.asn()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,9 +1,8 @@
|
|||||||
package nu.marginalia.mqapi.ping;
|
package nu.marginalia.mqapi.ping;
|
||||||
|
|
||||||
public class PingRequest {
|
public class PingRequest {
|
||||||
public final String runClass;
|
|
||||||
|
|
||||||
public PingRequest(String runClass) {
|
public PingRequest() {
|
||||||
this.runClass = runClass;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user