1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Viktor Lofgren
9f18ced73d (crawler) Improve deferred task behavior 2025-03-18 12:54:18 +01:00

View File

@@ -41,7 +41,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -248,54 +251,34 @@ public class CrawlerMain extends ProcessMainClass {
// List of deferred tasks used to ensure beneficial scheduling of domains with regard to DomainLocks,
// merely shuffling the domains tends to lead to a lot of threads being blocked waiting for a semphore,
// this will more aggressively attempt to schedule the jobs to avoid blocking
List<CrawlTask> deferredTasks = new LinkedList<>();
List<CrawlTask> taskList = new ArrayList<>();
// Create crawl tasks and submit them to the pool for execution
// Create crawl tasks
for (CrawlSpecRecord crawlSpec : crawlSpecRecords) {
if (workLog.isJobFinished(crawlSpec.domain()))
if (workLog.isJobFinished(crawlSpec.domain))
continue;
// Add to the end of the deferral list
deferredTasks.addLast(new CrawlTask(
var task = new CrawlTask(
crawlSpec,
anchorTagsSource,
outputDir,
warcArchiver,
domainStateDb,
workLog));
workLog);
// Start every task we currently can from the deferral list
deferredTasks.removeIf(task -> {
if (task.canRun()) {
if (pendingCrawlTasks.putIfAbsent(task.domain, task) != null) {
return true; // task has already run, duplicate in crawl specs
}
// This blocks the caller when the pool is full
pool.submitQuietly(task);
return true;
}
return false;
});
// Try to run immediately, to avoid unnecessarily keeping the entire work set in RAM
if (!trySubmitDeferredTask(task)) {
// Otherwise add to the taskList for deferred execution
taskList.add(task);
}
}
// Schedule viable tasks for execution until list is empty
while (!taskList.isEmpty()) {
taskList.removeIf(this::trySubmitDeferredTask);
// Schedule any lingering tasks for immediate execution until none exist
while (!deferredTasks.isEmpty()) {
deferredTasks.removeIf(task -> {
if (task.canRun()) {
if (pendingCrawlTasks.putIfAbsent(task.domain, task) != null) {
return true; // task has already run, duplicate in crawl specs
}
// This blocks the caller when the pool is full
pool.submitQuietly(task);
return true;
}
return false;
});
// Add a small pause here to avoid busy looping toward the end of the execution cycle when
// we might have no new viable tasks to run for hours on end
TimeUnit.MILLISECONDS.sleep(50);
}
@@ -323,6 +306,28 @@ public class CrawlerMain extends ProcessMainClass {
}
}
/** Submit a task for execution if it can be run, returns true if it was submitted
* or if it can be discarded */
private boolean trySubmitDeferredTask(CrawlTask task) {
if (!task.canRun()) {
return false;
}
if (pendingCrawlTasks.putIfAbsent(task.domain, task) != null) {
return true; // task has already run, duplicate in crawl specs
}
try {
// This blocks the caller when the pool is full
pool.submitQuietly(task);
return true;
}
catch (RuntimeException ex) {
logger.error("Failed to submit task " + task.domain, ex);
return false;
}
}
public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception {
runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath());
}
@@ -388,6 +393,11 @@ public class CrawlerMain extends ProcessMainClass {
@Override
public void run() throws Exception {
if (workLog.isJobFinished(domain)) { // No-Op
logger.info("Omitting task {}, as it is already run", domain);
return;
}
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
@@ -442,7 +452,7 @@ public class CrawlerMain extends ProcessMainClass {
logger.error("Error fetching domain " + domain, e);
}
finally {
// We don't need to double-count these; it's also kept int he workLog
// We don't need to double-count these; it's also kept in the workLog
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");