mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
2 Commits
deploy-026
...
deploy-026
Author | SHA1 | Date | |
---|---|---|---|
|
e50d09cc01 | ||
|
bce3892ce0 |
@@ -115,9 +115,13 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
|
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
|
||||||
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
|
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
|
||||||
|
|
||||||
|
if (!robotsRules.isAllowed(probedUrl.toString())) {
|
||||||
|
warcRecorder.flagAsRobotsTxtError(probedUrl);
|
||||||
|
yield 1; // Nothing we can do here, we aren't allowed to fetch the root URL
|
||||||
|
}
|
||||||
delayTimer.waitFetchDelay(0); // initial delay after robots.txt
|
delayTimer.waitFetchDelay(0); // initial delay after robots.txt
|
||||||
|
|
||||||
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
|
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, robotsRules, delayTimer);
|
||||||
domainStateDb.save(summaryRecord);
|
domainStateDb.save(summaryRecord);
|
||||||
|
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
@@ -270,11 +274,11 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
|
private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, SimpleRobotRules robotsRules, CrawlDelayTimer timer) {
|
||||||
Optional<String> feedLink = Optional.empty();
|
Optional<String> feedLink = Optional.empty();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
var url = rootUrl.withPathAndParam("/", null);
|
EdgeUrl url = rootUrl.withPathAndParam("/", null);
|
||||||
|
|
||||||
HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
|
HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
|
||||||
timer.waitFetchDelay(0);
|
timer.waitFetchDelay(0);
|
||||||
@@ -331,7 +335,7 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
|
|
||||||
|
|
||||||
if (feedLink.isEmpty()) {
|
if (feedLink.isEmpty()) {
|
||||||
feedLink = guessFeedUrl(timer);
|
feedLink = guessFeedUrl(timer, robotsRules);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Download the sitemap if available
|
// Download the sitemap if available
|
||||||
@@ -339,7 +343,10 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
|
|
||||||
// Grab the favicon if it exists
|
// Grab the favicon if it exists
|
||||||
|
|
||||||
if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) {
|
if (robotsRules.isAllowed(faviconUrl.toString())) {
|
||||||
|
if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED)
|
||||||
|
instanceof HttpFetchResult.ResultOk iconResult)
|
||||||
|
{
|
||||||
String contentType = iconResult.header("Content-Type");
|
String contentType = iconResult.header("Content-Type");
|
||||||
byte[] iconData = iconResult.getBodyBytes();
|
byte[] iconData = iconResult.getBodyBytes();
|
||||||
|
|
||||||
@@ -348,6 +355,7 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
new DomainStateDb.FaviconRecord(contentType, iconData)
|
new DomainStateDb.FaviconRecord(contentType, iconData)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
timer.waitFetchDelay(0);
|
timer.waitFetchDelay(0);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -383,7 +391,7 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
"blog/rss"
|
"blog/rss"
|
||||||
);
|
);
|
||||||
|
|
||||||
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
|
private Optional<String> guessFeedUrl(CrawlDelayTimer timer, SimpleRobotRules robotsRules) throws InterruptedException {
|
||||||
var oldDomainStateRecord = domainStateDb.getSummary(domain);
|
var oldDomainStateRecord = domainStateDb.getSummary(domain);
|
||||||
|
|
||||||
// If we are already aware of an old feed URL, then we can just revalidate it
|
// If we are already aware of an old feed URL, then we can just revalidate it
|
||||||
@@ -396,6 +404,9 @@ public class CrawlerRetreiver implements AutoCloseable {
|
|||||||
|
|
||||||
for (String endpoint : likelyFeedEndpoints) {
|
for (String endpoint : likelyFeedEndpoints) {
|
||||||
String url = "https://" + domain + "/" + endpoint;
|
String url = "https://" + domain + "/" + endpoint;
|
||||||
|
if (!robotsRules.isAllowed(url)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (validateFeedUrl(url, timer)) {
|
if (validateFeedUrl(url, timer)) {
|
||||||
return Optional.of(url);
|
return Optional.of(url);
|
||||||
}
|
}
|
||||||
|
@@ -2,40 +2,41 @@ package nu.marginalia.ndp;
|
|||||||
|
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
import nu.marginalia.contenttype.ContentType;
|
import nu.marginalia.contenttype.ContentType;
|
||||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||||
import nu.marginalia.coordination.DomainCoordinator;
|
import nu.marginalia.coordination.DomainCoordinator;
|
||||||
import nu.marginalia.link_parser.LinkParser;
|
import nu.marginalia.link_parser.LinkParser;
|
||||||
import nu.marginalia.model.EdgeDomain;
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
import nu.marginalia.model.EdgeUrl;
|
||||||
import nu.marginalia.ndp.io.HttpClientProvider;
|
import nu.marginalia.ndp.io.HttpClientProvider;
|
||||||
import nu.marginalia.ndp.model.DomainToTest;
|
|
||||||
import org.apache.hc.client5.http.classic.HttpClient;
|
import org.apache.hc.client5.http.classic.HttpClient;
|
||||||
import org.apache.hc.core5.http.ClassicHttpResponse;
|
|
||||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||||
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
|
||||||
import org.jsoup.Jsoup;
|
import org.jsoup.Jsoup;
|
||||||
import org.jsoup.nodes.Document;
|
import org.jsoup.nodes.Document;
|
||||||
|
import org.jsoup.nodes.Element;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.io.InputStream;
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.security.KeyManagementException;
|
import java.security.KeyManagementException;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/** Evaluates a domain to determine if it is worth indexing.
|
||||||
|
* This class fetches the root document, checks the response code, content type,
|
||||||
|
* and parses the HTML to ensure it smells alright.
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
public class DomainEvaluator {
|
public class DomainEvaluator {
|
||||||
private final HttpClient client;
|
private final HttpClient client;
|
||||||
private final String userAgentString = WmsaHome.getUserAgent().uaString();
|
private final String userAgentString = WmsaHome.getUserAgent().uaString();
|
||||||
|
|
||||||
private final LinkParser linkParser = new LinkParser();
|
private final LinkParser linkParser = new LinkParser();
|
||||||
private final DomainCoordinator domainCoordinator;
|
private final DomainCoordinator domainCoordinator;
|
||||||
sealed interface FetchResult permits FetchSuccess, FetchFailure {}
|
|
||||||
record FetchSuccess(Document content) implements FetchResult {}
|
|
||||||
record FetchFailure(String reason) implements FetchResult {}
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
|
public DomainEvaluator(DomainCoordinator domainCoordinator) throws NoSuchAlgorithmException, KeyManagementException {
|
||||||
@@ -43,100 +44,103 @@ public class DomainEvaluator {
|
|||||||
client = HttpClientProvider.createClient();
|
client = HttpClientProvider.createClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean evaluateDomain(DomainToTest domain) throws Exception {
|
public boolean evaluateDomain(String domainName) {
|
||||||
var edgeDomain = new EdgeDomain(domain.domainName());
|
var edgeDomain = new EdgeDomain(domainName);
|
||||||
|
|
||||||
|
// Grab a lock on the domain to prevent concurrent evaluations between processes
|
||||||
try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
|
try (var lock = domainCoordinator.lockDomain(edgeDomain)) {
|
||||||
var result = fetch(domain.domainName());
|
var rootUrl = edgeDomain.toRootUrlHttps();
|
||||||
|
|
||||||
Instant start = Instant.now();
|
var request = ClassicRequestBuilder.get(rootUrl.asURI())
|
||||||
|
|
||||||
var ret = switch(result) {
|
|
||||||
case FetchSuccess(Document content) -> validateHtml(content, edgeDomain);
|
|
||||||
case FetchFailure failure -> false;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Sleep for up to 1 second before we yield the lock to respect rate limits reasonably well
|
|
||||||
Instant end = Instant.now();
|
|
||||||
Duration sleepDuration = Duration.ofSeconds(1).minus(Duration.between(start, end));
|
|
||||||
|
|
||||||
if (sleepDuration.isPositive()) {
|
|
||||||
TimeUnit.MILLISECONDS.sleep(sleepDuration.toMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean validateHtml(Document content, EdgeDomain domain) {
|
|
||||||
var rootUrl = domain.toRootUrlHttps();
|
|
||||||
var text = content.body().text();
|
|
||||||
|
|
||||||
if (text.length() < 100) {
|
|
||||||
return false; // Too short to be a valid page
|
|
||||||
}
|
|
||||||
|
|
||||||
if (text.contains("404 Not Found") || text.contains("Page not found")) {
|
|
||||||
return false; // Common indicators of a 404 page
|
|
||||||
}
|
|
||||||
|
|
||||||
for (var metaTag : content.select("meta")) {
|
|
||||||
if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv"))) {
|
|
||||||
return false; // Page has a refresh tag, very likely a parked domain
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean hasInternalLink = false;
|
|
||||||
|
|
||||||
for (var atag : content.select("a")) {
|
|
||||||
var link = linkParser.parseLink(rootUrl, atag);
|
|
||||||
if (link.isEmpty()) {
|
|
||||||
continue; // Skip invalid links
|
|
||||||
}
|
|
||||||
var edgeUrl = link.get();
|
|
||||||
if (Objects.equals(domain, edgeUrl.getDomain())) {
|
|
||||||
hasInternalLink = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return hasInternalLink;
|
|
||||||
}
|
|
||||||
|
|
||||||
private FetchResult fetch(String domain) throws URISyntaxException {
|
|
||||||
var uri = new URI("https://" + domain + "/");
|
|
||||||
|
|
||||||
var request = ClassicRequestBuilder.get(uri)
|
|
||||||
.addHeader("User-Agent", userAgentString)
|
.addHeader("User-Agent", userAgentString)
|
||||||
.addHeader("Accept-Encoding", "gzip")
|
.addHeader("Accept-Encoding", "gzip")
|
||||||
.addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
|
.addHeader("Accept", "text/html,application/xhtml+xml;q=0.9")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
try {
|
return client.execute(request, (rsp) -> {
|
||||||
return client.execute(request, (rsp) -> responseHandler(rsp, domain));
|
|
||||||
} catch (Exception e) {
|
|
||||||
return new FetchFailure("Failed to fetch domain: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private FetchResult responseHandler(ClassicHttpResponse rsp, String domain) {
|
|
||||||
if (rsp.getEntity() == null)
|
if (rsp.getEntity() == null)
|
||||||
return new FetchFailure("No content returned from " + domain);
|
return false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int code = rsp.getCode();
|
// Check if the response code indicates a successful fetch
|
||||||
byte[] content = rsp.getEntity().getContent().readAllBytes();
|
if (200 != rsp.getCode()) {
|
||||||
|
return false;
|
||||||
if (code >= 300) {
|
|
||||||
return new FetchFailure("Received HTTP " + code + " from " + domain);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
byte[] content;
|
||||||
|
// Read the content from the response entity
|
||||||
|
try (InputStream contentStream = rsp.getEntity().getContent()) {
|
||||||
|
content = contentStream.readNBytes(8192);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the content (if it's valid)
|
||||||
ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
|
ContentType contentType = ContentType.parse(rsp.getEntity().getContentType());
|
||||||
var html = DocumentBodyToString.getStringData(contentType, content);
|
|
||||||
return new FetchSuccess(Jsoup.parse(html));
|
// Validate the content type
|
||||||
|
if (!contentType.contentType().startsWith("text/html") && !contentType.contentType().startsWith("application/xhtml+xml"))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// Parse the document body to a Jsoup Document
|
||||||
|
final Document document = Jsoup.parse(DocumentBodyToString.getStringData(contentType, content));
|
||||||
|
final String text = document.body().text();
|
||||||
|
|
||||||
|
if (text.length() < 100)
|
||||||
|
return false;
|
||||||
|
if (text.contains("404 Not Found") || text.contains("Page not found"))
|
||||||
|
return false;
|
||||||
|
if (hasMetaRefresh(document))
|
||||||
|
return false; // This almost always indicates a parked domain
|
||||||
|
if (!hasInternalLink(document, edgeDomain, rootUrl))
|
||||||
|
return false; // No internal links means it's not worth indexing
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
// May or may not be necessary, but let's ensure we clean up the response entity
|
||||||
|
// to avoid resource leaks
|
||||||
EntityUtils.consumeQuietly(rsp.getEntity());
|
EntityUtils.consumeQuietly(rsp.getEntity());
|
||||||
return new FetchFailure("Failed to read content from " + domain + ": " + e.getMessage());
|
|
||||||
|
// Sleep for a while before yielding the lock, to avoid immediately hammering the domain
|
||||||
|
// from another process
|
||||||
|
sleepQuietly(Duration.ofSeconds(1));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
return false; // If we fail to fetch or parse the domain, we consider it invalid
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean hasInternalLink(Document document, EdgeDomain currentDomain, EdgeUrl rootUrl) {
|
||||||
|
for (Element atag : document.select("a")) {
|
||||||
|
Optional<EdgeDomain> destDomain = linkParser
|
||||||
|
.parseLink(rootUrl, atag)
|
||||||
|
.map(EdgeUrl::getDomain);
|
||||||
|
|
||||||
|
if (destDomain.isPresent() && Objects.equals(currentDomain, destDomain.get()))
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasMetaRefresh(Document document) {
|
||||||
|
for (Element metaTag : document.select("meta")) {
|
||||||
|
if ("refresh".equalsIgnoreCase(metaTag.attr("http-equiv")))
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sleepQuietly(Duration duration) {
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -16,6 +16,9 @@ public class DomainNodeAllocator {
|
|||||||
|
|
||||||
private final NodeConfigurationService nodeConfigurationService;
|
private final NodeConfigurationService nodeConfigurationService;
|
||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
|
private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
|
||||||
|
|
||||||
|
private volatile boolean initialized = false;
|
||||||
|
|
||||||
private record NodeCount(int nodeId, int count)
|
private record NodeCount(int nodeId, int count)
|
||||||
implements Comparable<NodeCount>
|
implements Comparable<NodeCount>
|
||||||
@@ -30,8 +33,6 @@ public class DomainNodeAllocator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final PriorityQueue<NodeCount> countPerNode = new PriorityQueue<>();
|
|
||||||
volatile boolean initialized = false;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
|
public DomainNodeAllocator(NodeConfigurationService nodeConfigurationService, HikariDataSource dataSource) {
|
||||||
@@ -43,6 +44,43 @@ public class DomainNodeAllocator {
|
|||||||
.start(this::initialize);
|
.start(this::initialize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized int totalCount() {
|
||||||
|
ensureInitialized();
|
||||||
|
return countPerNode.stream().mapToInt(NodeCount::count).sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the next node ID to assign a domain to.
|
||||||
|
* This method is synchronized to ensure thread safety when multiple threads are allocating domains.
|
||||||
|
* The node ID returned is guaranteed to be one of the viable nodes configured in the system.
|
||||||
|
*/
|
||||||
|
public synchronized int nextNodeId() {
|
||||||
|
ensureInitialized();
|
||||||
|
|
||||||
|
// Synchronized is fine here as this is not a hot path
|
||||||
|
// (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
|
||||||
|
|
||||||
|
NodeCount allocation = countPerNode.remove();
|
||||||
|
countPerNode.add(allocation.incrementCount());
|
||||||
|
return allocation.nodeId();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void ensureInitialized() {
|
||||||
|
if (initialized) return;
|
||||||
|
|
||||||
|
synchronized (this) {
|
||||||
|
while (!initialized) {
|
||||||
|
try {
|
||||||
|
// Wait until the initialization is complete
|
||||||
|
this.wait(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("DomainAllocator initialization interrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void initialize() {
|
public void initialize() {
|
||||||
if (initialized) return;
|
if (initialized) return;
|
||||||
@@ -89,39 +127,5 @@ public class DomainNodeAllocator {
|
|||||||
initialized = true;
|
initialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureInitialized() {
|
|
||||||
if (initialized) return;
|
|
||||||
|
|
||||||
synchronized (this) {
|
|
||||||
while (!initialized) {
|
|
||||||
try {
|
|
||||||
// Wait until the initialization is complete
|
|
||||||
this.wait(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("DomainAllocator initialization interrupted", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized int totalCount() {
|
|
||||||
ensureInitialized();
|
|
||||||
return countPerNode.stream().mapToInt(NodeCount::count).sum();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the next node ID to assign a domain to.
|
|
||||||
* This method is synchronized to ensure thread safety when multiple threads are allocating domains.
|
|
||||||
* The node ID returned is guaranteed to be one of the viable nodes configured in the system.
|
|
||||||
*/
|
|
||||||
public synchronized int nextNodeId() {
|
|
||||||
ensureInitialized();
|
|
||||||
|
|
||||||
// Synchronized is fine here as this is not a hot path
|
|
||||||
// (and PriorityBlockingQueue won't help since we're re-adding the same element with a new count all the time)
|
|
||||||
|
|
||||||
NodeCount allocation = countPerNode.remove();
|
|
||||||
countPerNode.add(allocation.incrementCount());
|
|
||||||
return allocation.nodeId();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@@ -13,7 +13,9 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class DomainTestingQueue {
|
public class DomainTestingQueue {
|
||||||
private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(1000);
|
private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
|
||||||
|
|
||||||
|
private final ArrayBlockingQueue<DomainToTest> queue = new ArrayBlockingQueue<>(2);
|
||||||
|
|
||||||
// This will grow quite large, but should be manageable in memory, as theoretical maximum is around 100M domains,
|
// This will grow quite large, but should be manageable in memory, as theoretical maximum is around 100M domains,
|
||||||
// order of 2 GB in memory.
|
// order of 2 GB in memory.
|
||||||
@@ -21,7 +23,6 @@ public class DomainTestingQueue {
|
|||||||
|
|
||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(DomainTestingQueue.class);
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public DomainTestingQueue(HikariDataSource dataSource) {
|
public DomainTestingQueue(HikariDataSource dataSource) {
|
||||||
|
@@ -84,23 +84,10 @@ public class NdpMain extends ProcessMainClass {
|
|||||||
hb.progress("Discovery Process", cnt, toInsertCount);
|
hb.progress("Discovery Process", cnt, toInsertCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
var nextDomain = domainTestingQueue.next();
|
final DomainToTest nextDomain = domainTestingQueue.next();
|
||||||
threadPool.submit(() -> evaluateDomain(nextDomain));
|
threadPool.submit(() -> {
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
threadPool.shutDown();
|
|
||||||
// Wait for all tasks to complete or give up after 1 hour
|
|
||||||
threadPool.awaitTermination(1, TimeUnit.HOURS);
|
|
||||||
|
|
||||||
logger.info("NDP process completed. Total domains processed: " + domainCount.get());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void evaluateDomain(DomainToTest nextDomain) {
|
|
||||||
try {
|
try {
|
||||||
if (domainEvaluator.evaluateDomain(nextDomain)) {
|
if (domainEvaluator.evaluateDomain(nextDomain.domainName())) {
|
||||||
logger.info("Accepting: {}", nextDomain.domainName());
|
logger.info("Accepting: {}", nextDomain.domainName());
|
||||||
domainCount.incrementAndGet();
|
domainCount.incrementAndGet();
|
||||||
domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
|
domainTestingQueue.accept(nextDomain, domainNodeAllocator.nextNodeId());
|
||||||
@@ -113,6 +100,16 @@ public class NdpMain extends ProcessMainClass {
|
|||||||
domainTestingQueue.reject(nextDomain);
|
domainTestingQueue.reject(nextDomain);
|
||||||
logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
|
logger.error("Error evaluating domain: " + nextDomain.domainId(), e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
threadPool.shutDown();
|
||||||
|
// Wait for all tasks to complete or give up after 1 hour
|
||||||
|
threadPool.awaitTermination(1, TimeUnit.HOURS);
|
||||||
|
|
||||||
|
logger.info("NDP process completed. Total domains processed: " + domainCount.get());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
@@ -0,0 +1,29 @@
|
|||||||
|
package nu.marginalia.ndp;
|
||||||
|
|
||||||
|
import nu.marginalia.coordination.LocalDomainCoordinator;
|
||||||
|
import org.junit.jupiter.api.Tag;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
class DomainEvaluatorTest {
|
||||||
|
|
||||||
|
@Tag("flaky") // Exclude from CI runs due to potential network issues
|
||||||
|
@Test
|
||||||
|
public void testSunnyDay() throws NoSuchAlgorithmException, KeyManagementException {
|
||||||
|
DomainEvaluator evaluator = new DomainEvaluator(new LocalDomainCoordinator());
|
||||||
|
|
||||||
|
// Should be a valid domain
|
||||||
|
assertTrue(evaluator.evaluateDomain("www.marginalia.nu"));
|
||||||
|
|
||||||
|
// Should be a redirect to www.marginalia.nu
|
||||||
|
assertFalse(evaluator.evaluateDomain("memex.marginalia.nu"));
|
||||||
|
|
||||||
|
// Should fail on Anubis
|
||||||
|
assertFalse(evaluator.evaluateDomain("marginalia-search.com"));
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user