1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Viktor Lofgren
e84d5c497a (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change to a bounded queue and adding a sleep to reduce the amount of effectively busy looping threads.
2025-04-21 00:39:26 +02:00
Viktor Lofgren
2d2d3e2466 (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change to a bounded queue and adding a sleep to reduce the amount of effectively busy looping threads.
2025-04-21 00:36:48 +02:00
Viktor Lofgren
647dd9b12f (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready 2025-04-21 00:24:30 +02:00
2 changed files with 102 additions and 56 deletions

View File

@@ -42,6 +42,7 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,6 +66,7 @@ public class CrawlerMain extends ProcessMainClass {
private final DomainLocks domainLocks = new DomainLocks();
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
private final ArrayBlockingQueue<CrawlTask> retryQueue = new ArrayBlockingQueue<>(64);
private final AtomicInteger tasksDone = new AtomicInteger(0);
private final HttpFetcherImpl fetcher;
@@ -277,12 +279,26 @@ public class CrawlerMain extends ProcessMainClass {
}
// Schedule viable tasks for execution until list is empty
while (!taskList.isEmpty()) {
taskList.removeIf(this::trySubmitDeferredTask);
for (int emptyRuns = 0;emptyRuns < 300;) {
boolean hasTasks = !taskList.isEmpty();
// Add a small pause here to avoid busy looping toward the end of the execution cycle when
// we might have no new viable tasks to run for hours on end
TimeUnit.MILLISECONDS.sleep(50);
// The order of these checks very important to avoid a race condition
// where we miss a task that is put into the retry queue
boolean hasRunningTasks = pool.getActiveCount() > 0;
boolean hasRetryTasks = !retryQueue.isEmpty();
if (hasTasks || hasRetryTasks || hasRunningTasks) {
retryQueue.drainTo(taskList);
taskList.removeIf(this::trySubmitDeferredTask);
// Add a small pause here to avoid busy looping toward the end of the execution cycle when
// we might have no new viable tasks to run for hours on end
TimeUnit.MILLISECONDS.sleep(50);
} else {
// We have no tasks to run, and no tasks in the retry queue
// but we wait a bit to see if any new tasks come in via the retry queue
emptyRuns++;
TimeUnit.SECONDS.sleep(1);
}
}
logger.info("Shutting down the pool, waiting for tasks to complete...");
@@ -425,66 +441,79 @@ public class CrawlerMain extends ProcessMainClass {
return;
}
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
// while writing to the same file name as before
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
}
else {
Files.deleteIfExists(tempFile);
}
try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
CrawlDataReference reference = getReference()
)
{
// Resume the crawl if it was aborted
if (Files.exists(tempFile)) {
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
// We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long
if (lock.isEmpty()) {
if (retryQueue.remainingCapacity() > 0) {
// Sleep a moment to avoid busy looping via the retry queue
// in the case when few tasks remain
Thread.sleep(10);
}
DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
retryQueue.put(this);
return;
}
DomainLocks.DomainLock domainLock = lock.get();
int size;
try (var lock = domainLocks.lockDomain(new EdgeDomain(domain))) {
size = retriever.crawlDomain(domainLinks, reference);
try (domainLock) {
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
// while writing to the same file name as before
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
}
else {
Files.deleteIfExists(tempFile);
}
// Delete the reference crawl data if it's not the same as the new one
// (mostly a case when migrating from legacy->warc)
reference.delete();
try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
CrawlDataReference reference = getReference())
{
// Resume the crawl if it was aborted
if (Files.exists(tempFile)) {
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
}
// Convert the WARC file to Parquet
SlopCrawlDataRecord
.convertWarc(domain, userAgent, newWarcFile, slopFile);
DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
// Optionally archive the WARC file if full retention is enabled,
// otherwise delete it:
warcArchiver.consumeWarc(newWarcFile, domain);
int size = retriever.crawlDomain(domainLinks, reference);
// Mark the domain as finished in the work log
workLog.setJobToFinished(domain, slopFile.toString(), size);
// Delete the reference crawl data if it's not the same as the new one
// (mostly a case when migrating from legacy->warc)
reference.delete();
// Update the progress bar
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
// Convert the WARC file to Parquet
SlopCrawlDataRecord
.convertWarc(domain, userAgent, newWarcFile, slopFile);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
}
finally {
// We don't need to double-count these; it's also kept in the workLog
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");
// Optionally archive the WARC file if full retention is enabled,
// otherwise delete it:
warcArchiver.consumeWarc(newWarcFile, domain);
Files.deleteIfExists(newWarcFile);
Files.deleteIfExists(tempFile);
// Mark the domain as finished in the work log
workLog.setJobToFinished(domain, slopFile.toString(), size);
// Update the progress bar
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
}
finally {
// We don't need to double-count these; it's also kept in the workLog
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");
Files.deleteIfExists(newWarcFile);
Files.deleteIfExists(tempFile);
}
}
}

View File

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.logic;
import nu.marginalia.model.EdgeDomain;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -19,8 +20,21 @@ public class DomainLocks {
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
var ret = new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
ret.lock();
return ret;
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(domain.toString(), sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
}
private Semaphore defaultPermits(String topDomain) {
@@ -56,10 +70,13 @@ public class DomainLocks {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
DomainLock(String domainName, Semaphore semaphore) {
this.domainName = domainName;
this.semaphore = semaphore;
}
// This method is called to lock the domain. It will block until the lock is available.
private void lock() throws InterruptedException {
Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("crawling:" + domainName);