1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

6 Commits

Author SHA1 Message Date
Viktor Lofgren
42b24cfa34 (ping) Fix NPE in dnsJobConsumer 2025-06-12 14:22:09 +02:00
Viktor Lofgren
1ffaab2da6 (ping) Mute logging along the happy path now that things are working 2025-06-12 14:15:23 +02:00
Viktor Lofgren
5f93c7f767 (ping) Update PROC_PING_SPAWNER to use REALTIME from SIDELOAD 2025-06-12 14:04:09 +02:00
Viktor Lofgren
4001c68c82 (ping) Update SQL query to include NODE_AFFINITY in historical availability data retrieval 2025-06-12 13:58:50 +02:00
Viktor Lofgren
6b811489c5 (actor) Make ping spawner auto-spawn the process 2025-06-12 13:46:50 +02:00
Viktor Lofgren
e9d317c65d (ping) Parameterize thread counts for availability and DNS job consumers 2025-06-12 13:34:58 +02:00
5 changed files with 191 additions and 36 deletions

View File

@@ -12,7 +12,7 @@ public enum ExecutorActor {
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@@ -3,24 +3,184 @@ package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.actor.state.Terminal;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ping.PingRequest;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class PingMonitorActor extends AbstractProcessSpawnerActor {
public class PingMonitorActor extends RecordActorPrototype {
@Inject
public PingMonitorActor(Gson gson, ServiceConfiguration configuration, MqPersistence persistence, ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.PING_INBOX,
ProcessService.ProcessId.PING);
private final MqPersistence persistence;
private final ProcessService processService;
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final int MAX_ATTEMPTS = 3;
private final String inboxName;
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
private final boolean isPrimaryNode;
private final Gson gson;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> {
PingRequest request = new PingRequest(isPrimaryNode ? "primary": "secondary");
persistence.sendNewMessage(inboxName, null, null,
"PingRequest",
gson.toJson(request),
null);
yield new Monitor(0);
}
case Monitor(int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}
}
case Run(int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
}
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public PingMonitorActor(Gson gson,
NodeConfigurationService nodeConfigurationService,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) throws SQLException {
super(gson);
this.gson = gson;
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
this.inboxName = ProcessInboxNames.PING_INBOX + ":" + node;
this.processId = ProcessService.ProcessId.PING;
this.isPrimaryNode = Set.of(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED)
.contains(nodeConfigurationService.get(node).profile());
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
private void setCurrentMessageToDead() {
try {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty()) // Possibly a race condition where the task is already finished
return;
var theMessage = messages.iterator().next();
persistence.updateMessageState(theMessage.msgId(), MqMessageState.DEAD);
}
catch (SQLException ex) {
logger.error("Tried but failed to set the message for " + processId + " to dead", ex);
}
}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {
private final AtomicBoolean error = new AtomicBoolean(false);
public TaskExecution() throws ExecutionException, InterruptedException {
// Run this call in a separate thread so that this thread can be interrupted waiting for it
executorService.submit(() -> {
try {
processService.trigger(processId);
} catch (Exception e) {
logger.warn("Error in triggering process", e);
error.set(true);
}
}).get(); // Wait for the process to start
}
public boolean isError() {
return error.get();
}
}
}

View File

@@ -137,7 +137,7 @@ public class PingDao {
public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
var query = """
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, EC_DOMAIN.NODE_AFFINITY, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
FROM EC_DOMAIN
LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
@@ -168,7 +168,7 @@ public class PingDao {
if (dar == null) {
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
rs.getInt("EC_DOMAIN.ID"),
rs.getInt("EC_DOMAIN.NODE_ID"),
rs.getInt("EC_DOMAIN.NODE_AFFINITY"),
domainName.toLowerCase()
));
}

View File

@@ -7,6 +7,7 @@ import nu.marginalia.ping.svc.HttpPingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -60,11 +61,13 @@ public class PingJobScheduler {
allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
int availabilityThreads = Integer.getInteger("ping.availabilityThreads", 8);
int pingThreads = Integer.getInteger("ping.dnsThreads", 2);
for (int i = 0; i < 8; i++) {
for (int i = 0; i < availabilityThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
}
for (int i = 0; i < 1; i++) {
for (int i = 0; i < pingThreads; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
}
}
@@ -83,6 +86,8 @@ public class PingJobScheduler {
}
public void pause(int nodeId) {
logger.info("Pausing PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null && this.nodeId != nodeId) {
logger.warn("Attempted to pause PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -96,6 +101,7 @@ public class PingJobScheduler {
}
public synchronized void resume(int nodeId) {
logger.info("Resuming PingJobScheduler for nodeId: {}", nodeId);
if (this.nodeId != null) {
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
return;
@@ -137,24 +143,14 @@ public class PingJobScheduler {
try {
List<WritableModel> objects = switch (data) {
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference) -> {
logger.info("Processing availability job for domain: {}", reference.domainName());
yield httpPingService.pingDomain(reference, null, null);
}
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record) -> {
logger.info("Availability check with no security info: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain),
record,
null);
}
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with full historical data: {}", domain);
yield httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
security);
}
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference)
-> httpPingService.pingDomain(reference, null, null);
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> httpPingService.pingDomain(
new DomainReference(record.domainId(), record.nodeId(), domain), record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain), availability, security);
};
pingDao.write(objects);
@@ -199,8 +195,8 @@ public class PingJobScheduler {
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
}
case RootDomainReference.ByName(String name) -> {
var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
@Nullable var oldRecord = pingDao.getDomainDnsRecord(name);
yield dnsPingService.pingDomain(name, oldRecord);
}
};

View File

@@ -48,7 +48,6 @@ public class DnsPingService {
switch (changes) {
case DnsRecordChange.None _ -> {}
case DnsRecordChange.Changed changed -> {
logger.info("DNS record for {} changed: {}", newRecord.dnsRootDomainId(), changed);
generatedRecords.add(DomainDnsEvent.builder()
.rootDomainId(newRecord.dnsRootDomainId())
.nodeId(newRecord.nodeAffinity())