1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits

..

7 Commits

Author SHA1 Message Date
Viktor Lofgren
34df7441ac (crawler) Add some jitter to crawl delay to avoid accidentally synchronized requests 2025-03-27 11:15:16 +01:00
Viktor Lofgren
5387e2bd80 (crawler) Adjust crawl order to get a better mixture of domains 2025-03-27 11:12:48 +01:00
Viktor Lofgren
0f3b24d0f8 (crawler) Evaluate virtual threads for the crawler
The change also alters SimpleBlockingThreadPool to add the option to use virtual threads instead of platform threads.
2025-03-27 11:02:21 +01:00
Viktor Lofgren
a732095d2a (crawler) Improve crawl task ordering
Further improve the ordering of the crawl tasks in order to ensure that potentially blocking tasks are enqueued as soon as possible.
2025-03-26 16:51:37 +01:00
Viktor Lofgren
6607f0112f (crawler) Improve how the crawler deals with interruptions
In some cases, it threads would previously fail to terminate when interrupted.
2025-03-26 16:19:57 +01:00
Viktor Lofgren
4913730de9 (jdk) Upgrade to Java 24 2025-03-26 13:26:06 +01:00
Viktor Lofgren
1db64f9d56 (chore) Fix zookeeper test by upgrading zk image version.
Test suddenly broke due to the increasing entropy of the universe.
2025-03-26 11:47:14 +01:00
9 changed files with 76 additions and 41 deletions

View File

