1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

8 Commits

Author SHA1 Message Date
Viktor Lofgren
6b2d18fb9b (crawler) Adjust domain limits to be generally more permissive. 2025-04-22 15:27:57 +02:00
Viktor
59b1d200ab Merge pull request #190 from MarginaliaSearch/download-sample-chores
Download sample chores
2025-04-22 13:29:49 +02:00
Viktor Lofgren
897010a2cf (control) Update download sample data actor with better UI
The original implementation didn't really give a lot of feedback about what it was doing.  Adding a progress bar to the download step.

Relates to issue 189.
2025-04-22 13:27:22 +02:00
Viktor Lofgren
602af7a77e (control) Update UI with new sample sizes
Relates to issue 189.
2025-04-22 13:27:13 +02:00
Viktor Lofgren
a7d91c8527 (crawler) Clean up fetcher detailed logging 2025-04-21 12:53:52 +02:00
Viktor Lofgren
7151602124 (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Cleaning up after changes.
2025-04-21 12:47:03 +02:00
Viktor Lofgren
884e33bd4a (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change back to an unbounded queue, tighten sleep times a bit.
2025-04-21 11:48:15 +02:00
Viktor Lofgren
e84d5c497a (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change to a bounded queue and adding a sleep to reduce the amount of effectively busy looping threads.
2025-04-21 00:39:26 +02:00
5 changed files with 91 additions and 44 deletions

View File

@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageId;
@@ -19,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -32,6 +34,7 @@ public class DownloadSampleActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ServiceEventLog eventLog;
private final ServiceHeartbeat heartbeat;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Resume(behavior = ActorResumeBehavior.ERROR)
@@ -66,15 +69,39 @@ public class DownloadSampleActor extends RecordActorPrototype {
Files.deleteIfExists(Path.of(tarFileName));
try (var is = new BufferedInputStream(new URI(downloadURI).toURL().openStream());
var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) {
is.transferTo(os);
HttpURLConnection urlConnection = (HttpURLConnection) new URI(downloadURI).toURL().openConnection();
try (var hb = heartbeat.createServiceAdHocTaskHeartbeat("Downloading sample")) {
long size = urlConnection.getContentLengthLong();
byte[] buffer = new byte[8192];
try (var is = new BufferedInputStream(urlConnection.getInputStream());
var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) {
long copiedSize = 0;
while (copiedSize < size) {
int read = is.read(buffer);
if (read < 0) // We've been promised a file of length 'size'
throw new IOException("Unexpected end of stream");
os.write(buffer, 0, read);
copiedSize += read;
// Update progress bar
hb.progress(String.format("%d MB", copiedSize / 1024 / 1024), (int) (copiedSize / 1024), (int) (size / 1024));
}
}
}
catch (Exception ex) {
eventLog.logEvent(DownloadSampleActor.class, "Error downloading sample");
logger.error("Error downloading sample", ex);
yield new Error();
}
finally {
urlConnection.disconnect();
}
eventLog.logEvent(DownloadSampleActor.class, "Download complete");
yield new Extract(fileStorageId, tarFileName);
@@ -170,11 +197,12 @@ public class DownloadSampleActor extends RecordActorPrototype {
@Inject
public DownloadSampleActor(Gson gson,
FileStorageService storageService,
ServiceEventLog eventLog)
ServiceEventLog eventLog, ServiceHeartbeat heartbeat)
{
super(gson);
this.storageService = storageService;
this.eventLog = eventLog;
this.heartbeat = heartbeat;
}
}

View File

@@ -42,8 +42,8 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -66,7 +66,8 @@ public class CrawlerMain extends ProcessMainClass {
private final DomainLocks domainLocks = new DomainLocks();
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
private final ArrayBlockingQueue<CrawlTask> retryQueue = new ArrayBlockingQueue<>(64);
private final LinkedBlockingQueue<CrawlTask> retryQueue = new LinkedBlockingQueue<>();
private final AtomicInteger tasksDone = new AtomicInteger(0);
private final HttpFetcherImpl fetcher;
@@ -289,10 +290,13 @@ public class CrawlerMain extends ProcessMainClass {
if (hasTasks || hasRetryTasks || hasRunningTasks) {
retryQueue.drainTo(taskList);
// Try to submit any tasks that are in the retry queue (this will block if the pool is full)
taskList.removeIf(this::trySubmitDeferredTask);
// Add a small pause here to avoid busy looping toward the end of the execution cycle when
// we might have no new viable tasks to run for hours on end
TimeUnit.MILLISECONDS.sleep(50);
TimeUnit.MILLISECONDS.sleep(5);
} else {
// We have no tasks to run, and no tasks in the retry queue
// but we wait a bit to see if any new tasks come in via the retry queue
@@ -430,7 +434,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */
public boolean canRun() {
return domainLocks.canLock(new EdgeDomain(domain));
return domainLocks.isLockableHint(new EdgeDomain(domain));
}
@Override
@@ -445,14 +449,21 @@ public class CrawlerMain extends ProcessMainClass {
// We don't have a lock, so we can't run this task
// we return to avoid blocking the pool for too long
if (lock.isEmpty()) {
// Sleep a moment to avoid busy looping via the retry queue
Thread.sleep(10);
retryQueue.add(this);
if (retryQueue.remainingCapacity() > 0) {
// Sleep a moment to avoid busy looping via the retry queue
// in the case when few tasks remain and almost all are ineligible for
// immediate restart
Thread.sleep(5);
}
retryQueue.put(this);
return;
}
DomainLocks.DomainLock domainLock = lock.get();
try (domainLock) {
Thread.currentThread().setName("crawling:" + domain);
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);

View File

@@ -53,6 +53,7 @@ import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -393,25 +394,31 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
if (probeType == HttpFetcher.ProbeType.FULL) {
try {
var probeResult = probeContentType(url, cookies, timer, contentTags);
logger.info(crawlerAuditMarker, "Probe result {} for {}", probeResult.getClass().getSimpleName(), url);
switch (probeResult) {
case HttpFetcher.ContentTypeProbeResult.NoOp():
break; //
case HttpFetcher.ContentTypeProbeResult.Ok(EdgeUrl resolvedUrl):
logger.info(crawlerAuditMarker, "Probe result OK for {}", url);
url = resolvedUrl; // If we were redirected while probing, use the final URL for fetching
break;
case ContentTypeProbeResult.BadContentType badContentType:
warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
logger.info(crawlerAuditMarker, "Probe result Bad ContenType ({}) for {}", badContentType.contentType(), url);
return new HttpFetchResult.ResultNone();
case ContentTypeProbeResult.BadContentType.Timeout(Exception ex):
logger.info(crawlerAuditMarker, "Probe result Timeout for {}", url);
warcRecorder.flagAsTimeout(url);
return new HttpFetchResult.ResultException(ex);
case ContentTypeProbeResult.Exception(Exception ex):
logger.info(crawlerAuditMarker, "Probe result Exception({}) for {}", ex.getClass().getSimpleName(), url);
warcRecorder.flagAsError(url, ex);
return new HttpFetchResult.ResultException(ex);
case ContentTypeProbeResult.HttpError httpError:
logger.info(crawlerAuditMarker, "Probe result HTTP Error ({}) for {}", httpError.statusCode(), url);
return new HttpFetchResult.ResultException(new HttpException("HTTP status code " + httpError.statusCode() + ": " + httpError.message()));
case ContentTypeProbeResult.Redirect redirect:
logger.info(crawlerAuditMarker, "Probe result redirect for {} -> {}", url, redirect.location());
return new HttpFetchResult.ResultRedirect(redirect.location());
}
} catch (Exception ex) {
@@ -430,27 +437,32 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
contentTags.paint(request);
try (var sl = new SendLock()) {
Instant start = Instant.now();
HttpFetchResult result = warcRecorder.fetch(client, cookies, request);
Duration fetchDuration = Duration.between(start, Instant.now());
if (result instanceof HttpFetchResult.ResultOk ok) {
if (ok.statusCode() == 304) {
return new HttpFetchResult.Result304Raw();
result = new HttpFetchResult.Result304Raw();
}
}
switch (result) {
case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {}", ok.statusCode(), url);
case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {} ({} ms)", ok.statusCode(), url, fetchDuration.toMillis());
case HttpFetchResult.ResultRedirect redirect -> logger.info(crawlerAuditMarker, "Fetch result redirect: {} for {}", redirect.url(), url);
case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for " + url + ": {}", ex.ex());
case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex.ex());
case HttpFetchResult.Result304Raw raw -> logger.info(crawlerAuditMarker, "Fetch result: 304 Raw for {}", url);
case HttpFetchResult.Result304ReplacedWithReference ref -> logger.info(crawlerAuditMarker, "Fetch result: 304 With reference for {}", url);
}
return result;
}
}
catch (Exception ex) {
ex.printStackTrace();
logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex);
return new HttpFetchResult.ResultException(ex);
}

View File

@@ -20,16 +20,17 @@ public class DomainLocks {
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
var ret = new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
ret.lock();
return ret;
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
sem.acquire();
return new DomainLock(sem);
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(domain.toString(), sem));
return Optional.of(new DomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
@@ -42,23 +43,27 @@ public class DomainLocks {
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("tumblr.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(4);
return new Semaphore(8);
if (topDomain.equals("github.io"))
return new Semaphore(4);
return new Semaphore(8);
// Substack really dislikes broad-scale crawlers, so we need to be careful
// to not get blocked.
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}
return new Semaphore(2);
}
public boolean canLock(EdgeDomain domain) {
/** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
Semaphore sem = locks.get(domain.topDomain.toLowerCase());
if (null == sem)
return true;
@@ -67,25 +72,16 @@ public class DomainLocks {
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) {
this.domainName = domainName;
DomainLock(Semaphore semaphore) {
this.semaphore = semaphore;
}
// This method is called to lock the domain. It will block until the lock is available.
private void lock() throws InterruptedException {
Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("crawling:" + domainName);
}
@Override
public void close() throws Exception {
semaphore.release();
Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]");
Thread.currentThread().setName("[idle]");
}
}
}

View File

@@ -24,25 +24,25 @@ This is a sample of real crawl data. It is intended for demo, testing and devel
<tr>
<td><input id="sample-s" value="sample-s" name="sample" class="form-check-input" type="radio"></td>
<td><label for="sample-s">Small</label></td>
<td>1000 Domains. About 2 GB. </td>
<td>1000 Domains. About 1 GB. </td>
</tr>
<tr>
<td><input id="sample-m" value="sample-m" name="sample" class="form-check-input" type="radio"></td>
<td><label for="sample-m">Medium</label></td>
<td>2000 Domains. About 6 GB. Recommended.</td>
<td>2000 Domains. About 2 GB. Recommended.</td>
</tr>
<tr>
<td><input id="sample-l" value="sample-l" name="sample" class="form-check-input" type="radio"></td>
<td><label for="sample-l">Large</label></td>
<td>5000 Domains. About 20 GB.</td>
<td>5000 Domains. About 7 GB.</td>
</tr>
<tr>
<td><input id="sample-xl" value="sample-xl" name="sample" class="form-check-input" type="radio"></td>
<td><label for="sample-xl">Huge</label></td>
<td>50,000 Domains. Around 180 GB. Primarily intended for pre-production like testing environments.
<td>50,000 Domains. Around 80 GB. Primarily intended for pre-production like testing environments.
Expect hours of processing time. </td>
</tr>
</table>