mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
11 Commits
deploy-021
...
deploy-022
Author | SHA1 | Date | |
---|---|---|---|
|
16b05a4737 | ||
|
021cd73cbb | ||
|
4253bd53b5 | ||
|
14c87461a5 | ||
|
9afed0a18e | ||
|
afad4deb94 | ||
|
f071c947e4 | ||
|
79996c9348 | ||
|
db907ab06a | ||
|
c49cd9dd95 | ||
|
eec9df3b0a |
@@ -47,6 +47,7 @@ dependencies {
|
||||
implementation libs.bundles.curator
|
||||
implementation libs.bundles.mariadb
|
||||
implementation libs.bundles.httpcomponents
|
||||
implementation libs.commons.lang3
|
||||
|
||||
implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
|
||||
implementation 'org.bouncycastle:bcpkix-jdk18on:1.80'
|
||||
|
@@ -15,6 +15,7 @@ import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Singleton
|
||||
public class PingDao {
|
||||
@@ -76,32 +77,6 @@ public class PingDao {
|
||||
}
|
||||
}
|
||||
|
||||
public List<DomainReference> getNewDomains(int nodeId, int cnt) throws SQLException {
|
||||
List<DomainReference> domains = new ArrayList<>();
|
||||
try (var conn = dataSource.getConnection();
|
||||
var ps = conn.prepareStatement("""
|
||||
SELECT domain_id, domain_name
|
||||
FROM EC_DOMAIN
|
||||
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
|
||||
ON EC_DOMAIN.domain_id = DOMAIN_AVAILABILITY_INFORMATION.domain_id
|
||||
WHERE DOMAIN_AVAILABILITY_INFORMATION.server_available IS NULL
|
||||
AND EC_DOMAIN.NODE_ID = ?
|
||||
LIMIT ?
|
||||
"""))
|
||||
{
|
||||
ps.setInt(1, nodeId);
|
||||
ps.setInt(2, cnt);
|
||||
|
||||
ResultSet rs = ps.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
domains.add(new DomainReference(rs.getInt("domain_id"), nodeId, rs.getString("domain_name").toLowerCase()));
|
||||
}
|
||||
}
|
||||
|
||||
return domains;
|
||||
}
|
||||
|
||||
public DomainAvailabilityRecord getDomainPingStatus(int domainId) throws SQLException {
|
||||
|
||||
try (var conn = dataSource.getConnection();
|
||||
@@ -132,7 +107,7 @@ public class PingDao {
|
||||
}
|
||||
}
|
||||
|
||||
public DomainDnsRecord getDomainDnsRecord(int dnsRootDomainId) throws SQLException {
|
||||
public DomainDnsRecord getDomainDnsRecord(long dnsRootDomainId) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var ps = conn.prepareStatement("SELECT * FROM DOMAIN_DNS_INFORMATION WHERE DNS_ROOT_DOMAIN_ID = ?")) {
|
||||
|
||||
@@ -160,111 +135,123 @@ public class PingDao {
|
||||
}
|
||||
}
|
||||
|
||||
public List<HistoricalAvailabilityData> getNextDomainPingStatuses(int count, int nodeId) throws SQLException {
|
||||
List<HistoricalAvailabilityData> domainAvailabilityRecords = new ArrayList<>(count);
|
||||
|
||||
public HistoricalAvailabilityData getHistoricalAvailabilityData(long domainId) throws SQLException {
|
||||
var query = """
|
||||
SELECT DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*, EC_DOMAIN.DOMAIN_NAME FROM DOMAIN_AVAILABILITY_INFORMATION
|
||||
LEFT JOIN DOMAIN_SECURITY_INFORMATION
|
||||
ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = DOMAIN_SECURITY_INFORMATION.DOMAIN_ID
|
||||
INNER JOIN EC_DOMAIN ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
|
||||
WHERE NEXT_SCHEDULED_UPDATE <= ? AND DOMAIN_AVAILABILITY_INFORMATION.NODE_ID = ?
|
||||
ORDER BY NEXT_SCHEDULED_UPDATE ASC
|
||||
LIMIT ?
|
||||
SELECT EC_DOMAIN.ID, EC_DOMAIN.DOMAIN_NAME, DOMAIN_AVAILABILITY_INFORMATION.*, DOMAIN_SECURITY_INFORMATION.*
|
||||
FROM EC_DOMAIN
|
||||
LEFT JOIN DOMAIN_SECURITY_INFORMATION ON DOMAIN_SECURITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
|
||||
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION ON DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID = EC_DOMAIN.ID
|
||||
WHERE EC_DOMAIN.ID = ?
|
||||
""";
|
||||
try (var conn = dataSource.getConnection();
|
||||
var ps = conn.prepareStatement(query)) {
|
||||
// Use Java time since this is how we generate the timestamps in the ping process
|
||||
// to avoid timezone weirdness.
|
||||
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
|
||||
ps.setInt(2, nodeId);
|
||||
ps.setInt(3, count);
|
||||
|
||||
ps.setLong(1, domainId);
|
||||
|
||||
ResultSet rs = ps.executeQuery();
|
||||
while (rs.next()) {
|
||||
String domainName = rs.getString("EC_DOMAIN.DOMAIN_NAME");
|
||||
var domainAvailabilityRecord = new DomainAvailabilityRecord(rs);
|
||||
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null) {
|
||||
var securityRecord = new DomainSecurityRecord(rs);
|
||||
domainAvailabilityRecords.add(
|
||||
new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, domainAvailabilityRecord, securityRecord)
|
||||
);
|
||||
|
||||
DomainAvailabilityRecord dar;
|
||||
DomainSecurityRecord dsr;
|
||||
|
||||
if (rs.getObject("DOMAIN_SECURITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
|
||||
dsr = new DomainSecurityRecord(rs);
|
||||
else
|
||||
dsr = null;
|
||||
|
||||
if (rs.getObject("DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID", Integer.class) != null)
|
||||
dar = new DomainAvailabilityRecord(rs);
|
||||
else
|
||||
dar = null;
|
||||
|
||||
if (dar == null) {
|
||||
return new HistoricalAvailabilityData.JustDomainReference(new DomainReference(
|
||||
rs.getInt("EC_DOMAIN.ID"),
|
||||
rs.getInt("EC_DOMAIN.NODE_ID"),
|
||||
domainName.toLowerCase()
|
||||
));
|
||||
}
|
||||
else {
|
||||
if (dsr != null) {
|
||||
return new HistoricalAvailabilityData.AvailabilityAndSecurity(domainName, dar, dsr);
|
||||
} else {
|
||||
domainAvailabilityRecords.add(new HistoricalAvailabilityData.JustAvailability(domainName, domainAvailabilityRecord));
|
||||
return new HistoricalAvailabilityData.JustAvailability(domainName, dar);
|
||||
}
|
||||
}
|
||||
}
|
||||
return domainAvailabilityRecords;
|
||||
}
|
||||
|
||||
public List<DomainDnsRecord> getNextDnsDomainRecords(int count, int nodeId) throws SQLException {
|
||||
List<DomainDnsRecord> domainDnsRecords = new ArrayList<>(count);
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> getDomainUpdateSchedule(int nodeId) {
|
||||
List<UpdateSchedule.UpdateJob<Long, HistoricalAvailabilityData>> updateJobs = new ArrayList<>();
|
||||
|
||||
var query = """
|
||||
SELECT * FROM DOMAIN_DNS_INFORMATION
|
||||
WHERE TS_NEXT_DNS_CHECK <= ? AND NODE_AFFINITY = ?
|
||||
ORDER BY DNS_CHECK_PRIORITY ASC, TS_NEXT_DNS_CHECK ASC
|
||||
LIMIT ?
|
||||
""";
|
||||
try (var conn = dataSource.getConnection();
|
||||
var ps = conn.prepareStatement(query)) {
|
||||
ps.setTimestamp(1, java.sql.Timestamp.from(Instant.now()));
|
||||
ps.setInt(2, nodeId);
|
||||
ps.setInt(3, count);
|
||||
var ps = conn.prepareStatement("""
|
||||
SELECT ID, NEXT_SCHEDULED_UPDATE
|
||||
FROM EC_DOMAIN
|
||||
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION
|
||||
ON EC_DOMAIN.ID = DOMAIN_AVAILABILITY_INFORMATION.DOMAIN_ID
|
||||
WHERE NODE_AFFINITY = ?
|
||||
""")) {
|
||||
ps.setFetchSize(10_000);
|
||||
ps.setInt(1, nodeId);
|
||||
ResultSet rs = ps.executeQuery();
|
||||
while (rs.next()) {
|
||||
domainDnsRecords.add(new DomainDnsRecord(rs));
|
||||
long domainId = rs.getLong("ID");
|
||||
var ts = rs.getTimestamp("NEXT_SCHEDULED_UPDATE");
|
||||
Instant nextUpdate = ts == null ? Instant.now() : ts.toInstant();
|
||||
|
||||
updateJobs.add(new UpdateSchedule.UpdateJob<>(domainId, nextUpdate));
|
||||
}
|
||||
}
|
||||
return domainDnsRecords;
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to retrieve domain update schedule", e);
|
||||
}
|
||||
|
||||
public List<DomainReference> getOrphanedDomains(int nodeId) {
|
||||
List<DomainReference> orphanedDomains = new ArrayList<>();
|
||||
logger.info("Found {} availability update jobs for node {}", updateJobs.size(), nodeId);
|
||||
|
||||
return updateJobs;
|
||||
}
|
||||
|
||||
public List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> getDnsUpdateSchedule(int nodeId) {
|
||||
List<UpdateSchedule.UpdateJob<RootDomainReference, RootDomainReference>> updateJobs = new ArrayList<>();
|
||||
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("""
|
||||
SELECT e.DOMAIN_NAME, e.ID
|
||||
FROM EC_DOMAIN e
|
||||
LEFT JOIN DOMAIN_AVAILABILITY_INFORMATION d ON e.ID = d.DOMAIN_ID
|
||||
WHERE d.DOMAIN_ID IS NULL AND e.NODE_AFFINITY = ?;
|
||||
var ps = conn.prepareStatement("""
|
||||
SELECT DISTINCT(DOMAIN_TOP),DOMAIN_DNS_INFORMATION.* FROM EC_DOMAIN
|
||||
LEFT JOIN DOMAIN_DNS_INFORMATION ON ROOT_DOMAIN_NAME = DOMAIN_TOP
|
||||
WHERE EC_DOMAIN.NODE_AFFINITY = ?
|
||||
""")) {
|
||||
stmt.setInt(1, nodeId);
|
||||
stmt.setFetchSize(10_000);
|
||||
ResultSet rs = stmt.executeQuery();
|
||||
ps.setFetchSize(10_000);
|
||||
ps.setInt(1, nodeId);
|
||||
ResultSet rs = ps.executeQuery();
|
||||
while (rs.next()) {
|
||||
String domainName = rs.getString("DOMAIN_NAME");
|
||||
int domainId = rs.getInt("ID");
|
||||
Long dnsRootDomainId = rs.getObject("DOMAIN_DNS_INFORMATION.DNS_ROOT_DOMAIN_ID", Long.class);
|
||||
String rootDomainName = rs.getString("DOMAIN_TOP");
|
||||
|
||||
orphanedDomains.add(new DomainReference(domainId, nodeId, domainName));
|
||||
if (dnsRootDomainId == null) {
|
||||
updateJobs.add(
|
||||
new UpdateSchedule.UpdateJob<>(
|
||||
new RootDomainReference.ByName(rootDomainName),
|
||||
Instant.now())
|
||||
);
|
||||
}
|
||||
else {
|
||||
var record = new DomainDnsRecord(rs);
|
||||
updateJobs.add(new UpdateSchedule.UpdateJob<>(
|
||||
new RootDomainReference.ById(dnsRootDomainId),
|
||||
Objects.requireNonNullElseGet(record.tsNextScheduledUpdate(), Instant::now))
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to retrieve orphaned domains", e);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to retrieve DNS update schedule", e);
|
||||
}
|
||||
|
||||
return orphanedDomains;
|
||||
}
|
||||
logger.info("Found {} dns update jobs for node {}", updateJobs.size(), nodeId);
|
||||
|
||||
public List<String> getOrphanedRootDomains(int nodeId) {
|
||||
List<String> orphanedDomains = new ArrayList<>();
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("""
|
||||
SELECT DISTINCT(DOMAIN_TOP)
|
||||
FROM EC_DOMAIN e
|
||||
LEFT JOIN DOMAIN_DNS_INFORMATION d ON e.DOMAIN_TOP = d.ROOT_DOMAIN_NAME
|
||||
WHERE d.ROOT_DOMAIN_NAME IS NULL AND e.NODE_AFFINITY = ?;
|
||||
""")) {
|
||||
stmt.setInt(1, nodeId);
|
||||
stmt.setFetchSize(10_000);
|
||||
ResultSet rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
String domainName = rs.getString("DOMAIN_TOP");
|
||||
orphanedDomains.add(domainName.toLowerCase());
|
||||
}
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to retrieve orphaned domains", e);
|
||||
}
|
||||
|
||||
return orphanedDomains;
|
||||
return updateJobs;
|
||||
}
|
||||
}
|
||||
|
@@ -4,15 +4,14 @@ import com.google.inject.Inject;
|
||||
import nu.marginalia.ping.model.*;
|
||||
import nu.marginalia.ping.svc.DnsPingService;
|
||||
import nu.marginalia.ping.svc.HttpPingService;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** PingJobScheduler is responsible for scheduling and processing ping jobs
|
||||
@@ -27,50 +26,13 @@ public class PingJobScheduler {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PingJobScheduler.class);
|
||||
|
||||
sealed interface DnsJob {
|
||||
Object reference();
|
||||
private static final UpdateSchedule<RootDomainReference, RootDomainReference> dnsUpdateSchedule
|
||||
= new UpdateSchedule<>(250_000);
|
||||
private static final UpdateSchedule<Long, HistoricalAvailabilityData> availabilityUpdateSchedule
|
||||
= new UpdateSchedule<>(250_000);
|
||||
|
||||
record DnsFetch(String rootDomain) implements DnsJob {
|
||||
@Override
|
||||
public Object reference() {
|
||||
return rootDomain;
|
||||
}
|
||||
}
|
||||
record DnsRefresh(DomainDnsRecord oldRecord) implements DnsJob {
|
||||
@Override
|
||||
public Object reference() {
|
||||
return oldRecord.rootDomainName();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed interface AvailabilityJob {
|
||||
Object reference();
|
||||
|
||||
record Availability(DomainReference domainReference) implements AvailabilityJob {
|
||||
@Override
|
||||
public Object reference() {
|
||||
return domainReference.domainName();
|
||||
}
|
||||
}
|
||||
record AvailabilityRefresh(String domain, @NotNull DomainAvailabilityRecord availability, @Nullable DomainSecurityRecord securityRecord) implements AvailabilityJob {
|
||||
@Override
|
||||
public Object reference() {
|
||||
return domain;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Keeps track of ongoing ping and DNS processing to avoid duplicate work,
|
||||
// which is mainly a scenario that will occur when there is not a lot of data
|
||||
// in the database. In real-world scenarios, the queues will be full most
|
||||
// of the time, and prevent this from being an issue.
|
||||
|
||||
private static final ConcurrentHashMap<Object, Boolean> processingDomainsAvailability = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<Object, Boolean> processingDomainsDns = new ConcurrentHashMap<>();
|
||||
|
||||
private static final ArrayBlockingQueue<DnsJob> dnsJobQueue = new ArrayBlockingQueue<>(8);
|
||||
private static final ArrayBlockingQueue<AvailabilityJob> availabilityJobQueue = new ArrayBlockingQueue<>(8);
|
||||
public volatile Instant dnsLastSync = Instant.now();
|
||||
public volatile Instant availabilityLastSync = Instant.now();
|
||||
|
||||
public volatile Integer nodeId = null;
|
||||
public volatile boolean running = false;
|
||||
@@ -95,15 +57,14 @@ public class PingJobScheduler {
|
||||
|
||||
running = true;
|
||||
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("new-dns").start(this::fetchNewDnsRecords));
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("new-pings").start(this::fetchNewAvailabilityJobs));
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("update-pings").start(this::updateAvailabilityJobs));
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("update-dns").start(this::updateDnsJobs));
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("sync-dns").start(this::syncAvailabilityJobs));
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("sync-availability").start(this::syncDnsRecords));
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("ping-job-consumer-" + i).start(this::availabilityJobConsumer));
|
||||
|
||||
for (int i = 0; i < 8; i++) {
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("availability-job-consumer-" + i).start(this::availabilityJobConsumer));
|
||||
}
|
||||
for (int i = 0; i < 4; i++) {
|
||||
for (int i = 0; i < 1; i++) {
|
||||
allThreads.add(Thread.ofPlatform().daemon().name("dns-job-consumer-" + i).start(this::dnsJobConsumer));
|
||||
}
|
||||
}
|
||||
@@ -127,14 +88,25 @@ public class PingJobScheduler {
|
||||
return;
|
||||
}
|
||||
this.nodeId = null;
|
||||
|
||||
availabilityUpdateSchedule.clear();
|
||||
dnsUpdateSchedule.clear();
|
||||
|
||||
logger.info("PingJobScheduler paused");
|
||||
}
|
||||
|
||||
public synchronized void resume(int nodeId) {
|
||||
if (this.nodeId != null) {
|
||||
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected null, got {}", this.nodeId, nodeId);
|
||||
logger.warn("Attempted to resume PingJobScheduler with mismatched nodeId: expected {}, got {}", this.nodeId, nodeId);
|
||||
return;
|
||||
}
|
||||
|
||||
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nodeId));
|
||||
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nodeId));
|
||||
dnsLastSync = Instant.now();
|
||||
availabilityLastSync = Instant.now();
|
||||
|
||||
// Flag that we are running again
|
||||
this.nodeId = nodeId;
|
||||
|
||||
notifyAll();
|
||||
@@ -150,32 +122,54 @@ public class PingJobScheduler {
|
||||
private void availabilityJobConsumer() {
|
||||
while (running) {
|
||||
try {
|
||||
AvailabilityJob job = availabilityJobQueue.poll(1, TimeUnit.SECONDS);
|
||||
if (job == null) {
|
||||
continue; // No job available, continue to the next iteration
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue;
|
||||
}
|
||||
|
||||
long nextId = availabilityUpdateSchedule.next();
|
||||
var data = pingDao.getHistoricalAvailabilityData(nextId);
|
||||
if (data == null) {
|
||||
logger.warn("No availability data found for ID: {}", nextId);
|
||||
continue; // No data to process, skip this iteration
|
||||
}
|
||||
|
||||
try {
|
||||
switch (job) {
|
||||
case AvailabilityJob.Availability(DomainReference reference) -> {
|
||||
logger.info("Availability check: {}", reference.domainName());
|
||||
pingDao.write(httpPingService.pingDomain(reference, null, null));
|
||||
List<WritableModel> objects = switch (data) {
|
||||
case HistoricalAvailabilityData.JustDomainReference(DomainReference reference) -> {
|
||||
logger.info("Processing availability job for domain: {}", reference.domainName());
|
||||
yield httpPingService.pingDomain(reference, null, null);
|
||||
}
|
||||
case AvailabilityJob.AvailabilityRefresh(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
|
||||
logger.info("Availability check with reference: {}", domain);
|
||||
pingDao.write(httpPingService.pingDomain(
|
||||
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record) -> {
|
||||
logger.info("Availability check with no security info: {}", domain);
|
||||
yield httpPingService.pingDomain(
|
||||
new DomainReference(record.domainId(), record.nodeId(), domain),
|
||||
record,
|
||||
null);
|
||||
}
|
||||
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security) -> {
|
||||
logger.info("Availability check with full historical data: {}", domain);
|
||||
yield httpPingService.pingDomain(
|
||||
new DomainReference(availability.domainId(), availability.nodeId(), domain),
|
||||
availability,
|
||||
security));
|
||||
security);
|
||||
}
|
||||
};
|
||||
|
||||
pingDao.write(objects);
|
||||
|
||||
// Re-schedule the next update time for the domain
|
||||
for (var object : objects) {
|
||||
var ts = object.nextUpdateTime();
|
||||
if (ts != null) {
|
||||
availabilityUpdateSchedule.add(nextId, ts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Error processing availability job for domain: " + job.reference(), e);
|
||||
}
|
||||
finally {
|
||||
// Remove the domain from the processing map
|
||||
processingDomainsAvailability.remove(job.reference());
|
||||
logger.error("Error processing availability job for domain: " + data.domain(), e);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
@@ -190,30 +184,41 @@ public class PingJobScheduler {
|
||||
private void dnsJobConsumer() {
|
||||
while (running) {
|
||||
try {
|
||||
DnsJob job = dnsJobQueue.poll(1, TimeUnit.SECONDS);
|
||||
if (job == null) {
|
||||
continue; // No job available, continue to the next iteration
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue;
|
||||
}
|
||||
|
||||
RootDomainReference ref = dnsUpdateSchedule.next();
|
||||
|
||||
try {
|
||||
switch (job) {
|
||||
case DnsJob.DnsFetch(String rootDomain) -> {
|
||||
logger.info("Fetching DNS records for root domain: {}", rootDomain);
|
||||
pingDao.write(dnsPingService.pingDomain(rootDomain, null));
|
||||
List<WritableModel> objects = switch(ref) {
|
||||
case RootDomainReference.ById(long id) -> {
|
||||
var oldRecord = Objects.requireNonNull(pingDao.getDomainDnsRecord(id));
|
||||
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
|
||||
}
|
||||
case DnsJob.DnsRefresh(DomainDnsRecord oldRecord) -> {
|
||||
logger.info("Refreshing DNS records for domain: {}", oldRecord.rootDomainName());
|
||||
pingDao.write(dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord));
|
||||
case RootDomainReference.ByName(String name) -> {
|
||||
var oldRecord = pingDao.getDomainDnsRecord(name);
|
||||
yield dnsPingService.pingDomain(oldRecord.rootDomainName(), oldRecord);
|
||||
}
|
||||
};
|
||||
|
||||
pingDao.write(objects);
|
||||
|
||||
// Re-schedule the next update time for the domain
|
||||
for (var object : objects) {
|
||||
var ts = object.nextUpdateTime();
|
||||
if (ts != null) {
|
||||
dnsUpdateSchedule.add(ref, ts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Error processing DNS job for domain: " + job.reference(), e);
|
||||
}
|
||||
finally {
|
||||
// Remove the domain from the processing map
|
||||
processingDomainsDns.remove(job.reference());
|
||||
logger.error("Error processing DNS job for domain: " + ref, e);
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("DNS job consumer interrupted", e);
|
||||
@@ -224,38 +229,27 @@ public class PingJobScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchNewAvailabilityJobs() {
|
||||
private void syncAvailabilityJobs() {
|
||||
try {
|
||||
while (running) {
|
||||
|
||||
// If we are suspended, wait for resume
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue; // re-fetch the records after resuming
|
||||
continue;
|
||||
}
|
||||
|
||||
List<DomainReference> domains = pingDao.getOrphanedDomains(nid);
|
||||
for (DomainReference domain : domains) {
|
||||
|
||||
if (nodeId == null) {
|
||||
waitForResume();
|
||||
break; // re-fetch the records after resuming
|
||||
// Check if we need to refresh the availability data
|
||||
Instant nextRefresh = availabilityLastSync.plus(Duration.ofHours(24));
|
||||
if (Instant.now().isBefore(nextRefresh)) {
|
||||
Duration remaining = Duration.between(Instant.now(), nextRefresh);
|
||||
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
availabilityJobQueue.put(new AvailabilityJob.Availability(domain));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("Failed to add new ping job for domain: " + domain, e);
|
||||
}
|
||||
}
|
||||
|
||||
// This is an incredibly expensive operation, so we only do it once a day
|
||||
try {
|
||||
TimeUnit.HOURS.sleep(24);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
availabilityUpdateSchedule.replaceQueue(pingDao.getDomainUpdateSchedule(nid));
|
||||
availabilityLastSync = Instant.now();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
@@ -263,32 +257,26 @@ public class PingJobScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchNewDnsRecords() {
|
||||
private void syncDnsRecords() {
|
||||
try {
|
||||
while (running) {
|
||||
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue; // re-fetch the records after resuming
|
||||
}
|
||||
|
||||
List<String> rootDomains = pingDao.getOrphanedRootDomains(nid);
|
||||
for (String rootDomain : rootDomains) {
|
||||
|
||||
if (nodeId == null) {
|
||||
waitForResume();
|
||||
break; // re-fetch the records after resuming
|
||||
// Check if we need to refresh the availability data
|
||||
Instant nextRefresh = dnsLastSync.plus(Duration.ofHours(24));
|
||||
if (Instant.now().isBefore(nextRefresh)) {
|
||||
Duration remaining = Duration.between(Instant.now(), nextRefresh);
|
||||
TimeUnit.MINUTES.sleep(Math.max(1, remaining.toMinutes()));
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
dnsJobQueue.put(new DnsJob.DnsFetch(rootDomain));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("Failed to add new DNS job for root domain: " + rootDomain, e);
|
||||
}
|
||||
}
|
||||
// This is an incredibly expensive operation, so we only do it once a day
|
||||
TimeUnit.HOURS.sleep(24);
|
||||
dnsUpdateSchedule.replaceQueue(pingDao.getDnsUpdateSchedule(nid));
|
||||
dnsLastSync = Instant.now();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
@@ -296,68 +284,5 @@ public class PingJobScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateAvailabilityJobs() {
|
||||
|
||||
while (running) {
|
||||
try {
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue; // re-fetch the records after resuming
|
||||
}
|
||||
|
||||
var statuses = pingDao.getNextDomainPingStatuses(100, nid);
|
||||
|
||||
if (nodeId == null) {
|
||||
waitForResume();
|
||||
break; // re-fetch the records after resuming
|
||||
}
|
||||
|
||||
for (var status : statuses) {
|
||||
var job = switch (status) {
|
||||
case HistoricalAvailabilityData.JustAvailability(String domain, DomainAvailabilityRecord record)
|
||||
-> new AvailabilityJob.AvailabilityRefresh(domain, record, null);
|
||||
case HistoricalAvailabilityData.AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availability, DomainSecurityRecord security)
|
||||
-> new AvailabilityJob.AvailabilityRefresh(domain, availability, security);
|
||||
};
|
||||
|
||||
if (processingDomainsAvailability.putIfAbsent(job.reference(), true) == null) {
|
||||
availabilityJobQueue.add(job);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Error fetching next domain ping statuses", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void updateDnsJobs() {
|
||||
while (running) {
|
||||
try {
|
||||
Integer nid = nodeId;
|
||||
if (nid == null) {
|
||||
waitForResume();
|
||||
continue; // re-fetch the records after resuming
|
||||
}
|
||||
|
||||
var dnsRecords = pingDao.getNextDnsDomainRecords(1000, nid);
|
||||
for (var record : dnsRecords) {
|
||||
if (nodeId == null) {
|
||||
waitForResume();
|
||||
break; // re-fetch the records after resuming
|
||||
}
|
||||
if (processingDomainsDns.putIfAbsent(record.rootDomainName(), true) == null) {
|
||||
dnsJobQueue.put(new DnsJob.DnsRefresh(record));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Error fetching next domain DNS records", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,57 @@
|
||||
package nu.marginalia.ping;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.PriorityQueue;
|
||||
|
||||
/** In-memory schedule for updates, allowing jobs to be added and processed in order of their scheduled time.
|
||||
* This is not a particularly high-performance implementation, but exists to take contention off the database's
|
||||
* timestamp index.
|
||||
* */
|
||||
public class UpdateSchedule<T, T2> {
|
||||
private final PriorityQueue<UpdateJob<T, T2>> updateQueue;
|
||||
public record UpdateJob<T, T2>(T key, Instant updateTime) {}
|
||||
|
||||
public UpdateSchedule(int initialCapacity) {
|
||||
updateQueue = new PriorityQueue<>(initialCapacity, Comparator.comparing(UpdateJob::updateTime));
|
||||
}
|
||||
|
||||
public synchronized void add(T key, Instant updateTime) {
|
||||
updateQueue.add(new UpdateJob<>(key, updateTime));
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized T next() throws InterruptedException {
|
||||
while (true) {
|
||||
if (updateQueue.isEmpty()) {
|
||||
wait(); // Wait for a new job to be added
|
||||
continue;
|
||||
}
|
||||
|
||||
UpdateJob<T, T2> job = updateQueue.peek();
|
||||
Instant now = Instant.now();
|
||||
|
||||
if (job.updateTime.isAfter(now)) {
|
||||
Duration toWait = Duration.between(now, job.updateTime);
|
||||
wait(Math.max(1, toWait.toMillis()));
|
||||
}
|
||||
else {
|
||||
updateQueue.poll(); // Remove the job from the queue since it's due
|
||||
return job.key();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
updateQueue.clear();
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized void replaceQueue(Collection<UpdateJob<T,T2>> newJobs) {
|
||||
updateQueue.clear();
|
||||
updateQueue.addAll(newJobs);
|
||||
notifyAll();
|
||||
}
|
||||
}
|
@@ -3,6 +3,8 @@ package nu.marginalia.ping.fetcher;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.name.Named;
|
||||
import nu.marginalia.ping.model.SingleDnsRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.xbill.DNS.ExtendedResolver;
|
||||
import org.xbill.DNS.Lookup;
|
||||
import org.xbill.DNS.TextParseException;
|
||||
@@ -17,6 +19,7 @@ import java.util.concurrent.*;
|
||||
public class PingDnsFetcher {
|
||||
private final ThreadLocal<ExtendedResolver> resolver;
|
||||
private static final ExecutorService digExecutor = Executors.newFixedThreadPool(100);
|
||||
private static final Logger logger = LoggerFactory.getLogger(PingDnsFetcher.class);
|
||||
|
||||
private static final int[] RECORD_TYPES = {
|
||||
Type.A, Type.AAAA, Type.NS, Type.MX, Type.TXT,
|
||||
@@ -25,8 +28,7 @@ public class PingDnsFetcher {
|
||||
|
||||
@Inject
|
||||
public PingDnsFetcher(@Named("ping.nameservers")
|
||||
List<String> nameservers) throws UnknownHostException
|
||||
{
|
||||
List<String> nameservers) {
|
||||
resolver = ThreadLocal.withInitial(() -> createResolver(nameservers));
|
||||
}
|
||||
|
||||
@@ -81,13 +83,12 @@ public class PingDnsFetcher {
|
||||
try {
|
||||
results.addAll(future.get(1, TimeUnit.MINUTES));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
System.err.println("Error fetching DNS records: " + e.getMessage());
|
||||
logger.error("Error fetching DNS records", e);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
System.err.println("DNS query interrupted: " + e.getMessage());
|
||||
logger.error("DNS query interrupted", e);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
@@ -44,14 +44,14 @@ public class HttpClientProvider implements Provider<HttpClient> {
|
||||
|
||||
private static CloseableHttpClient createClient() throws NoSuchAlgorithmException {
|
||||
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
|
||||
.setSocketTimeout(30, TimeUnit.SECONDS)
|
||||
.setConnectTimeout(30, TimeUnit.SECONDS)
|
||||
.setSocketTimeout(15, TimeUnit.SECONDS)
|
||||
.setConnectTimeout(15, TimeUnit.SECONDS)
|
||||
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
|
||||
.build();
|
||||
|
||||
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
||||
.setMaxConnPerRoute(2)
|
||||
.setMaxConnTotal(5000)
|
||||
.setMaxConnTotal(50)
|
||||
.setDefaultConnectionConfig(connectionConfig)
|
||||
.setTlsSocketStrategy(
|
||||
new DefaultClientTlsStrategy(SSLContext.getDefault(), NoopHostnameVerifier.INSTANCE))
|
||||
|
@@ -70,6 +70,11 @@ implements WritableModel
|
||||
return millis == null ? null : Duration.ofMillis(millis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant nextUpdateTime() {
|
||||
return nextScheduledUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Connection connection) throws SQLException {
|
||||
try (var ps = connection.prepareStatement(
|
||||
|
@@ -60,6 +60,11 @@ public record DomainDnsRecord(
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant nextUpdateTime() {
|
||||
return tsNextScheduledUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Connection connection) throws SQLException {
|
||||
|
||||
|
@@ -1,5 +1,7 @@
|
||||
package nu.marginalia.ping.model;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
@@ -201,7 +203,6 @@ public record DomainSecurityRecord(
|
||||
if (sslCertFingerprintSha256() == null) {
|
||||
ps.setNull(12, java.sql.Types.BINARY);
|
||||
} else {
|
||||
System.out.println(sslCertFingerprintSha256().length);
|
||||
ps.setBytes(12, sslCertFingerprintSha256());
|
||||
}
|
||||
if (sslCertSan() == null) {
|
||||
@@ -359,12 +360,12 @@ public record DomainSecurityRecord(
|
||||
}
|
||||
|
||||
public Builder httpVersion(String httpVersion) {
|
||||
this.httpVersion = httpVersion;
|
||||
this.httpVersion = StringUtils.truncate(httpVersion, 10);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder httpCompression(String httpCompression) {
|
||||
this.httpCompression = httpCompression;
|
||||
this.httpCompression = StringUtils.truncate(httpCompression, 50);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -384,12 +385,12 @@ public record DomainSecurityRecord(
|
||||
}
|
||||
|
||||
public Builder sslCertIssuer(String sslCertIssuer) {
|
||||
this.sslCertIssuer = sslCertIssuer;
|
||||
this.sslCertIssuer = StringUtils.truncate(sslCertIssuer, 255);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder sslCertSubject(String sslCertSubject) {
|
||||
this.sslCertSubject = sslCertSubject;
|
||||
this.sslCertSubject = StringUtils.truncate(sslCertSubject, 255);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -459,37 +460,37 @@ public record DomainSecurityRecord(
|
||||
}
|
||||
|
||||
public Builder headerStrictTransportSecurity(String headerStrictTransportSecurity) {
|
||||
this.headerStrictTransportSecurity = headerStrictTransportSecurity;
|
||||
this.headerStrictTransportSecurity = StringUtils.truncate(headerStrictTransportSecurity, 255);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerReferrerPolicy(String headerReferrerPolicy) {
|
||||
this.headerReferrerPolicy = headerReferrerPolicy;
|
||||
this.headerReferrerPolicy = StringUtils.truncate(headerReferrerPolicy, 50);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerXFrameOptions(String headerXFrameOptions) {
|
||||
this.headerXFrameOptions = headerXFrameOptions;
|
||||
this.headerXFrameOptions = StringUtils.truncate(headerXFrameOptions, 50);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerXContentTypeOptions(String headerXContentTypeOptions) {
|
||||
this.headerXContentTypeOptions = headerXContentTypeOptions;
|
||||
this.headerXContentTypeOptions = StringUtils.truncate(headerXContentTypeOptions, 50);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerXXssProtection(String headerXXssProtection) {
|
||||
this.headerXXssProtection = headerXXssProtection;
|
||||
this.headerXXssProtection = StringUtils.truncate(headerXXssProtection, 50);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerServer(String headerServer) {
|
||||
this.headerServer = headerServer;
|
||||
this.headerServer = StringUtils.truncate(headerServer, 255);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder headerXPoweredBy(String headerXPoweredBy) {
|
||||
this.headerXPoweredBy = headerXPoweredBy;
|
||||
this.headerXPoweredBy = StringUtils.truncate(headerXPoweredBy, 255);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,13 @@
|
||||
package nu.marginalia.ping.model;
|
||||
|
||||
public sealed interface HistoricalAvailabilityData {
|
||||
public String domain();
|
||||
record JustDomainReference(DomainReference domainReference) implements HistoricalAvailabilityData {
|
||||
@Override
|
||||
public String domain() {
|
||||
return domainReference.domainName();
|
||||
}
|
||||
}
|
||||
record JustAvailability(String domain, DomainAvailabilityRecord record) implements HistoricalAvailabilityData {}
|
||||
record AvailabilityAndSecurity(String domain, DomainAvailabilityRecord availabilityRecord, DomainSecurityRecord securityRecord) implements HistoricalAvailabilityData {}
|
||||
}
|
||||
|
@@ -0,0 +1,6 @@
|
||||
package nu.marginalia.ping.model;
|
||||
|
||||
public sealed interface RootDomainReference {
|
||||
record ById(long id) implements RootDomainReference { }
|
||||
record ByName(String name) implements RootDomainReference { }
|
||||
}
|
@@ -1,8 +1,14 @@
|
||||
package nu.marginalia.ping.model;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
|
||||
public interface WritableModel {
|
||||
void write(Connection connection) throws SQLException;
|
||||
@Nullable
|
||||
default Instant nextUpdateTime() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@@ -71,23 +71,40 @@ public class DomainAvailabilityInformationFactory {
|
||||
@Nullable DomainAvailabilityRecord previousRecord,
|
||||
HttpResponse rsp) {
|
||||
|
||||
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
|
||||
final boolean isAvailable;
|
||||
final Instant now = Instant.now();
|
||||
final Instant lastAvailable;
|
||||
final Instant lastError;
|
||||
final ErrorClassification errorClassification;
|
||||
|
||||
if (rsp.httpStatus() >= 400) {
|
||||
isAvailable = false;
|
||||
lastError = now;
|
||||
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
|
||||
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
|
||||
} else {
|
||||
isAvailable = true;
|
||||
lastAvailable = now;
|
||||
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
|
||||
errorClassification = ErrorClassification.NONE;
|
||||
}
|
||||
|
||||
return DomainAvailabilityRecord.builder()
|
||||
.domainId(domainId)
|
||||
.nodeId(nodeId)
|
||||
.serverAvailable(true)
|
||||
.serverAvailable(isAvailable)
|
||||
.serverIp(address != null ? address.getAddress() : null)
|
||||
.serverIpAsn(getAsn(address))
|
||||
.httpSchema(HttpSchema.HTTP)
|
||||
.httpStatus(rsp.httpStatus())
|
||||
.errorClassification(errorClassification)
|
||||
.httpResponseTime(rsp.httpResponseTime())
|
||||
.httpEtag(rsp.headers().getFirst("ETag"))
|
||||
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
|
||||
.tsLastPing(Instant.now())
|
||||
.tsLastAvailable(Instant.now())
|
||||
.tsLastPing(now)
|
||||
.tsLastAvailable(lastAvailable)
|
||||
.tsLastError(lastError)
|
||||
.nextScheduledUpdate(Instant.now().plus(backoffStrategy.getOkInterval()))
|
||||
.nextScheduledUpdate(now.plus(backoffStrategy.getOkInterval()))
|
||||
.backoffFetchInterval(backoffStrategy.getOkInterval())
|
||||
.build();
|
||||
|
||||
@@ -117,23 +134,44 @@ public class DomainAvailabilityInformationFactory {
|
||||
updateTime = Instant.now().plus(backoffStrategy.getOkInterval());
|
||||
}
|
||||
|
||||
Instant lastError = previousRecord != null ? previousRecord.tsLastAvailable() : null;
|
||||
final boolean isAvailable;
|
||||
final Instant now = Instant.now();
|
||||
final Instant lastAvailable;
|
||||
final Instant lastError;
|
||||
final ErrorClassification errorClassification;
|
||||
|
||||
if (!validationResult.isValid()) {
|
||||
isAvailable = false;
|
||||
lastError = now;
|
||||
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
|
||||
errorClassification = ErrorClassification.SSL_ERROR;
|
||||
} else if (rsp.httpStatus() >= 400) {
|
||||
isAvailable = false;
|
||||
lastError = now;
|
||||
lastAvailable = previousRecord != null ? previousRecord.tsLastAvailable() : null;
|
||||
errorClassification = ErrorClassification.HTTP_SERVER_ERROR;
|
||||
} else {
|
||||
isAvailable = true;
|
||||
lastAvailable = Instant.now();
|
||||
lastError = previousRecord != null ? previousRecord.tsLastError() : null;
|
||||
errorClassification = ErrorClassification.NONE;
|
||||
}
|
||||
|
||||
return DomainAvailabilityRecord.builder()
|
||||
.domainId(domainId)
|
||||
.nodeId(nodeId)
|
||||
.serverAvailable(validationResult.isValid())
|
||||
.serverAvailable(isAvailable)
|
||||
.serverIp(address != null ? address.getAddress() : null)
|
||||
.serverIpAsn(getAsn(address))
|
||||
.httpSchema(HttpSchema.HTTPS)
|
||||
.httpStatus(rsp.httpStatus())
|
||||
.errorClassification(!validationResult.isValid() ? ErrorClassification.SSL_ERROR : ErrorClassification.NONE)
|
||||
.errorClassification(errorClassification)
|
||||
.httpResponseTime(rsp.httpResponseTime()) // Placeholder, actual timing not implemented
|
||||
.httpEtag(rsp.headers().getFirst("ETag"))
|
||||
.httpLastModified(rsp.headers().getFirst("Last-Modified"))
|
||||
.tsLastPing(Instant.now())
|
||||
.tsLastPing(now)
|
||||
.tsLastError(lastError)
|
||||
.tsLastAvailable(Instant.now())
|
||||
.tsLastAvailable(lastAvailable)
|
||||
.nextScheduledUpdate(updateTime)
|
||||
.backoffFetchInterval(backoffStrategy.getOkInterval())
|
||||
.build();
|
||||
|
@@ -330,86 +330,6 @@ class PingDaoTest {
|
||||
svc.write(event);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getNextDomainPingStatuses() throws SQLException {
|
||||
var svc = new PingDao(dataSource);
|
||||
|
||||
// Create a test domain availability record
|
||||
var record = new DomainAvailabilityRecord(
|
||||
1,
|
||||
1,
|
||||
true,
|
||||
new byte[]{127, 0, 0, 1},
|
||||
40,
|
||||
0x0F00BA32L,
|
||||
0x0F00BA34L,
|
||||
HttpSchema.HTTP,
|
||||
"etag123",
|
||||
"Wed, 21 Oct 2023 07:28:00 GMT",
|
||||
200,
|
||||
"http://example.com/redirect",
|
||||
Duration.ofMillis(150),
|
||||
ErrorClassification.NONE,
|
||||
"No error",
|
||||
Instant.now().minus(30, ChronoUnit.SECONDS),
|
||||
Instant.now().minus(3600, ChronoUnit.SECONDS),
|
||||
Instant.now().minus(7200, ChronoUnit.SECONDS),
|
||||
Instant.now().minus(3000, ChronoUnit.SECONDS),
|
||||
2,
|
||||
Duration.ofSeconds(60)
|
||||
);
|
||||
|
||||
svc.write(record);
|
||||
|
||||
// Fetch the next domain ping statuses
|
||||
var statuses = svc.getNextDomainPingStatuses(10, 1);
|
||||
assertFalse(statuses.isEmpty());
|
||||
assertEquals(1, statuses.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getNextDnsDomainRecords() throws SQLException {
|
||||
var svc = new PingDao(dataSource);
|
||||
|
||||
// Create a test DNS record
|
||||
|
||||
var dnsRecord = new DomainDnsRecord(null, "example.com", 2,
|
||||
List.of("test"),
|
||||
List.of("test2"),
|
||||
"test3",
|
||||
List.of("test4"),
|
||||
List.of("test5"),
|
||||
List.of("test6"),
|
||||
List.of("test7"),
|
||||
"test8",
|
||||
Instant.now().minus(3600, ChronoUnit.SECONDS),
|
||||
Instant.now().minus(3600, ChronoUnit.SECONDS),
|
||||
4);
|
||||
|
||||
svc.write(dnsRecord);
|
||||
|
||||
|
||||
var nextRecords = svc.getNextDnsDomainRecords(1, 2);
|
||||
|
||||
assertFalse(nextRecords.isEmpty());
|
||||
assertEquals(1, nextRecords.size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void getOrphanedDomains(){
|
||||
|
||||
var svc = new PingDao(dataSource);
|
||||
var orphanedDomains = svc.getOrphanedDomains(1);
|
||||
System.out.println(orphanedDomains);
|
||||
assertTrue(orphanedDomains.contains(new DomainReference(1, 1, "www.marginalia.nu")));
|
||||
assertFalse(orphanedDomains.isEmpty());
|
||||
|
||||
var orphanedRootDomains = svc.getOrphanedRootDomains(1);
|
||||
System.out.println(orphanedRootDomains);
|
||||
assertTrue(orphanedRootDomains.contains("marginalia.nu"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void write() {
|
||||
var dnsEvent = new DomainDnsEvent(
|
||||
|
Reference in New Issue
Block a user