@@ -43,12 +43,11 @@ subprojects.forEach {it ->
} }
ext { ext {
jvmVersion=23 jvmVersion = 24
dockerImageBase='container-registry.oracle.com/graalvm/jdk:23' dockerImageBase='container-registry.oracle.com/graalvm/jdk:24'
dockerImageTag='latest' dockerImageTag='latest'
dockerImageRegistry='marginalia' dockerImageRegistry='marginalia'
jibVersion = '3.4.4' jibVersion = '3.4.4'
} }
idea { idea {

View File

@@ -14,7 +14,7 @@ public class EdgeDomain implements Serializable {
@Nonnull @Nonnull
public final String topDomain; public final String topDomain;
public EdgeDomain(String host) { public EdgeDomain(@Nonnull String host) {
Objects.requireNonNull(host, "domain name must not be null"); Objects.requireNonNull(host, "domain name must not be null");
host = host.toLowerCase(); host = host.toLowerCase();
@@ -61,6 +61,10 @@ public class EdgeDomain implements Serializable {
this.topDomain = topDomain; this.topDomain = topDomain;
} }
public static String getTopDomain(String host) {
return new EdgeDomain(host).topDomain;
}
private boolean looksLikeGovTld(String host) { private boolean looksLikeGovTld(String host) {
if (host.length() < 8) if (host.length() < 8)
return false; return false;
@@ -116,24 +120,6 @@ public class EdgeDomain implements Serializable {
return topDomain.substring(0, cutPoint).toLowerCase(); return topDomain.substring(0, cutPoint).toLowerCase();
} }
public String getLongDomainKey() {
StringBuilder ret = new StringBuilder();
int cutPoint = topDomain.indexOf('.');
if (cutPoint < 0) {
ret.append(topDomain);
} else {
ret.append(topDomain, 0, cutPoint);
}
if (!subDomain.isEmpty() && !"www".equals(subDomain)) {
ret.append(":");
ret.append(subDomain);
}
return ret.toString().toLowerCase();
}
/** If possible, try to provide an alias domain, /** If possible, try to provide an alias domain,
* i.e. a domain name that is very likely to link to this one * i.e. a domain name that is very likely to link to this one
* */ * */

View File

@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
class ZkServiceRegistryTest { class ZkServiceRegistryTest {
private static final int ZOOKEEPER_PORT = 2181; private static final int ZOOKEEPER_PORT = 2181;
private static final GenericContainer<?> zookeeper = private static final GenericContainer<?> zookeeper =
new GenericContainer<>("zookeeper:3.8.0") new GenericContainer<>("zookeeper:3.8")
.withExposedPorts(ZOOKEEPER_PORT); .withExposedPorts(ZOOKEEPER_PORT);
List<ZkServiceRegistry> registries = new ArrayList<>(); List<ZkServiceRegistry> registries = new ArrayList<>();

View File

@@ -23,16 +23,33 @@ public class SimpleBlockingThreadPool {
private final Logger logger = LoggerFactory.getLogger(SimpleBlockingThreadPool.class); private final Logger logger = LoggerFactory.getLogger(SimpleBlockingThreadPool.class);
public SimpleBlockingThreadPool(String name, int poolSize, int queueSize) { public SimpleBlockingThreadPool(String name, int poolSize, int queueSize) {
this(name, poolSize, queueSize, ThreadType.PLATFORM);
}
public SimpleBlockingThreadPool(String name, int poolSize, int queueSize, ThreadType threadType) {
tasks = new ArrayBlockingQueue<>(queueSize); tasks = new ArrayBlockingQueue<>(queueSize);
for (int i = 0; i < poolSize; i++) { for (int i = 0; i < poolSize; i++) {
Thread worker = new Thread(this::worker, name + "[" + i + "]");
worker.setDaemon(true); Thread.Builder threadBuilder = switch (threadType) {
worker.start(); case VIRTUAL -> Thread.ofVirtual();
case PLATFORM -> Thread.ofPlatform().daemon(true);
};
Thread worker = threadBuilder
.name(name + "[" + i + "]")
.start(this::worker);
workers.add(worker); workers.add(worker);
} }
} }
public enum ThreadType {
VIRTUAL,
PLATFORM
}
public void submit(Task task) throws InterruptedException { public void submit(Task task) throws InterruptedException {
tasks.put(task); tasks.put(task);
} }

View File

@@ -41,10 +41,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.security.Security; import java.security.Security;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -108,7 +105,8 @@ public class CrawlerMain extends ProcessMainClass {
pool = new SimpleBlockingThreadPool("CrawlerPool", pool = new SimpleBlockingThreadPool("CrawlerPool",
Integer.getInteger("crawler.poolSize", 256), Integer.getInteger("crawler.poolSize", 256),
1); 1,
SimpleBlockingThreadPool.ThreadType.VIRTUAL);
// Wait for the blacklist to be loaded before starting the crawl // Wait for the blacklist to be loaded before starting the crawl
@@ -224,10 +222,7 @@ public class CrawlerMain extends ProcessMainClass {
logger.info("Loaded {} domains", crawlSpecRecords.size()); logger.info("Loaded {} domains", crawlSpecRecords.size());
// Shuffle the domains to ensure we get a good mix of domains in each crawl, crawlSpecRecords.sort(crawlSpecArrangement(crawlSpecRecords));
// so that e.g. the big domains don't get all crawled at once, or we end up
// crawling the same server in parallel from different subdomains...
Collections.shuffle(crawlSpecRecords);
// First a validation run to ensure the file is all good to parse // First a validation run to ensure the file is all good to parse
if (crawlSpecRecords.isEmpty()) { if (crawlSpecRecords.isEmpty()) {
@@ -306,6 +301,30 @@ public class CrawlerMain extends ProcessMainClass {
} }
} }
/** Create a comparator that sorts the crawl specs in a way that is beneficial for the crawl,
* we want to enqueue domains that have common top domains first, but otherwise have a random
* order.
* <p></p>
* Note, we can't use hash codes for randomization as it is not desirable to have the same order
* every time the process is restarted (and CrawlSpecRecord is a record, which defines equals and
* hashcode based on the fields).
* */
private Comparator<CrawlSpecRecord> crawlSpecArrangement(List<CrawlSpecRecord> records) {
Random r = new Random();
Map<String, Integer> topDomainCounts = new HashMap<>(4 + (int) Math.sqrt(records.size()));
Map<String, Integer> randomOrder = new HashMap<>(records.size());
for (var spec : records) {
topDomainCounts.merge(EdgeDomain.getTopDomain(spec.domain), 1, Integer::sum);
randomOrder.put(spec.domain, r.nextInt());
}
return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0) >= 8)
.reversed()
.thenComparing(spec -> randomOrder.get(spec.domain))
.thenComparing(Record::hashCode); // non-deterministic tie-breaker to
}
/** Submit a task for execution if it can be run, returns true if it was submitted /** Submit a task for execution if it can be run, returns true if it was submitted
* or if it can be discarded */ * or if it can be discarded */
private boolean trySubmitDeferredTask(CrawlTask task) { private boolean trySubmitDeferredTask(CrawlTask task) {

View File

@@ -61,7 +61,7 @@ public class HttpFetcherImpl implements HttpFetcher {
.cookieHandler(cookies) .cookieHandler(cookies)
.followRedirects(HttpClient.Redirect.NORMAL) .followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(8)) .connectTimeout(Duration.ofSeconds(8))
.executor(Executors.newCachedThreadPool()) .executor(Executors.newVirtualThreadPerTaskExecutor())
.build(); .build();
} }

View File

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
@@ -53,12 +54,13 @@ public class CrawlDelayTimer {
public void waitFetchDelay(long spentTime) { public void waitFetchDelay(long spentTime) {
long sleepTime = delayTime; long sleepTime = delayTime;
long jitter = ThreadLocalRandom.current().nextLong(0, 150);
try { try {
if (sleepTime >= 1) { if (sleepTime >= 1) {
if (spentTime > sleepTime) if (spentTime > sleepTime)
return; return;
Thread.sleep(min(sleepTime - spentTime, 5000)); Thread.sleep(min(sleepTime - spentTime, 5000) + jitter);
} else { } else {
// When no crawl delay is specified, lean toward twice the fetch+process time, // When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster // within sane limits. This means slower servers get slower crawling, and faster
@@ -71,17 +73,17 @@ public class CrawlDelayTimer {
if (spentTime > sleepTime) if (spentTime > sleepTime)
return; return;
Thread.sleep(sleepTime - spentTime); Thread.sleep(sleepTime - spentTime + jitter);
} }
if (slowDown) { if (slowDown) {
// Additional delay when the server is signalling it wants slower requests // Additional delay when the server is signalling it wants slower requests
Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS); Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS + jitter);
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(); throw new RuntimeException("Interrupted", e);
} }
} }
} }

View File

@@ -108,6 +108,11 @@ public class CrawlerRetreiver implements AutoCloseable {
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer); DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
domainStateDb.save(summaryRecord); domainStateDb.save(summaryRecord);
if (Thread.interrupted()) {
// There's a small chance we're interrupted during the sniffing portion
throw new InterruptedException();
}
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) { if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
// If we have reference data, we will always grow the crawl depth a bit // If we have reference data, we will always grow the crawl depth a bit
@@ -140,7 +145,6 @@ public class CrawlerRetreiver implements AutoCloseable {
CrawlDelayTimer delayTimer, CrawlDelayTimer delayTimer,
DomainLinks domainLinks) { DomainLinks domainLinks) {
// Add external links to the crawl frontier // Add external links to the crawl frontier
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto)); crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
@@ -289,6 +293,10 @@ public class CrawlerRetreiver implements AutoCloseable {
} }
catch (Exception ex) { catch (Exception ex) {
logger.error("Error configuring link filter", ex); logger.error("Error configuring link filter", ex);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return DomainStateDb.SummaryRecord.forError(domain, "Crawler Interrupted", ex.getMessage());
}
} }
finally { finally {
crawlFrontier.addVisited(rootUrl); crawlFrontier.addVisited(rootUrl);

View File

@@ -46,6 +46,10 @@ public class CrawlerRevisitor {
break; break;
} }
if (Thread.interrupted()) {
throw new InterruptedException();
}
var urlMaybe = EdgeUrl.parse(doc.url); var urlMaybe = EdgeUrl.parse(doc.url);
if (urlMaybe.isEmpty()) if (urlMaybe.isEmpty())
continue; continue;