mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
Compare commits
2 Commits
deploy-013
...
deploy-014
Author | SHA1 | Date | |
---|---|---|---|
|
a7d91c8527 | ||
|
7151602124 |
@@ -66,6 +66,7 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
private final DomainLocks domainLocks = new DomainLocks();
|
||||
|
||||
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
|
||||
|
||||
private final LinkedBlockingQueue<CrawlTask> retryQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
private final AtomicInteger tasksDone = new AtomicInteger(0);
|
||||
@@ -433,7 +434,7 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
/** Best effort indicator whether we could start this now without getting stuck in
|
||||
* DomainLocks purgatory */
|
||||
public boolean canRun() {
|
||||
return domainLocks.canLock(new EdgeDomain(domain));
|
||||
return domainLocks.isLockableHint(new EdgeDomain(domain));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -456,12 +457,13 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
}
|
||||
|
||||
retryQueue.put(this);
|
||||
Thread.currentThread().setName("[idle]");
|
||||
return;
|
||||
}
|
||||
DomainLocks.DomainLock domainLock = lock.get();
|
||||
|
||||
try (domainLock) {
|
||||
Thread.currentThread().setName("crawling:" + domain);
|
||||
|
||||
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
|
||||
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
|
||||
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
|
||||
|
@@ -53,6 +53,7 @@ import java.net.SocketTimeoutException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -393,25 +394,31 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
|
||||
if (probeType == HttpFetcher.ProbeType.FULL) {
|
||||
try {
|
||||
var probeResult = probeContentType(url, cookies, timer, contentTags);
|
||||
logger.info(crawlerAuditMarker, "Probe result {} for {}", probeResult.getClass().getSimpleName(), url);
|
||||
|
||||
switch (probeResult) {
|
||||
case HttpFetcher.ContentTypeProbeResult.NoOp():
|
||||
break; //
|
||||
case HttpFetcher.ContentTypeProbeResult.Ok(EdgeUrl resolvedUrl):
|
||||
logger.info(crawlerAuditMarker, "Probe result OK for {}", url);
|
||||
url = resolvedUrl; // If we were redirected while probing, use the final URL for fetching
|
||||
break;
|
||||
case ContentTypeProbeResult.BadContentType badContentType:
|
||||
warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
|
||||
logger.info(crawlerAuditMarker, "Probe result Bad ContenType ({}) for {}", badContentType.contentType(), url);
|
||||
return new HttpFetchResult.ResultNone();
|
||||
case ContentTypeProbeResult.BadContentType.Timeout(Exception ex):
|
||||
logger.info(crawlerAuditMarker, "Probe result Timeout for {}", url);
|
||||
warcRecorder.flagAsTimeout(url);
|
||||
return new HttpFetchResult.ResultException(ex);
|
||||
case ContentTypeProbeResult.Exception(Exception ex):
|
||||
logger.info(crawlerAuditMarker, "Probe result Exception({}) for {}", ex.getClass().getSimpleName(), url);
|
||||
warcRecorder.flagAsError(url, ex);
|
||||
return new HttpFetchResult.ResultException(ex);
|
||||
case ContentTypeProbeResult.HttpError httpError:
|
||||
logger.info(crawlerAuditMarker, "Probe result HTTP Error ({}) for {}", httpError.statusCode(), url);
|
||||
return new HttpFetchResult.ResultException(new HttpException("HTTP status code " + httpError.statusCode() + ": " + httpError.message()));
|
||||
case ContentTypeProbeResult.Redirect redirect:
|
||||
logger.info(crawlerAuditMarker, "Probe result redirect for {} -> {}", url, redirect.location());
|
||||
return new HttpFetchResult.ResultRedirect(redirect.location());
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
@@ -430,27 +437,32 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
|
||||
contentTags.paint(request);
|
||||
|
||||
try (var sl = new SendLock()) {
|
||||
Instant start = Instant.now();
|
||||
HttpFetchResult result = warcRecorder.fetch(client, cookies, request);
|
||||
|
||||
Duration fetchDuration = Duration.between(start, Instant.now());
|
||||
|
||||
if (result instanceof HttpFetchResult.ResultOk ok) {
|
||||
if (ok.statusCode() == 304) {
|
||||
return new HttpFetchResult.Result304Raw();
|
||||
result = new HttpFetchResult.Result304Raw();
|
||||
}
|
||||
}
|
||||
|
||||
switch (result) {
|
||||
case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {}", ok.statusCode(), url);
|
||||
case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {} ({} ms)", ok.statusCode(), url, fetchDuration.toMillis());
|
||||
case HttpFetchResult.ResultRedirect redirect -> logger.info(crawlerAuditMarker, "Fetch result redirect: {} for {}", redirect.url(), url);
|
||||
case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
|
||||
case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for " + url + ": {}", ex.ex());
|
||||
case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
|
||||
case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex.ex());
|
||||
case HttpFetchResult.Result304Raw raw -> logger.info(crawlerAuditMarker, "Fetch result: 304 Raw for {}", url);
|
||||
case HttpFetchResult.Result304ReplacedWithReference ref -> logger.info(crawlerAuditMarker, "Fetch result: 304 With reference for {}", url);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex);
|
||||
|
||||
return new HttpFetchResult.ResultException(ex);
|
||||
}
|
||||
|
||||
|
@@ -20,16 +20,17 @@ public class DomainLocks {
|
||||
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
|
||||
*/
|
||||
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
|
||||
var ret = new DomainLock(domain.toString(),
|
||||
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
|
||||
ret.lock();
|
||||
return ret;
|
||||
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
|
||||
|
||||
sem.acquire();
|
||||
|
||||
return new DomainLock(sem);
|
||||
}
|
||||
|
||||
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
|
||||
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
|
||||
if (sem.tryAcquire(1)) {
|
||||
return Optional.of(new DomainLock(domain.toString(), sem));
|
||||
return Optional.of(new DomainLock(sem));
|
||||
}
|
||||
else {
|
||||
// We don't have a lock, so we return an empty optional
|
||||
@@ -58,7 +59,11 @@ public class DomainLocks {
|
||||
return new Semaphore(2);
|
||||
}
|
||||
|
||||
public boolean canLock(EdgeDomain domain) {
|
||||
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
|
||||
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
|
||||
* after this method returns true)
|
||||
*/
|
||||
public boolean isLockableHint(EdgeDomain domain) {
|
||||
Semaphore sem = locks.get(domain.topDomain.toLowerCase());
|
||||
if (null == sem)
|
||||
return true;
|
||||
@@ -67,25 +72,16 @@ public class DomainLocks {
|
||||
}
|
||||
|
||||
public static class DomainLock implements AutoCloseable {
|
||||
private final String domainName;
|
||||
private final Semaphore semaphore;
|
||||
|
||||
DomainLock(String domainName, Semaphore semaphore) {
|
||||
this.domainName = domainName;
|
||||
DomainLock(Semaphore semaphore) {
|
||||
this.semaphore = semaphore;
|
||||
}
|
||||
|
||||
// This method is called to lock the domain. It will block until the lock is available.
|
||||
private void lock() throws InterruptedException {
|
||||
Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
|
||||
semaphore.acquire();
|
||||
Thread.currentThread().setName("crawling:" + domainName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
semaphore.release();
|
||||
Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]");
|
||||
Thread.currentThread().setName("[idle]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user