mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00

Compare commits: deploy-025 ... deploy-026 (8 commits)

* 52ff7fb4dd
* a4e49e658a
* e2c56dc3ca
* 470b866008
* 4895a2ac7a
* fd32ae9fa7
* 470651ea4c
* 8d4829e783

@@ -0,0 +1,12 @@
-- Table holding domains to be processed by the NDP in order to figure out whether to add them to
-- be crawled.

CREATE TABLE IF NOT EXISTS NDP_NEW_DOMAINS(
    DOMAIN_ID INT NOT NULL PRIMARY KEY,
    STATE ENUM ('NEW', 'ACCEPTED', 'REJECTED') NOT NULL DEFAULT 'NEW',
    PRIORITY INT NOT NULL DEFAULT 0,
    TS_CHANGE TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    CHECK_COUNT INT NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS NDP_NEW_DOMAINS__STATE_PRIORITY ON NDP_NEW_DOMAINS (STATE, PRIORITY DESC);

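For orientation, a sketch of the lifecycle this schema implies. The SELECT and UPDATE below mirror the queries DomainTestingQueue issues further down (simplified, without the EC_DOMAIN join); the INSERT and the concrete values are hypothetical, as nothing in this change set populates the table:

-- A discovered domain enters the queue as NEW (hypothetical row):
INSERT INTO NDP_NEW_DOMAINS (DOMAIN_ID, PRIORITY) VALUES (123, 10);

-- The worker drains NEW domains, highest priority first,
-- served by the NDP_NEW_DOMAINS__STATE_PRIORITY index:
SELECT DOMAIN_ID FROM NDP_NEW_DOMAINS
WHERE STATE = 'NEW'
ORDER BY PRIORITY DESC
LIMIT 2000;

-- Evaluation moves the row to ACCEPTED or REJECTED; each rejection
-- bumps CHECK_COUNT, and TS_CHANGE updates automatically:
UPDATE NDP_NEW_DOMAINS
SET STATE = 'REJECTED', CHECK_COUNT = CHECK_COUNT + 1
WHERE DOMAIN_ID = 123;
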
@@ -20,6 +20,7 @@ dependencies {
     implementation project(':code:processes:live-crawling-process')
     implementation project(':code:processes:loading-process')
     implementation project(':code:processes:ping-process')
+    implementation project(':code:processes:new-domain-process')
     implementation project(':code:processes:converting-process')
     implementation project(':code:processes:index-constructor-process')

@@ -41,7 +42,6 @@ dependencies {
     implementation project(':code:functions:nsfw-domain-filter')
     implementation project(':code:execution:api')

     implementation project(':code:processes:crawling-process:model')
-    implementation project(':code:processes:crawling-process:model')
     implementation project(':code:processes:crawling-process:ft-link-parser')
     implementation project(':code:index:index-journal')

@@ -14,6 +14,7 @@ public enum ExecutorActor {
     PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     PROC_PING_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME),
     PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
+    PROC_NDP_SPAWNER(NodeProfile.MIXED, NodeProfile.REALTIME),
     ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
     EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

@@ -49,6 +49,7 @@ public class ExecutorActorControlService {
                                      RecrawlSingleDomainActor recrawlSingleDomainActor,
                                      RestoreBackupActor restoreBackupActor,
                                      ConverterMonitorActor converterMonitorFSM,
+                                     NdpMonitorActor ndpMonitorActor,
                                      PingMonitorActor pingMonitorActor,
                                      CrawlerMonitorActor crawlerMonitorActor,
                                      LiveCrawlerMonitorActor liveCrawlerMonitorActor,

@@ -93,7 +94,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.PROC_PING_SPAWNER, pingMonitorActor);
         register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor);
         register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor);
-
+        register(ExecutorActor.PROC_NDP_SPAWNER, ndpMonitorActor);
         register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
         register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);

@@ -0,0 +1,29 @@
package nu.marginalia.actor.proc;

import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;

@Singleton
public class NdpMonitorActor extends AbstractProcessSpawnerActor {

    @Inject
    public NdpMonitorActor(Gson gson,
                           ServiceConfiguration configuration,
                           MqPersistence persistence,
                           ProcessService processService) {
        super(gson,
              configuration,
              persistence,
              processService,
              ProcessInboxNames.NDP_INBOX,
              ProcessService.ProcessId.NDP);
    }
}

@@ -8,6 +8,7 @@ import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.index.IndexConstructorMain;
 import nu.marginalia.livecrawler.LiveCrawlerMain;
 import nu.marginalia.loading.LoaderMain;
+import nu.marginalia.ndp.NdpMain;
 import nu.marginalia.ping.PingMain;
 import nu.marginalia.service.control.ServiceEventLog;
 import nu.marginalia.service.server.BaseServiceParams;

@@ -57,6 +58,7 @@ public class ProcessService {
     CONVERTER(ConverterMain.class),
     LOADER(LoaderMain.class),
     INDEX_CONSTRUCTOR(IndexConstructorMain.class),
+    NDP(NdpMain.class),
     EXPORT_TASKS(ExportTasksMain.class),
     ;

@@ -72,6 +74,7 @@ public class ProcessService {
     case CONVERTER -> "CONVERTER_PROCESS_OPTS";
     case LOADER -> "LOADER_PROCESS_OPTS";
     case PING -> "PING_PROCESS_OPTS";
+    case NDP -> "NDP_PROCESS_OPTS";
     case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS";
     case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS";
 };

code/processes/new-domain-process/build.gradle (new file, 73 lines)

@@ -0,0 +1,73 @@
plugins {
    id 'java'

    id 'application'
    id 'jvm-test-suite'
}

java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
    }
}

application {
    mainClass = 'nu.marginalia.ndp.NdpMain'
    applicationName = 'ndp-process'
}

tasks.distZip.enabled = false

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation project(':code:common:db')
    implementation project(':code:common:model')
    implementation project(':code:common:config')
    implementation project(':code:common:service')

    implementation project(':code:libraries:domain-lock')
    implementation project(':code:libraries:geo-ip')
    implementation project(':code:libraries:message-queue')
    implementation project(':code:libraries:blocking-thread-pool')

    implementation project(':code:processes:process-mq-api')
    implementation project(':code:processes:crawling-process:ft-content-type')
    implementation project(':code:processes:crawling-process:ft-link-parser')

    implementation libs.bundles.slf4j
    implementation libs.notnull
    implementation libs.guava

    implementation dependencies.create(libs.guice.get()) {
        exclude group: 'com.google.guava'
    }
    implementation libs.gson
    implementation libs.zstd
    implementation libs.bucket4j
    implementation libs.crawlercommons
    implementation libs.jsoup
    implementation libs.fastutil
    implementation libs.bundles.curator
    implementation libs.bundles.mariadb
    implementation libs.bundles.httpcomponents
    implementation libs.commons.lang3

    testImplementation libs.bundles.slf4j.test
    testImplementation libs.bundles.junit
    testImplementation libs.mockito

    testImplementation libs.wiremock

    testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
    testImplementation libs.commons.codec
    testImplementation 'org.testcontainers:mariadb:1.17.4'
    testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
    testImplementation project(':code:libraries:test-helpers')

    testImplementation project(':code:processes:test-data')
}

@@ -0,0 +1,142 @@
package nu.marginalia.ndp;

import com.google.inject.Inject;
import nu.marginalia.WmsaHome;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.coordination.DomainCoordinator;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.ndp.io.HttpClientProvider;
import nu.marginalia.ndp.model.DomainToTest;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class DomainEvaluator {
    private final HttpClient client;
    private final String userAgentString = WmsaHome.getUserAgent().uaString();

    private final LinkParser linkParser = new LinkParser();
    private final DomainCoordinator domainCoordinator;

    sealed interface FetchResult permits FetchSuccess, FetchFailure {}
    record FetchSuccess(Document content) implements FetchResult {}
    record FetchFailure(String reason) implements FetchResult {}

    @Inject
    public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
        this.domainCoordinator = domainCoordinator;
        client = HttpClientProvider.createClient();
    }

    public boolean evaluateDomain(DomainToTest domain) throws Exception {
        var edgeDomain = new EdgeDomain(domain.domainName());

        try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
            var result = fetch(domain.domainName());

            Instant start = Instant.now();

            var ret = switch(result) {
                case FetchSuccess(Document content) -> validateHtml(content, edgeDomain);
                case FetchFailure failure -> false;
            };

            // Sleep for up to 1 second before we yield the lock to respect rate limits reasonably well
            Instant end = Instant.now();
            Duration sleepDuration = Duration.ofSeconds(1).minus(Duration.between(start, end));

            if (sleepDuration.isPositive()) {
                TimeUnit.MILLISECONDS.sleep(sleepDuration.toMillis());
            }

            return ret;
        }
    }

    private boolean validateHtml(Document content, EdgeDomain domain) {
        var rootUrl = domain.toRootUrlHttps();
        var text = content.body().text();

        if (text.length() < 100) {
            return false; // Too short to be a valid page
        }

        if (text.contains("404 Not Found") || text.contains("Page not found")) {
            return false; // Common indicators of a 404 page
        }

        for (var metaTag : content.select("meta")) {
            if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv"))) {
                return false; // Page has a refresh tag, very likely a parked domain
            }
        }

        boolean hasInternalLink = false;

        for (var atag : content.select("a")) {
            var link = linkParser.parseLink(rootUrl, atag);
            if (link.isEmpty()) {
                continue; // Skip invalid links
            }
            var edgeUrl = link.get();
            if (Objects.equals(domain, edgeUrl.getDomain())) {
                hasInternalLink = true;
            }
        }

        return hasInternalLink;
    }

    private FetchResult fetch(String domain) throws URISyntaxException {
        var uri = new URI("https://" + domain + "/");

        var request = ClassicRequestBuilder.get(uri)
                .addHeader("User-Agent", userAgentString)
                .addHeader("Accept-Encoding", "gzip")
                .addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
                .build();

        try {
            return client.execute(request, (rsp) -> responseHandler(rsp, domain));
        } catch (Exception e) {
            return new FetchFailure("Failed to fetch domain: " + e.getMessage());
        }
    }

    private FetchResult responseHandler(ClassicHttpResponse rsp, String domain) {
        if (rsp.getEntity() == null)
            return new FetchFailure("No content returned from " + domain);

        try {
            int code = rsp.getCode();
            byte[] content = rsp.getEntity().getContent().readAllBytes();

            if (code >= 300) {
                return new FetchFailure("Received HTTP " + code + " from " + domain);
            }

            ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
            var html = DocumentBodyToString.getStringData(contentType, content);
            return new FetchSuccess(Jsoup.parse(html));
        }
        catch (Exception e) {
            EntityUtils.consumeQuietly(rsp.getEntity());
            return new FetchFailure("Failed to read content from " + domain + ": " + e.getMessage());
        }
    }
}

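The acceptance heuristics in validateHtml (non-trivial body text, no obvious 404 wording, no meta refresh, at least one internal link) are easy to exercise in isolation. A minimal standalone sketch of the same checks, assuming only Jsoup on the classpath; it omits the internal-link check, which depends on the project's LinkParser, and the class name and sample documents are invented:

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

// Hypothetical standalone demo of the acceptance heuristics above,
// minus the internal-link check.
public class HeuristicsDemo {
    static boolean looksLikeRealPage(Document doc) {
        var text = doc.body().text();
        if (text.length() < 100)
            return false; // too short to be a meaningful page
        if (text.contains("404 Not Found") || text.contains("Page not found"))
            return false; // common soft-404 wording
        for (var meta : doc.select("meta")) {
            if ("refresh".equalsIgnoreCase(meta.attr("http-equiv")))
                return false; // meta refresh is a strong parked-domain signal
        }
        return true;
    }

    public static void main(String[] args) {
        Document parked = Jsoup.parse(
                "<html><head><meta http-equiv=\"refresh\" content=\"0;url=https://registrar.example/\"></head>"
                + "<body>This domain is for sale.</body></html>");
        System.out.println(looksLikeRealPage(parked)); // false: meta refresh present

        Document real = Jsoup.parse(
                "<html><body><p>" + "Some genuine article text. ".repeat(10) + "</p></body></html>");
        System.out.println(looksLikeRealPage(real)); // true: enough text, no red flags
    }
}
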
@@ -0,0 +1,127 @@
package nu.marginalia.ndp;

import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.nodecfg.NodeConfigurationService;
import org.jetbrains.annotations.NotNull;

import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

/** DomainNodeAllocator is responsible for assigning domains to partitions/nodes,
 * ensuring that domains are evenly distributed across the nodes.
 */
public class DomainNodeAllocator {

    private final NodeConfigurationService nodeConfigurationService;
    private final HikariDataSource dataSource;

    private record NodeCount(int nodeId, int count)
            implements Comparable<NodeCount>
    {
        public NodeCount incrementCount() {
            return new NodeCount(nodeId, count + 1);
        }

        @Override
        public int compareTo(@NotNull DomainNodeAllocator.NodeCount o) {
            return Integer.compare(this.count, o.count);
        }
    }

    private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
    volatile boolean initialized = false;

    @Inject
    public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
        this.nodeConfigurationService = nodeConfigurationService;
        this.dataSource = dataSource;

        Thread.ofPlatform()
                .name("DomainNodeAllocator::initialize()")
                .start(this::initialize);
    }

    public void initialize() {
        if (initialized) return;

        Set<Integer> viableNodes = new HashSet<>();

        // Find all viable nodes that can handle batch crawls
        for (var node : nodeConfigurationService.getAll()) {
            if (node.disabled())
                continue;
            if (node.profile().permitBatchCrawl())
                viableNodes.add(node.node());
        }

        // Fetch the current counts of domains per node from the database
        try (var conn = dataSource.getConnection();
             var stmt = conn.prepareStatement("""
                     SELECT COUNT(*) AS CNT, NODE_AFFINITY
                     FROM EC_DOMAIN
                     WHERE NODE_AFFINITY>0
                     GROUP BY NODE_AFFINITY
                     """))
        {
            var rs = stmt.executeQuery();
            while (rs.next()) {
                int nodeId = rs.getInt("NODE_AFFINITY");
                int count = rs.getInt("CNT");

                if (viableNodes.remove(nodeId)) {
                    countPerNode.add(new NodeCount(nodeId, count));
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to load domain counts from database", e);
        }

        // Add any remaining viable nodes that were not found in the database
        for (int nodeId : viableNodes) {
            countPerNode.add(new NodeCount(nodeId, 0));
        }

        initialized = true;
    }

    private void ensureInitialized() {
        if (initialized) return;

        synchronized (this) {
            while (!initialized) {
                try {
                    // Wait until the initialization is complete
                    this.wait(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("DomainNodeAllocator initialization interrupted", e);
                }
            }
        }
    }

    public synchronized int totalCount() {
        ensureInitialized();
        return countPerNode.stream().mapToInt(NodeCount::count).sum();
    }

    /** Returns the next node ID to assign a domain to.
     * This method is synchronized to ensure thread safety when multiple threads are allocating domains.
     * The node ID returned is guaranteed to be one of the viable nodes configured in the system.
     */
    public synchronized int nextNodeId() {
        ensureInitialized();

        // Synchronized is fine here as this is not a hot path
        // (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)

        NodeCount allocation = countPerNode.remove();
        countPerNode.add(allocation.incrementCount());
        return allocation.nodeId();
    }
}

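The allocation scheme above is a count-balanced round robin: pop the least-loaded node from a min-heap, hand out its ID, and push it back with the count incremented. A self-contained sketch of just that mechanism; the node IDs and seed counts are made up:

import java.util.PriorityQueue;

// Hypothetical standalone demo of count-balanced round-robin allocation.
public class AllocatorDemo {
    record NodeCount(int nodeId, int count) implements Comparable<NodeCount> {
        public int compareTo(NodeCount o) { return Integer.compare(count, o.count); }
    }

    public static void main(String[] args) {
        // Min-heap ordered by current domain count per node
        PriorityQueue<NodeCount> heap = new PriorityQueue<>();
        heap.add(new NodeCount(1, 5));  // node 1 already holds 5 domains
        heap.add(new NodeCount(2, 0));  // node 2 is empty

        // Allocate 7 domains: node 2 receives the first 5 until the counts
        // even out, after which assignments alternate between the two nodes.
        for (int i = 0; i < 7; i++) {
            NodeCount least = heap.remove();
            System.out.println("domain " + i + " -> node " + least.nodeId());
            heap.add(new NodeCount(least.nodeId(), least.count() + 1));
        }
    }
}
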
@@ -0,0 +1,148 @@
package nu.marginalia.ndp;

import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ndp.model.DomainToTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class DomainTestingQueue {
    private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(1000);

    // This will grow quite large, but should be manageable in memory, as the theoretical maximum
    // is around 100M domains, on the order of 2 GB in memory.
    private final ConcurrentHashMap<String, Boolean> takenDomains = new ConcurrentHashMap<>();

    private final HikariDataSource dataSource;

    private static final Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);

    @Inject
    public DomainTestingQueue(HikariDataSource dataSource) {
        this.dataSource = dataSource;

        Thread.ofPlatform()
                .name("DomainTestingQueue::fetch()")
                .start(this::fetch);
    }

    public DomainToTest next() throws InterruptedException {
        return queue.take();
    }

    public void accept(DomainToTest domain, int nodeId) {
        try (var conn = dataSource.getConnection();
             var flagOkStmt = conn.prepareStatement("""
                     UPDATE NDP_NEW_DOMAINS
                     SET STATE='ACCEPTED'
                     WHERE DOMAIN_ID=?
                     """);
             var assignNodeStmt = conn.prepareStatement("""
                     UPDATE EC_DOMAIN SET NODE_AFFINITY=?
                     WHERE ID=?
                     """)
        )
        {
            conn.setAutoCommit(false);
            flagOkStmt.setInt(1, domain.domainId());
            flagOkStmt.executeUpdate();

            assignNodeStmt.setInt(1, nodeId);
            assignNodeStmt.setInt(2, domain.domainId());
            assignNodeStmt.executeUpdate();
            conn.commit();
        } catch (Exception e) {
            throw new RuntimeException("Failed to accept domain in database", e);
        }
    }

    public void reject(DomainToTest domain) {
        try (var conn = dataSource.getConnection();
             var stmt = conn.prepareStatement("""
                     UPDATE NDP_NEW_DOMAINS
                     SET STATE='REJECTED', CHECK_COUNT=CHECK_COUNT + 1
                     WHERE DOMAIN_ID=?
                     """))
        {
            conn.setAutoCommit(false);
            stmt.setInt(1, domain.domainId());
            stmt.executeUpdate();
            conn.commit();
        } catch (Exception e) {
            throw new RuntimeException("Failed to reject domain in database", e);
        }
    }

    public void fetch() {
        while (true) {
            List<DomainToTest> domains = new ArrayList<>(2000);
            try (var conn = dataSource.getConnection();
                 var stmt = conn.prepareStatement("""
                         SELECT DOMAIN_ID, DOMAIN_NAME
                         FROM NDP_NEW_DOMAINS
                         INNER JOIN EC_DOMAIN ON ID=DOMAIN_ID
                         WHERE NDP_NEW_DOMAINS.STATE = 'NEW'
                         ORDER BY PRIORITY DESC
                         LIMIT 2000
                         """))
            {
                var rs = stmt.executeQuery();

                while (rs.next()) {
                    int domainId = rs.getInt("DOMAIN_ID");
                    String domainName = rs.getString("DOMAIN_NAME");
                    if (takenDomains.put(domainName, true) != null) {
                        logger.warn("Domain {} has already been processed, skipping", domainName);
                        continue; // Skip if already taken
                    }
                    domains.add(new DomainToTest(domainName, domainId));
                }

                if (domains.isEmpty()) {
                    refreshQueue(conn);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to fetch domains from database", e);
            }

            try {
                for (var domain : domains) {
                    queue.put(domain);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Domain fetching interrupted", e);
            }
        }
    }

    private void refreshQueue(Connection conn) {
        logger.info("Refreshing domain queue in database");
        try (var stmt = conn.createStatement()) {
            conn.setAutoCommit(false);
            logger.info("Revitalizing rejected domains");

            // Revitalize rejected domains once their back-off has expired: a domain
            // rejected N times becomes eligible again N days after its last state change.
            stmt.executeUpdate("""
                    UPDATE NDP_NEW_DOMAINS
                    SET STATE='NEW'
                    WHERE NDP_NEW_DOMAINS.STATE = 'REJECTED'
                    AND DATE_ADD(TS_CHANGE, INTERVAL CHECK_COUNT DAY) < NOW()
                    """);
            conn.commit();

            logger.info("Queue refreshed successfully");
        } catch (Exception e) {
            throw new RuntimeException("Failed to refresh queue in database", e);
        }
    }
}

@@ -0,0 +1,162 @@
package nu.marginalia.ndp;

import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.WmsaHome;
import nu.marginalia.coordination.DomainCoordinationModule;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.ndp.NdpRequest;
import nu.marginalia.ndp.model.DomainToTest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.Security;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NdpMain extends ProcessMainClass {

    private static final Logger logger = LoggerFactory.getLogger(NdpMain.class);
    private final DomainNodeAllocator domainNodeAllocator;
    private final DomainTestingQueue domainTestingQueue;
    private final ProcessHeartbeat processHeartbeat;
    private final DomainEvaluator domainEvaluator;
    private final DomainBlacklist domainBlacklist;

    private final AtomicInteger domainCount = new AtomicInteger(0);

    @Inject
    public NdpMain(MessageQueueFactory messageQueueFactory,
                   ProcessConfiguration config,
                   DomainNodeAllocator domainNodeAllocator,
                   DomainTestingQueue domainTestingQueue,
                   DomainEvaluator domainEvaluator,
                   DomainBlacklist domainBlacklist,
                   ProcessHeartbeat processHeartbeat,
                   Gson gson)
    {
        super(messageQueueFactory, config, gson, ProcessInboxNames.NDP_INBOX);

        this.domainNodeAllocator = domainNodeAllocator;
        this.domainEvaluator = domainEvaluator;
        this.domainBlacklist = domainBlacklist;
        this.domainTestingQueue = domainTestingQueue;
        this.processHeartbeat = processHeartbeat;
    }

    public void run(int goalCount) throws InterruptedException {
        logger.info("Waiting for blacklist to load...");
        domainBlacklist.waitUntilLoaded();

        SimpleBlockingThreadPool threadPool = new SimpleBlockingThreadPool(
                "NDP-Worker",
                8,
                10,
                SimpleBlockingThreadPool.ThreadType.PLATFORM
        );

        logger.info("Starting NDP process");

        int toInsertCount = goalCount - domainNodeAllocator.totalCount();

        if (toInsertCount <= 0) {
            logger.info("No new domains to process. Current count: " + domainNodeAllocator.totalCount());
            return;
        }

        try (var hb = processHeartbeat.createAdHocTaskHeartbeat("Growing Index")) {
            int cnt;
            while ((cnt = domainCount.get()) < toInsertCount) {
                if (cnt % 100 == 0) {
                    hb.progress("Discovered Domains", cnt, toInsertCount);
                }

                var nextDomain = domainTestingQueue.next();
                threadPool.submit(() -> evaluateDomain(nextDomain));
            }
        }

        threadPool.shutDown();
        // Wait for all tasks to complete, or give up after 1 hour
        threadPool.awaitTermination(1, TimeUnit.HOURS);

        logger.info("NDP process completed. Total domains processed: " + domainCount.get());
    }

    private void evaluateDomain(DomainToTest nextDomain) {
        try {
            if (domainEvaluator.evaluateDomain(nextDomain)) {
                logger.info("Accepting: {}", nextDomain.domainName());
                domainCount.incrementAndGet();
                domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
            } else {
                logger.info("Rejecting: {}", nextDomain.domainName());
                domainTestingQueue.reject(nextDomain);
            }
        }
        catch (Exception e) {
            domainTestingQueue.reject(nextDomain);
            logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
        Security.setProperty("networkaddress.cache.ttl", "3600");

        // This must run *early*
        System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());

        // If these aren't set properly, the JVM will hang forever on some requests
        System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
        System.setProperty("sun.net.client.defaultReadTimeout", "30000");

        // Set the maximum number of connections to keep alive in the connection pool
        System.setProperty("jdk.httpclient.idleTimeout", "15"); // 15 seconds
        System.setProperty("jdk.httpclient.connectionPoolSize", "256");

        // We don't want to use too much memory caching sessions for https
        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");

        Injector injector = Guice.createInjector(
                new NdpModule(),
                new ServiceDiscoveryModule(),
                new DomainCoordinationModule(),
                new ProcessConfigurationModule("ndp"),
                new DatabaseModule(false)
        );

        GeoIpDictionary geoIpDictionary = injector.getInstance(GeoIpDictionary.class);

        geoIpDictionary.waitReady(); // Ensure the GeoIpDictionary is ready before proceeding

        NdpMain main = injector.getInstance(NdpMain.class);

        var instructions = main.fetchInstructions(NdpRequest.class);

        try {
            main.run(instructions.value().goal());
            instructions.ok();
        }
        catch (Throwable ex) {
            logger.error("Error running NDP process", ex);
            instructions.err();
        }
    }
}

@@ -0,0 +1,8 @@
package nu.marginalia.ndp;

import com.google.inject.AbstractModule;

public class NdpModule extends AbstractModule {
    public void configure() {
    }
}

@@ -0,0 +1,126 @@
|
||||
package nu.marginalia.ndp.io;
|
||||
|
||||
import com.google.inject.Provider;
|
||||
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
import org.apache.hc.client5.http.config.RequestConfig;
|
||||
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
|
||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
|
||||
import org.apache.hc.core5.http.HeaderElement;
|
||||
import org.apache.hc.core5.http.HeaderElements;
|
||||
import org.apache.hc.core5.http.HttpResponse;
|
||||
import org.apache.hc.core5.http.io.SocketConfig;
|
||||
import org.apache.hc.core5.http.message.MessageSupport;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.apache.hc.core5.util.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class HttpClientProvider implements Provider<HttpClient> {
|
||||
private static final HttpClient client;
|
||||
private static PoolingHttpClientConnectionManager connectionManager;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);
|
||||
|
||||
static {
|
||||
try {
|
||||
client = createClient();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CloseableHttpClient createClient() throws NoSuchAlgorithmException, KeyManagementException {
|
||||
final ConnectionConfig connectionConfig = ConnectionConfig.custom()
|
||||
.setSocketTimeout(15, TimeUnit.SECONDS)
|
||||
.setConnectTimeout(15, TimeUnit.SECONDS)
|
||||
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
|
||||
.build();
|
||||
|
||||
|
||||
connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
||||
.setMaxConnPerRoute(2)
|
||||
.setMaxConnTotal(50)
|
||||
.setDefaultConnectionConfig(connectionConfig)
|
||||
.build();
|
||||
|
||||
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
|
||||
.setSoLinger(TimeValue.ofSeconds(-1))
|
||||
.setSoTimeout(Timeout.ofSeconds(10))
|
||||
.build()
|
||||
);
|
||||
|
||||
Thread.ofPlatform().daemon(true).start(() -> {
|
||||
try {
|
||||
for (;;) {
|
||||
TimeUnit.SECONDS.sleep(15);
|
||||
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
|
||||
final RequestConfig defaultRequestConfig = RequestConfig.custom()
|
||||
.setCookieSpec(StandardCookieSpec.IGNORE)
|
||||
.setResponseTimeout(10, TimeUnit.SECONDS)
|
||||
.setConnectionRequestTimeout(5, TimeUnit.MINUTES)
|
||||
.build();
|
||||
|
||||
return HttpClients.custom()
|
||||
.setConnectionManager(connectionManager)
|
||||
.setRetryStrategy(new RetryStrategy())
|
||||
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
|
||||
// Default keep-alive duration is 3 minutes, but this is too long for us,
|
||||
// as we are either going to re-use it fairly quickly or close it for a long time.
|
||||
//
|
||||
// So we set it to 30 seconds or clamp the server-provided value to a minimum of 10 seconds.
|
||||
private static final TimeValue defaultValue = TimeValue.ofSeconds(30);
|
||||
|
||||
@Override
|
||||
public TimeValue getKeepAliveDuration(HttpResponse response, HttpContext context) {
|
||||
final Iterator<HeaderElement> it = MessageSupport.iterate(response, HeaderElements.KEEP_ALIVE);
|
||||
|
||||
while (it.hasNext()) {
|
||||
final HeaderElement he = it.next();
|
||||
final String param = he.getName();
|
||||
final String value = he.getValue();
|
||||
|
||||
if (value == null)
|
||||
continue;
|
||||
if (!"timeout".equalsIgnoreCase(param))
|
||||
continue;
|
||||
|
||||
try {
|
||||
long timeout = Long.parseLong(value);
|
||||
timeout = Math.clamp(timeout, 30, defaultValue.toSeconds());
|
||||
return TimeValue.ofSeconds(timeout);
|
||||
} catch (final NumberFormatException ignore) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
})
|
||||
.disableRedirectHandling()
|
||||
.setDefaultRequestConfig(defaultRequestConfig)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient get() {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
@@ -0,0 +1,79 @@
package nu.marginalia.ndp.io;

import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

public class RetryStrategy implements HttpRequestRetryStrategy {
    private static final Logger logger = LoggerFactory.getLogger(RetryStrategy.class);

    @Override
    public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return switch (exception) {
            case SocketTimeoutException ste -> false;
            case SSLException ssle -> false;
            case UnknownHostException uhe -> false;
            case HttpHostConnectException ex -> executionCount < 2;
            case SocketException ex -> executionCount < 2;
            default -> executionCount <= 3;
        };
    }

    @Override
    public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
        return switch (response.getCode()) {
            case 500, 503 -> executionCount <= 2;
            case 429 -> executionCount <= 3;
            default -> false;
        };
    }

    @Override
    public TimeValue getRetryInterval(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
        return TimeValue.ofSeconds(1);
    }

    @Override
    public TimeValue getRetryInterval(HttpResponse response, int executionCount, HttpContext context) {
        int statusCode = response.getCode();

        // Give 503 a bit more time
        if (statusCode == 503) return TimeValue.ofSeconds(5);

        if (statusCode == 429) {
            // get the Retry-After header
            var retryAfterHeader = response.getFirstHeader("Retry-After");
            if (retryAfterHeader == null) {
                return TimeValue.ofSeconds(3);
            }

            String retryAfter = retryAfterHeader.getValue();
            if (retryAfter == null) {
                return TimeValue.ofSeconds(2);
            }

            try {
                int retryAfterTime = Integer.parseInt(retryAfter);
                retryAfterTime = Math.clamp(retryAfterTime, 1, 5);

                return TimeValue.ofSeconds(retryAfterTime);
            } catch (NumberFormatException e) {
                logger.warn("Invalid Retry-After header: {}", retryAfter);
            }
        }

        return TimeValue.ofSeconds(2);
    }
}

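For a concrete feel of the 429 branch: a Retry-After of 120 seconds is clamped into [1, 5] and becomes a 5 second wait, while a missing header falls back to 3 seconds. A small spot-check, assuming the class above is on the classpath; the response objects are synthetic test values:

import nu.marginalia.ndp.io.RetryStrategy;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;

// Hypothetical spot-check of the 429 back-off logic in RetryStrategy.
public class RetryIntervalDemo {
    public static void main(String[] args) {
        var strategy = new RetryStrategy();

        var tooManyRequests = new BasicClassicHttpResponse(429);
        tooManyRequests.addHeader("Retry-After", "120");

        // 120 s lies outside the clamp range [1, 5], so the strategy waits 5 s
        System.out.println(strategy.getRetryInterval(tooManyRequests, 1, null));

        // Without a Retry-After header, the fallback is 3 s
        System.out.println(strategy.getRetryInterval(new BasicClassicHttpResponse(429), 1, null));
    }
}
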
@@ -0,0 +1,4 @@
package nu.marginalia.ndp.model;

public record DomainToTest(String domainName, int domainId) {
}

code/processes/ping-process/README.md (new file, 12 lines)

@@ -0,0 +1,12 @@
The ping process (which has nothing to do with ICMP ping) keeps track of
the aliveness of websites. It also gathers fingerprint information about
the security posture of the website, as well as DNS information.

This information is kept to build an idea of when a website is down, and to
identify ownership changes, as well as other significant events in the
lifecycle of a website.

# Central Classes

* [PingMain](java/nu/marginalia/ping/PingMain.java) main class.
* [PingJobScheduler](java/nu/marginalia/ping/PingJobScheduler.java) service that dispatches pings.

@@ -112,7 +112,7 @@ public class HttpClientProvider implements Provider<HttpClient> {
         });

         final RequestConfig defaultRequestConfig = RequestConfig.custom()
-                .setCookieSpec(StandardCookieSpec.RELAXED)
+                .setCookieSpec(StandardCookieSpec.IGNORE)
                 .setResponseTimeout(10, TimeUnit.SECONDS)
                 .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
                 .build();

@@ -4,6 +4,7 @@ public class ProcessInboxNames {
     public static final String CONVERTER_INBOX = "converter";
     public static final String LOADER_INBOX = "loader";
     public static final String PING_INBOX = "ping";
+    public static final String NDP_INBOX = "ndp";
     public static final String CRAWLER_INBOX = "crawler";
     public static final String LIVE_CRAWLER_INBOX = "live-crawler";

@@ -0,0 +1,4 @@
package nu.marginalia.mqapi.ndp;

public record NdpRequest(int goal) {
}

@@ -25,6 +25,11 @@ into the [MariaDB database](../common/db).
 The [index-construction-process](index-constructor-process/) constructs indices from
 the data generated by the loader.

+## 5. Other Processes
+
+* Ping Process: The [ping-process](ping-process/) keeps track of the aliveness of websites, gathering fingerprint information about the security posture of the website, as well as DNS information.
+* Live-Crawling Process: The [live-crawling-process](live-crawling-process/) crawls websites in real time based on RSS feeds, updating a smaller index with the latest content.
+
 ## Overview

 Schematically the crawling and loading process looks like this:

@@ -22,6 +22,7 @@ import nu.marginalia.search.model.NavbarModel;
 import nu.marginalia.search.model.ResultsPage;
 import nu.marginalia.search.model.UrlDetails;
 import nu.marginalia.search.svc.SearchFlagSiteService.FlagSiteFormData;
+import nu.marginalia.service.server.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -47,6 +48,8 @@ public class SearchSiteInfoService {
     private final HikariDataSource dataSource;
     private final SearchSiteSubscriptionService searchSiteSubscriptions;

+    private final RateLimiter rateLimiter = RateLimiter.custom(60);
+
     @Inject
     public SearchSiteInfoService(SearchOperator searchOperator,
                                  DomainInfoClient domainInfoClient,

@@ -238,6 +241,7 @@ public class SearchSiteInfoService {
         boolean hasScreenshot = screenshotService.hasScreenshot(domainId);
         boolean isSubscribed = searchSiteSubscriptions.isSubscribed(context, domain);

+        boolean rateLimited = !rateLimiter.isAllowed();
         if (domainId < 0) {
             domainInfoFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
             similarSetFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));

@@ -250,6 +254,12 @@ public class SearchSiteInfoService {
             linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
             feedItemsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
         }
+        else if (rateLimited) {
+            domainInfoFuture = domainInfoClient.domainInformation(domainId);
+            similarSetFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+            linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+            feedItemsFuture = CompletableFuture.failedFuture(new Exception("Rate limit exceeded"));
+        }
         else {
             domainInfoFuture = domainInfoClient.domainInformation(domainId);
             similarSetFuture = domainInfoClient.similarDomains(domainId, 25);

@@ -257,7 +267,14 @@ public class SearchSiteInfoService {
             feedItemsFuture = feedsClient.getFeed(domainId);
         }

-        List<UrlDetails> sampleResults = searchOperator.doSiteSearch(domainName, domainId, 5, 1).results;
+        List<UrlDetails> sampleResults;
+        if (rateLimited) {
+            sampleResults = List.of();
+        }
+        else {
+            sampleResults = searchOperator.doSiteSearch(domainName, domainId, 5, 1).results;
+        }

         if (!sampleResults.isEmpty()) {
             url = sampleResults.getFirst().url.withPathAndParam("/", null).toString();
         }

@@ -276,8 +293,9 @@ public class SearchSiteInfoService {
                 sampleResults
         );

-        requestMissingScreenshots(result);
-
+        if (!rateLimited) {
+            requestMissingScreenshots(result);
+        }
         return result;
     }

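The change above degrades the site-info page under load rather than failing the request outright: cheap lookups still run, while the expensive fan-out calls are replaced with failed futures or empty results. A sketch of the pattern, assuming only the RateLimiter.custom(60) factory and isAllowed() calls visible in the diff; the surrounding class and the loadCheap/loadExpensive methods are hypothetical:

import nu.marginalia.service.server.RateLimiter;

// Hypothetical illustration of the degrade-on-rate-limit pattern above.
public class SiteInfoGuardSketch {
    private final RateLimiter rateLimiter = RateLimiter.custom(60);

    public String render(int domainId) {
        boolean rateLimited = !rateLimiter.isAllowed();

        // Cheap data is always fetched; expensive fan-out work
        // (similar domains, backlinks, feeds, sample search) is skipped
        // when over the limit, so the page still renders in degraded form.
        String cheap = loadCheap(domainId);
        String expensive = rateLimited ? "" : loadExpensive(domainId);

        return cheap + expensive;
    }

    private String loadCheap(int domainId) { return "basic info for " + domainId; }
    private String loadExpensive(int domainId) { return ", similar domains, feeds..."; }
}
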
@@ -0,0 +1,11 @@
@param String message

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8">
<title>Unavailable</title></head>
<body>
<h1>Service Overloaded</h1>
<p>${message}</p>
</body>
</html>

@@ -69,6 +69,7 @@ include 'code:processes:crawling-process:ft-link-parser'
 include 'code:processes:crawling-process:ft-content-type'
 include 'code:processes:live-crawling-process'
 include 'code:processes:ping-process'
+include 'code:processes:new-domain-process'

 include 'code:processes:process-mq-api'