mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
Compare commits
5 Commits
deploy-026
...
deploy-026
Author | SHA1 | Date | |
---|---|---|---|
|
24ab8398bb | ||
|
d2ceeff4cf | ||
|
cf64214b1c | ||
|
e50d09cc01 | ||
|
bce3892ce0 |
@@ -45,7 +45,7 @@ public class NodeConfigurationService {
|
||||
public List<NodeConfiguration> getAll() {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var qs = conn.prepareStatement("""
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
|
||||
FROM NODE_CONFIGURATION
|
||||
""")) {
|
||||
var rs = qs.executeQuery();
|
||||
@@ -59,6 +59,7 @@ public class NodeConfigurationService {
|
||||
rs.getBoolean("ACCEPT_QUERIES"),
|
||||
rs.getBoolean("AUTO_CLEAN"),
|
||||
rs.getBoolean("PRECESSION"),
|
||||
rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
|
||||
rs.getBoolean("KEEP_WARCS"),
|
||||
NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
|
||||
rs.getBoolean("DISABLED")
|
||||
@@ -75,7 +76,7 @@ public class NodeConfigurationService {
|
||||
public NodeConfiguration get(int nodeId) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var qs = conn.prepareStatement("""
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, AUTO_ASSIGN_DOMAINS, KEEP_WARCS, NODE_PROFILE, DISABLED
|
||||
FROM NODE_CONFIGURATION
|
||||
WHERE ID=?
|
||||
""")) {
|
||||
@@ -88,6 +89,7 @@ public class NodeConfigurationService {
|
||||
rs.getBoolean("ACCEPT_QUERIES"),
|
||||
rs.getBoolean("AUTO_CLEAN"),
|
||||
rs.getBoolean("PRECESSION"),
|
||||
rs.getBoolean("AUTO_ASSIGN_DOMAINS"),
|
||||
rs.getBoolean("KEEP_WARCS"),
|
||||
NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
|
||||
rs.getBoolean("DISABLED")
|
||||
@@ -102,7 +104,7 @@ public class NodeConfigurationService {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var us = conn.prepareStatement("""
|
||||
UPDATE NODE_CONFIGURATION
|
||||
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
|
||||
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, AUTO_ASSIGN_DOMAINS=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
|
||||
WHERE ID=?
|
||||
"""))
|
||||
{
|
||||
@@ -110,10 +112,11 @@ public class NodeConfigurationService {
|
||||
us.setBoolean(2, config.acceptQueries());
|
||||
us.setBoolean(3, config.autoClean());
|
||||
us.setBoolean(4, config.includeInPrecession());
|
||||
us.setBoolean(5, config.keepWarcs());
|
||||
us.setBoolean(6, config.disabled());
|
||||
us.setString(7, config.profile().name());
|
||||
us.setInt(8, config.node());
|
||||
us.setBoolean(5, config.autoAssignDomains());
|
||||
us.setBoolean(6, config.keepWarcs());
|
||||
us.setBoolean(7, config.disabled());
|
||||
us.setString(8, config.profile().name());
|
||||
us.setInt(9, config.node());
|
||||
|
||||
if (us.executeUpdate() <= 0)
|
||||
throw new IllegalStateException("Failed to update configuration");
|
||||
|
@@ -5,6 +5,7 @@ public record NodeConfiguration(int node,
|
||||
boolean acceptQueries,
|
||||
boolean autoClean,
|
||||
boolean includeInPrecession,
|
||||
boolean autoAssignDomains,
|
||||
boolean keepWarcs,
|
||||
NodeProfile profile,
|
||||
boolean disabled
|
||||
|
@@ -2,6 +2,7 @@ package nu.marginalia.nodecfg;
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.nodecfg.model.NodeConfiguration;
|
||||
import nu.marginalia.nodecfg.model.NodeProfile;
|
||||
import nu.marginalia.test.TestMigrationLoader;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
@@ -62,6 +63,63 @@ public class NodeConfigurationServiceTest {
|
||||
assertEquals(2, list.size());
|
||||
assertEquals(a, list.get(0));
|
||||
assertEquals(b, list.get(1));
|
||||
}
|
||||
|
||||
|
||||
// Test all the fields that are only exposed via save()
|
||||
@Test
|
||||
public void testSaveChanges() throws SQLException {
|
||||
var original = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED);
|
||||
|
||||
assertEquals(1, original.node());
|
||||
assertEquals("Test", original.description());
|
||||
assertFalse(original.acceptQueries());
|
||||
|
||||
var precession = new NodeConfiguration(
|
||||
original.node(),
|
||||
"Foo",
|
||||
true,
|
||||
original.autoClean(),
|
||||
original.includeInPrecession(),
|
||||
!original.autoAssignDomains(),
|
||||
original.keepWarcs(),
|
||||
original.profile(),
|
||||
original.disabled()
|
||||
);
|
||||
|
||||
nodeConfigurationService.save(precession);
|
||||
precession = nodeConfigurationService.get(original.node());
|
||||
assertNotEquals(original.autoAssignDomains(), precession.autoAssignDomains());
|
||||
|
||||
var autoClean = new NodeConfiguration(
|
||||
original.node(),
|
||||
"Foo",
|
||||
true,
|
||||
!original.autoClean(),
|
||||
original.includeInPrecession(),
|
||||
original.autoAssignDomains(),
|
||||
original.keepWarcs(),
|
||||
original.profile(),
|
||||
original.disabled()
|
||||
);
|
||||
|
||||
nodeConfigurationService.save(autoClean);
|
||||
autoClean = nodeConfigurationService.get(original.node());
|
||||
assertNotEquals(original.autoClean(), autoClean.autoClean());
|
||||
|
||||
var disabled = new NodeConfiguration(
|
||||
original.node(),
|
||||
"Foo",
|
||||
true,
|
||||
autoClean.autoClean(),
|
||||
autoClean.includeInPrecession(),
|
||||
autoClean.autoAssignDomains(),
|
||||
autoClean.keepWarcs(),
|
||||
autoClean.profile(),
|
||||
!autoClean.disabled()
|
||||
);
|
||||
nodeConfigurationService.save(disabled);
|
||||
disabled = nodeConfigurationService.get(original.node());
|
||||
assertNotEquals(autoClean.disabled(), disabled.disabled());
|
||||
}
|
||||
}
|
@@ -0,0 +1,3 @@
|
||||
-- Migration script to add AUTO_ASSIGN_DOMAINS column to NODE_CONFIGURATION table
|
||||
|
||||
ALTER TABLE NODE_CONFIGURATION ADD COLUMN AUTO_ASSIGN_DOMAINS BOOLEAN NOT NULL DEFAULT TRUE;
|
@@ -115,9 +115,13 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
|
||||
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
|
||||
|
||||
if (!robotsRules.isAllowed(probedUrl.toString())) {
|
||||
warcRecorder.flagAsRobotsTxtError(probedUrl);
|
||||
yield 1; // Nothing we can do here, we aren't allowed to fetch the root URL
|
||||
}
|
||||
delayTimer.waitFetchDelay(0); // initial delay after robots.txt
|
||||
|
||||
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
|
||||
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, robotsRules, delayTimer);
|
||||
domainStateDb.save(summaryRecord);
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
@@ -270,11 +274,11 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
|
||||
|
||||
|
||||
private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
|
||||
private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, SimpleRobotRules robotsRules, CrawlDelayTimer timer) {
|
||||
Optional<String> feedLink = Optional.empty();
|
||||
|
||||
try {
|
||||
var url = rootUrl.withPathAndParam("/", null);
|
||||
EdgeUrl url = rootUrl.withPathAndParam("/", null);
|
||||
|
||||
HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
|
||||
timer.waitFetchDelay(0);
|
||||
@@ -331,7 +335,7 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
|
||||
|
||||
if (feedLink.isEmpty()) {
|
||||
feedLink = guessFeedUrl(timer);
|
||||
feedLink = guessFeedUrl(timer, robotsRules);
|
||||
}
|
||||
|
||||
// Download the sitemap if available
|
||||
@@ -339,14 +343,18 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
|
||||
// Grab the favicon if it exists
|
||||
|
||||
if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) {
|
||||
String contentType = iconResult.header("Content-Type");
|
||||
byte[] iconData = iconResult.getBodyBytes();
|
||||
if (robotsRules.isAllowed(faviconUrl.toString())) {
|
||||
if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED)
|
||||
instanceof HttpFetchResult.ResultOk iconResult)
|
||||
{
|
||||
String contentType = iconResult.header("Content-Type");
|
||||
byte[] iconData = iconResult.getBodyBytes();
|
||||
|
||||
domainStateDb.saveIcon(
|
||||
domain,
|
||||
new DomainStateDb.FaviconRecord(contentType, iconData)
|
||||
);
|
||||
domainStateDb.saveIcon(
|
||||
domain,
|
||||
new DomainStateDb.FaviconRecord(contentType, iconData)
|
||||
);
|
||||
}
|
||||
}
|
||||
timer.waitFetchDelay(0);
|
||||
|
||||
@@ -383,7 +391,7 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
"blog/rss"
|
||||
);
|
||||
|
||||
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
|
||||
private Optional<String> guessFeedUrl(CrawlDelayTimer timer, SimpleRobotRules robotsRules) throws InterruptedException {
|
||||
var oldDomainStateRecord = domainStateDb.getSummary(domain);
|
||||
|
||||
// If we are already aware of an old feed URL, then we can just revalidate it
|
||||
@@ -396,6 +404,9 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
|
||||
for (String endpoint : likelyFeedEndpoints) {
|
||||
String url = "https://" + domain + "/" + endpoint;
|
||||
if (!robotsRules.isAllowed(url)) {
|
||||
continue;
|
||||
}
|
||||
if (validateFeedUrl(url, timer)) {
|
||||
return Optional.of(url);
|
||||
}
|
||||
|
12
code/processes/new-domain-process/README.md
Normal file
12
code/processes/new-domain-process/README.md
Normal file
@@ -0,0 +1,12 @@
|
||||
The new domain process (NDP) is a process that evaluates new domains for
|
||||
inclusion in the search engine index.
|
||||
|
||||
It visits the root document of each candidate domain, ensures that it's reachable,
|
||||
verifies that the response is valid HTML, and checks for a few factors such as length
|
||||
and links before deciding whether to assign the domain to a node.
|
||||
|
||||
The NDP process will assign new domains to the node with the fewest assigned domains.
|
||||
|
||||
The NDP process is triggered with a goal target number of domains to process, and
|
||||
will find domains until that target is reached. If e.g. a goal of 100 is set,
|
||||
and 50 are in the index, it will find 50 more domains.
|
@@ -31,6 +31,8 @@ dependencies {
|
||||
implementation project(':code:libraries:geo-ip')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:libraries:blocking-thread-pool')
|
||||
|
||||
implementation project(':code:functions:link-graph:api')
|
||||
|
||||
implementation project(':code:processes:process-mq-api')
|
||||
implementation project(':code:processes:crawling-process:ft-content-type')
|
||||
|
@@ -2,40 +2,41 @@ package nu.marginalia.ndp;
|
||||
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.contenttype.ContentType;
|
||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||
import nu.marginalia.coordination.DomainCoordinator;
|
||||
import nu.marginalia.link_parser.LinkParser;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.ndp.io.HttpClientProvider;
|
||||
import nu.marginalia.ndp.model.DomainToTest;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.core5.http.ClassicHttpResponse;
|
||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
import org.jsoup.nodes.Element;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.io.InputStream;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/** Evaluates a domain to determine if it is worth indexing.
|
||||
* This class fetches the root document, checks the response code, content type,
|
||||
* and parses the HTML to ensure it smells alright.
|
||||
*/
|
||||
@Singleton
|
||||
public class DomainEvaluator {
|
||||
private final HttpClient client;
|
||||
private final String userAgentString = WmsaHome.getUserAgent().uaString();
|
||||
|
||||
private final LinkParser linkParser = new LinkParser();
|
||||
private final DomainCoordinator domainCoordinator;
|
||||
sealed interface FetchResult permits FetchSuccess, FetchFailure {}
|
||||
record FetchSuccess(Document content) implements FetchResult {}
|
||||
record FetchFailure(String reason) implements FetchResult {}
|
||||
|
||||
@Inject
|
||||
public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
|
||||
@@ -43,100 +44,103 @@ public class DomainEvaluator {
|
||||
client = HttpClientProvider.createClient();
|
||||
}
|
||||
|
||||
public boolean evaluateDomain(DomainToTest domain) throws Exception {
|
||||
var edgeDomain = new EdgeDomain(domain.domainName());
|
||||
public boolean evaluateDomain(String domainName) {
|
||||
var edgeDomain = new EdgeDomain(domainName);
|
||||
|
||||
// Grab a lock on the domain to prevent concurrent evaluations between processes
|
||||
try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
|
||||
var result = fetch(domain.domainName());
|
||||
var rootUrl = edgeDomain.toRootUrlHttps();
|
||||
|
||||
Instant start = Instant.now();
|
||||
var request = ClassicRequestBuilder.get(rootUrl.asURI())
|
||||
.addHeader("User-Agent", userAgentString)
|
||||
.addHeader("Accept-Encoding", "gzip")
|
||||
.addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
|
||||
.build();
|
||||
|
||||
var ret = switch(result) {
|
||||
case FetchSuccess(Document content) -> validateHtml(content, edgeDomain);
|
||||
case FetchFailure failure -> false;
|
||||
};
|
||||
return client.execute(request, (rsp) -> {
|
||||
if (rsp.getEntity() == null)
|
||||
return false;
|
||||
|
||||
// Sleep for up to 1 second before we yield the lock to respect rate limits reasonably well
|
||||
Instant end = Instant.now();
|
||||
Duration sleepDuration = Duration.ofSeconds(1).minus(Duration.between(start, end));
|
||||
try {
|
||||
// Check if the response code indicates a successful fetch
|
||||
if (200 != rsp.getCode()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (sleepDuration.isPositive()) {
|
||||
TimeUnit.MILLISECONDS.sleep(sleepDuration.toMillis());
|
||||
}
|
||||
byte[] content;
|
||||
// Read the content from the response entity
|
||||
try (InputStream contentStream = rsp.getEntity().getContent()) {
|
||||
content = contentStream.readNBytes(8192);
|
||||
}
|
||||
|
||||
return ret;
|
||||
// Parse the content (if it's valid)
|
||||
ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
|
||||
|
||||
// Validate the content type
|
||||
if (!contentType.contentType().startsWith("text/html") && !contentType.contentType().startsWith("application/xhtml+xml"))
|
||||
return false;
|
||||
|
||||
// Parse the document body to a Jsoup Document
|
||||
final Document document = Jsoup.parse(DocumentBodyToString.getStringData(contentType, content));
|
||||
final String text = document.body().text();
|
||||
|
||||
if (text.length() < 100)
|
||||
return false;
|
||||
if (text.contains("404 Not Found") || text.contains("Page not found"))
|
||||
return false;
|
||||
if (hasMetaRefresh(document))
|
||||
return false; // This almost always indicates a parked domain
|
||||
if (!hasInternalLink(document, edgeDomain, rootUrl))
|
||||
return false; // No internal links means it's not worth indexing
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
// May or may not be necessary, but let's ensure we clean up the response entity
|
||||
// to avoid resource leaks
|
||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||
|
||||
// Sleep for a while before yielding the lock, to avoid immediately hammering the domain
|
||||
// from another process
|
||||
sleepQuietly(Duration.ofSeconds(1));
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return false; // If we fail to fetch or parse the domain, we consider it invalid
|
||||
}
|
||||
}
|
||||
|
||||
private boolean validateHtml(Document content, EdgeDomain domain) {
|
||||
var rootUrl = domain.toRootUrlHttps();
|
||||
var text = content.body().text();
|
||||
private boolean hasInternalLink(Document document, EdgeDomain currentDomain, EdgeUrl rootUrl) {
|
||||
for (Element atag : document.select("a")) {
|
||||
Optional<EdgeDomain> destDomain = linkParser
|
||||
.parseLink(rootUrl, atag)
|
||||
.map(EdgeUrl::getDomain);
|
||||
|
||||
if (text.length() < 100) {
|
||||
return false; // Too short to be a valid page
|
||||
if (destDomain.isPresent() && Objects.equals(currentDomain, destDomain.get()))
|
||||
return true;
|
||||
}
|
||||
|
||||
if (text.contains("404 Not Found") || text.contains("Page not found")) {
|
||||
return false; // Common indicators of a 404 page
|
||||
}
|
||||
|
||||
for (var metaTag : content.select("meta")) {
|
||||
if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv"))) {
|
||||
return false; // Page has a refresh tag, very likely a parked domain
|
||||
}
|
||||
}
|
||||
|
||||
boolean hasInternalLink = false;
|
||||
|
||||
for (var atag : content.select("a")) {
|
||||
var link = linkParser.parseLink(rootUrl, atag);
|
||||
if (link.isEmpty()) {
|
||||
continue; // Skip invalid links
|
||||
}
|
||||
var edgeUrl = link.get();
|
||||
if (Objects.equals(domain, edgeUrl.getDomain())) {
|
||||
hasInternalLink = true;
|
||||
}
|
||||
}
|
||||
|
||||
return hasInternalLink;
|
||||
return false;
|
||||
}
|
||||
|
||||
private FetchResult fetch(String domain) throws URISyntaxException {
|
||||
var uri = new URI("https://" + domain + "/");
|
||||
|
||||
var request = ClassicRequestBuilder.get(uri)
|
||||
.addHeader("User-Agent", userAgentString)
|
||||
.addHeader("Accept-Encoding", "gzip")
|
||||
.addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
|
||||
.build();
|
||||
private boolean hasMetaRefresh(Document document) {
|
||||
for (Element metaTag : document.select("meta")) {
|
||||
if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv")))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void sleepQuietly(Duration duration) {
|
||||
try {
|
||||
return client.execute(request, (rsp) -> responseHandler(rsp, domain));
|
||||
} catch (Exception e) {
|
||||
return new FetchFailure("Failed to fetch domain: " + e.getMessage());
|
||||
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private FetchResult responseHandler(ClassicHttpResponse rsp, String domain) {
|
||||
if (rsp.getEntity() == null)
|
||||
return new FetchFailure("No content returned from " + domain);
|
||||
|
||||
try {
|
||||
int code = rsp.getCode();
|
||||
byte[] content = rsp.getEntity().getContent().readAllBytes();
|
||||
|
||||
if (code >= 300) {
|
||||
return new FetchFailure("Received HTTP " + code + " from " + domain);
|
||||
}
|
||||
|
||||
ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
|
||||
var html = DocumentBodyToString.getStringData(contentType, content);
|
||||
return new FetchSuccess(Jsoup.parse(html));
|
||||
}
|
||||
catch (Exception e) {
|
||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||
return new FetchFailure("Failed to read content from " + domain + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -16,6 +16,9 @@ public class DomainNodeAllocator {
|
||||
|
||||
private final NodeConfigurationService nodeConfigurationService;
|
||||
private final HikariDataSource dataSource;
|
||||
private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
|
||||
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
private record NodeCount(int nodeId, int count)
|
||||
implements Comparable<NodeCount>
|
||||
@@ -30,8 +33,6 @@ public class DomainNodeAllocator {
|
||||
}
|
||||
}
|
||||
|
||||
private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
|
||||
volatile boolean initialized = false;
|
||||
|
||||
@Inject
|
||||
public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
|
||||
@@ -43,6 +44,43 @@ public class DomainNodeAllocator {
|
||||
.start(this::initialize);
|
||||
}
|
||||
|
||||
public synchronized int totalCount() {
|
||||
ensureInitialized();
|
||||
return countPerNode.stream().mapToInt(NodeCount::count).sum();
|
||||
}
|
||||
|
||||
/** Returns the next node ID to assign a domain to.
|
||||
* This method is synchronized to ensure thread safety when multiple threads are allocating domains.
|
||||
* The node ID returned is guaranteed to be one of the viable nodes configured in the system.
|
||||
*/
|
||||
public synchronized int nextNodeId() {
|
||||
ensureInitialized();
|
||||
|
||||
// Synchronized is fine here as this is not a hot path
|
||||
// (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
|
||||
|
||||
NodeCount allocation = countPerNode.remove();
|
||||
countPerNode.add(allocation.incrementCount());
|
||||
return allocation.nodeId();
|
||||
}
|
||||
|
||||
|
||||
private void ensureInitialized() {
|
||||
if (initialized) return;
|
||||
|
||||
synchronized (this) {
|
||||
while (!initialized) {
|
||||
try {
|
||||
// Wait until the initialization is complete
|
||||
this.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("DomainAllocator initialization interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void initialize() {
|
||||
if (initialized) return;
|
||||
@@ -53,6 +91,9 @@ public class DomainNodeAllocator {
|
||||
for (var node : nodeConfigurationService.getAll()) {
|
||||
if (node.disabled())
|
||||
continue;
|
||||
if (!node.autoAssignDomains())
|
||||
continue;
|
||||
|
||||
if (node.profile().permitBatchCrawl())
|
||||
viableNodes.add(node.node());
|
||||
}
|
||||
@@ -89,39 +130,5 @@ public class DomainNodeAllocator {
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
private void ensureInitialized() {
|
||||
if (initialized) return;
|
||||
|
||||
synchronized (this) {
|
||||
while (!initialized) {
|
||||
try {
|
||||
// Wait until the initialization is complete
|
||||
this.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("DomainAllocator initialization interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized int totalCount() {
|
||||
ensureInitialized();
|
||||
return countPerNode.stream().mapToInt(NodeCount::count).sum();
|
||||
}
|
||||
|
||||
/** Returns the next node ID to assign a domain to.
|
||||
* This method is synchronized to ensure thread safety when multiple threads are allocating domains.
|
||||
* The node ID returned is guaranteed to be one of the viable nodes configured in the system.
|
||||
*/
|
||||
public synchronized int nextNodeId() {
|
||||
ensureInitialized();
|
||||
|
||||
// Synchronized is fine here as this is not a hot path
|
||||
// (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
|
||||
|
||||
NodeCount allocation = countPerNode.remove();
|
||||
countPerNode.add(allocation.incrementCount());
|
||||
return allocation.nodeId();
|
||||
}
|
||||
}
|
||||
|
@@ -2,30 +2,41 @@ package nu.marginalia.ndp;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import it.unimi.dsi.fastutil.ints.Int2IntMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
|
||||
import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
|
||||
import nu.marginalia.ndp.model.DomainToTest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class DomainTestingQueue {
|
||||
private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(1000);
|
||||
private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
|
||||
|
||||
private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(2);
|
||||
|
||||
// This will grow quite large, but should be manageable in memory, as theoretical maximum is around 100M domains,
|
||||
// order of 2 GB in memory.
|
||||
private final ConcurrentHashMap<String, Boolean> takenDomains = new ConcurrentHashMap<>();
|
||||
|
||||
private final HikariDataSource dataSource;
|
||||
private final AggregateLinkGraphClient linkGraphClient;
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
|
||||
|
||||
@Inject
|
||||
public DomainTestingQueue(HikariDataSource dataSource) {
|
||||
public DomainTestingQueue(HikariDataSource dataSource,
|
||||
AggregateLinkGraphClient linkGraphClient
|
||||
) {
|
||||
this.dataSource = dataSource;
|
||||
this.linkGraphClient = linkGraphClient;
|
||||
|
||||
Thread.ofPlatform()
|
||||
.name("DomainTestingQueue::fetch()")
|
||||
@@ -43,9 +54,10 @@ public class DomainTestingQueue {
|
||||
SET STATE='ACCEPTED'
|
||||
WHERE DOMAIN_ID=?
|
||||
""");
|
||||
var assigNodeStmt = conn.prepareStatement("""
|
||||
var assignNodeStmt = conn.prepareStatement("""
|
||||
UPDATE EC_DOMAIN SET NODE_AFFINITY=?
|
||||
WHERE ID=?
|
||||
AND EC_DOMAIN.NODE_AFFINITY < 0
|
||||
""")
|
||||
)
|
||||
{
|
||||
@@ -53,9 +65,9 @@ public class DomainTestingQueue {
|
||||
flagOkStmt.setInt(1, domain.domainId());
|
||||
flagOkStmt.executeUpdate();
|
||||
|
||||
assigNodeStmt.setInt(1, nodeId);
|
||||
assigNodeStmt.setInt(2, domain.domainId());
|
||||
assigNodeStmt.executeUpdate();
|
||||
assignNodeStmt.setInt(1, nodeId);
|
||||
assignNodeStmt.setInt(2, domain.domainId());
|
||||
assignNodeStmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to accept domain in database", e);
|
||||
@@ -105,9 +117,14 @@ public class DomainTestingQueue {
|
||||
}
|
||||
|
||||
if (domains.isEmpty()) {
|
||||
refreshQueue(conn);
|
||||
if (!refreshQueue(conn)) {
|
||||
throw new RuntimeException("No new domains found, aborting!");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
throw e; // Rethrow runtime exceptions to avoid wrapping them in another runtime exception
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Failed to fetch domains from database", e);
|
||||
}
|
||||
@@ -124,25 +141,100 @@ public class DomainTestingQueue {
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshQueue(Connection conn) {
|
||||
private boolean refreshQueue(Connection conn) {
|
||||
logger.info("Refreshing domain queue in database");
|
||||
try (var stmt = conn.createStatement()) {
|
||||
conn.setAutoCommit(false);
|
||||
logger.info("Revitalizing rejected domains");
|
||||
|
||||
// Revitalize rejected domains
|
||||
stmt.executeUpdate("""
|
||||
UPDATE NDP_NEW_DOMAINS
|
||||
SET STATE='NEW'
|
||||
WHERE NDP_NEW_DOMAINS.STATE = 'REJECTED'
|
||||
AND DATE_ADD(TS_CHANGE, INTERVAL CHECK_COUNT DAY) > NOW()
|
||||
""");
|
||||
conn.commit();
|
||||
Int2IntMap domainIdToCount = new Int2IntOpenHashMap();
|
||||
|
||||
// Load known domain IDs from the database to avoid inserting duplicates from NDP_NEW_DOMAINS
|
||||
// or domains that are already assigned to a node
|
||||
{
|
||||
IntOpenHashSet knownIds = new IntOpenHashSet();
|
||||
|
||||
try (var stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery("SELECT DOMAIN_ID FROM NDP_NEW_DOMAINS");
|
||||
rs.setFetchSize(10_000);
|
||||
while (rs.next()) {
|
||||
int domainId = rs.getInt("DOMAIN_ID");
|
||||
knownIds.add(domainId);
|
||||
}
|
||||
|
||||
rs = stmt.executeQuery("SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0");
|
||||
rs.setFetchSize(10_000);
|
||||
while (rs.next()) {
|
||||
int domainId = rs.getInt("ID");
|
||||
knownIds.add(domainId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to load known domain IDs from database", e);
|
||||
}
|
||||
|
||||
// Ensure the link graph is ready before proceeding. This is mainly necessary in a cold reboot
|
||||
// of the entire system.
|
||||
try {
|
||||
logger.info("Waiting for link graph client to be ready...");
|
||||
linkGraphClient.waitReady(Duration.ofHours(1));
|
||||
logger.info("Link graph client is ready, fetching domain links...");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// Fetch all domain links from the link graph and count by how many sources each dest domain is linked from
|
||||
var iter = linkGraphClient.getAllDomainLinks().iterator();
|
||||
while (iter.advance()) {
|
||||
int dest = iter.dest();
|
||||
if (!knownIds.contains(dest)) {
|
||||
domainIdToCount.mergeInt(dest, 1, (i, j) -> i + j);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean didInsert = false;
|
||||
|
||||
/* Insert new domains into NDP_NEW_DOMAINS table */
|
||||
try (var insertStmt = conn.prepareStatement("""
|
||||
INSERT IGNORE INTO NDP_NEW_DOMAINS (DOMAIN_ID, PRIORITY) VALUES (?, ?)
|
||||
""")) {
|
||||
conn.setAutoCommit(false);
|
||||
|
||||
int cnt = 0;
|
||||
for (var entry : domainIdToCount.int2IntEntrySet()) {
|
||||
int domainId = entry.getIntKey();
|
||||
int count = entry.getIntValue();
|
||||
|
||||
insertStmt.setInt(1, domainId);
|
||||
insertStmt.setInt(2, count);
|
||||
insertStmt.addBatch();
|
||||
|
||||
if (++cnt >= 1000) {
|
||||
cnt = 0;
|
||||
insertStmt.executeBatch(); // Execute in batches to avoid memory issues
|
||||
conn.commit();
|
||||
didInsert = true;
|
||||
}
|
||||
}
|
||||
if (cnt != 0) {
|
||||
insertStmt.executeBatch(); // Execute any remaining batch
|
||||
conn.commit();
|
||||
didInsert = true;
|
||||
}
|
||||
|
||||
logger.info("Queue refreshed successfully");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to refresh queue in database", e);
|
||||
}
|
||||
|
||||
// Clean up NDP_NEW_DOMAINS table to remove any domains that are already in EC_DOMAIN
|
||||
// This acts not only to clean up domains that we've flagged as ACCEPTED, but also to
|
||||
// repair inconsistent states where domains might have incorrectly been added to NDP_NEW_DOMAINS
|
||||
try (var stmt = conn.createStatement()) {
|
||||
stmt.executeUpdate("DELETE FROM NDP_NEW_DOMAINS WHERE DOMAIN_ID IN (SELECT ID FROM EC_DOMAIN WHERE NODE_AFFINITY>=0)");
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Failed to clean up NDP_NEW_DOMAINS", e);
|
||||
}
|
||||
|
||||
return didInsert;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -84,8 +84,23 @@ public class NdpMain extends ProcessMainClass {
|
||||
hb.progress("Discovery Process", cnt, toInsertCount);
|
||||
}
|
||||
|
||||
var nextDomain = domainTestingQueue.next();
|
||||
threadPool.submit(() -> evaluateDomain(nextDomain));
|
||||
final DomainToTest nextDomain = domainTestingQueue.next();
|
||||
threadPool.submit(() -> {
|
||||
try {
|
||||
if (domainEvaluator.evaluateDomain(nextDomain.domainName())) {
|
||||
logger.info("Accepting: {}", nextDomain.domainName());
|
||||
domainCount.incrementAndGet();
|
||||
domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
|
||||
} else {
|
||||
logger.info("Rejecting: {}", nextDomain.domainName());
|
||||
domainTestingQueue.reject(nextDomain);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
domainTestingQueue.reject(nextDomain);
|
||||
logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,24 +112,6 @@ public class NdpMain extends ProcessMainClass {
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void evaluateDomain(DomainToTest nextDomain) {
|
||||
try {
|
||||
if (domainEvaluator.evaluateDomain(nextDomain)) {
|
||||
logger.info("Accepting: {}", nextDomain.domainName());
|
||||
domainCount.incrementAndGet();
|
||||
domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
|
||||
} else {
|
||||
logger.info("Rejecting: {}", nextDomain.domainName());
|
||||
domainTestingQueue.reject(nextDomain);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
domainTestingQueue.reject(nextDomain);
|
||||
logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
|
||||
Security.setProperty("networkaddress.cache.ttl" , "3600");
|
||||
|
@@ -0,0 +1,29 @@
|
||||
package nu.marginalia.ndp;
|
||||
|
||||
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class DomainEvaluatorTest {
|
||||
|
||||
@Tag("flaky") // Exclude from CI runs due to potential network issues
|
||||
@Test
|
||||
public void testSunnyDay() throws NoSuchAlgorithmException, KeyManagementException {
|
||||
DomainEvaluator evaluator = new DomainEvaluator(new LocalDomainCoordinator());
|
||||
|
||||
// Should be a valid domain
|
||||
assertTrue(evaluator.evaluateDomain("www.marginalia.nu"));
|
||||
|
||||
// Should be a redirect to www.marginalia.nu
|
||||
assertFalse(evaluator.evaluateDomain("memex.marginalia.nu"));
|
||||
|
||||
// Should fail on Anubis
|
||||
assertFalse(evaluator.evaluateDomain("marginalia-search.com"));
|
||||
}
|
||||
}
|
@@ -28,6 +28,7 @@ the data generated by the loader.
|
||||
## 5. Other Processes
|
||||
|
||||
* Ping Process: The [ping-process](ping-process/) keeps track of the aliveness of websites, gathering fingerprint information about the security posture of the website, as well as DNS information.
|
||||
* New Domain Process (NDP): The [new-domain-process](new-domain-process/) evaluates new domains for inclusion in the search engine index.
|
||||
* Live-Crawling Process: The [live-crawling-process](live-crawling-process/) is a process that crawls websites in real-time based on RSS feeds, updating a smaller index with the latest content.
|
||||
|
||||
## Overview
|
||||
|
@@ -280,6 +280,7 @@ public class ControlNodeService {
|
||||
"on".equalsIgnoreCase(request.queryParams("autoClean")),
|
||||
"on".equalsIgnoreCase(request.queryParams("includeInPrecession")),
|
||||
"on".equalsIgnoreCase(request.queryParams("keepWarcs")),
|
||||
"on".equalsIgnoreCase(request.queryParams("autoAssignDomains")),
|
||||
NodeProfile.valueOf(request.queryParams("profile")),
|
||||
"on".equalsIgnoreCase(request.queryParams("disabled"))
|
||||
);
|
||||
|
@@ -66,13 +66,23 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="form-check form-switch">
|
||||
<input class="form-check-input" type="checkbox" role="switch" name="autoAssignDomains" {{#if config.autoAssignDomains}}checked{{/if}}>
|
||||
<label class="form-check-label" for="autoClean">Auto-Assign Domains</label>
|
||||
|
||||
<div class="form-text">If true, the New Domain Process will assign new domains to this node and all other nodes with this setting enabled.
|
||||
This is the default behavior, but can be overridden if you want one node with a specific manual domain assignment.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- This is not currently used, but may be in the future
|
||||
<div class="form-check form-switch">
|
||||
<input class="form-check-input" type="checkbox" role="switch" name="includeInPrecession" {{#if config.includeInPrecession}}checked{{/if}}>
|
||||
<label class="form-check-label" for="includeInPrecession">Include in crawling precession</label>
|
||||
|
||||
<div class="form-text">If true, this node will be included in the crawling precession.</div>
|
||||
</div>
|
||||
|
||||
-->
|
||||
<div class="form-check form-switch">
|
||||
<input class="form-check-input" type="checkbox" role="switch" name="keepWarcs" {{#if config.keepWarcs}}checked{{/if}}>
|
||||
<label class="form-check-label" for="includeInPrecession">Keep WARC files during crawling</label>
|
||||
|
Reference in New Issue
Block a user