1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

(refac) Rename PingJob classes and methods to AvailabilityJob for improved clarity and consistency

This commit is contained in:
Viktor Lofgren
2025-06-11 13:52:18 +02:00
parent 1a9ae1bc40
commit d587544d3a
3 changed files with 27 additions and 26 deletions

View File

@@ -44,16 +44,16 @@ public class PingJobScheduler {
}
}
sealed interface PingJob {
sealed interface AvailabilityJob {
Object reference();
record Ping(DomainReference domainReference) implements PingJob {
record Availability(DomainReference domainReference) implements AvailabilityJob {
@Override
public Object reference() {
return domainReference.domainName();
}
}
record PingRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements PingJob {
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
@Override
public Object reference() {
return domain;
@@ -66,11 +66,11 @@ public class PingJobScheduler {
// in the database. In real-world scenarios, the queues will be full most
// of the time, and prevent this from being an issue.
private static final ConcurrentHashMap<Object, Boolean> processingDomainsPing = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<PingJob> pingJobQueue = new ArrayBlockingQueue<>(8);
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
public volatile Integer nodeId = null;
public volatile boolean running = false;
@@ -96,12 +96,12 @@ public class PingJobScheduler {
running = true;
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords));
allThreads.add(Thread.ofPlatform().daemon().name("new-pings").start(this::fetchNewPingJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-pings").start(this::updatePingJobs));
allThreads.add(Thread.ofPlatform().daemon().name("new-pings").start(this::fetchNewAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-pings").start(this::updateAvailabilityJobs));
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
for (int i = 0; i < 4; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("ping-job-consumer-" + i).start(this::pingJobConsumer));
allThreads.add(Thread.ofPlatform().daemon().name("ping-job-consumer-" + i).start(this::availabilityJobConsumer));
}
for (int i = 0; i < 4; i++) {
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
@@ -147,22 +147,22 @@ public class PingJobScheduler {
}
}
private void pingJobConsumer() {
private void availabilityJobConsumer() {
while (running) {
try {
PingJob job = pingJobQueue.poll(1, TimeUnit.SECONDS);
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS);
if (job == null) {
continue; // No job available, continue to the next iteration
}
try {
switch (job) {
case PingJob.Ping(DomainReference reference) -> {
logger.info("Pinging domain: {}", reference.domainName());
case AvailabilityJob.Availability(DomainReference reference) -> {
logger.info("Availability check: {}", reference.domainName());
pingDao.write(httpPingService.pingDomain(reference, null, null));
}
case PingJob.PingRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Pinging domain with reference: {}", domain);
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
logger.info("Availability check with reference: {}", domain);
pingDao.write(httpPingService.pingDomain(
new DomainReference(availability.domainId(), availability.nodeId(), domain),
availability,
@@ -171,18 +171,18 @@ public class PingJobScheduler {
}
}
catch (Exception e) {
logger.error("Error processing ping job for domain: " + job.reference(), e);
logger.error("Error processing availability job for domain: " + job.reference(), e);
}
finally {
// Remove the domain from the processing map
processingDomainsPing.remove(job.reference());
processingDomainsAvailability.remove(job.reference());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Ping job consumer interrupted", e);
logger.error("Availability job consumer interrupted", e);
break;
} catch (Exception e) {
logger.error("Error processing ping job", e);
logger.error("Error processing availability job", e);
}
}
}
@@ -224,7 +224,7 @@ public class PingJobScheduler {
}
}
private void fetchNewPingJobs() {
private void fetchNewAvailabilityJobs() {
try {
while (running) {
@@ -243,7 +243,7 @@ public class PingJobScheduler {
}
try {
pingJobQueue.put(new PingJob.Ping(domain));
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Failed to add new ping job for domain: " + domain, e);
@@ -296,7 +296,7 @@ public class PingJobScheduler {
}
}
private void updatePingJobs() {
private void updateAvailabilityJobs() {
while (running) {
try {
@@ -316,13 +316,13 @@ public class PingJobScheduler {
for (var status : statuses) {
var job = switch (status) {
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
-> new PingJob.PingRefresh(domain, record, null);
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
-> new PingJob.PingRefresh(domain, availability, security);
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
};
if (processingDomainsPing.putIfAbsent(job.reference(), true) == null) {
pingJobQueue.add(job);
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
availabilityJobQueue.add(job);
}
}

View File

@@ -32,6 +32,7 @@ public class PingHttpFetcher {
var builder = ClassicRequestBuilder.create(method.name())
.setUri(url)
.addHeader("Accept", "text/*, */*;q=0.9")
.addHeader("User-Agent", userAgent.uaString())
.addHeader("Accept-Encoding", "gzip");
if (etag != null) {

View File

@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
@Testcontainers
@Execution(ExecutionMode.SAME_THREAD)
@Tag("slow")
class PingJobSchedulerTest {
class AvailabilityJobSchedulerTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")