Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git (synced 2025-10-06 07:32:38 +02:00)

Compare commits: deploy-013...deploy-014 (14 commits)
Commits: c246a59158, 0b99781d24, 39db9620c1, 1781599363, 6b2d18fb9b, 59b1d200ab, 897010a2cf, 602af7a77e, a7d91c8527, 7151602124, 884e33bd4a, e84d5c497a, 2d2d3e2466, 647dd9b12f

DownloadSampleActor.java

@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
 import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.actor.state.Resume;
 import nu.marginalia.service.control.ServiceEventLog;
+import nu.marginalia.service.control.ServiceHeartbeat;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
 import nu.marginalia.storage.model.FileStorageId;
@@ -19,6 +20,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
@@ -32,6 +34,7 @@ public class DownloadSampleActor extends RecordActorPrototype {
 
     private final FileStorageService storageService;
     private final ServiceEventLog eventLog;
+    private final ServiceHeartbeat heartbeat;
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Resume(behavior = ActorResumeBehavior.ERROR)
@@ -66,15 +69,39 @@ public class DownloadSampleActor extends RecordActorPrototype {
 
                 Files.deleteIfExists(Path.of(tarFileName));
 
-                try (var is = new BufferedInputStream(new URI(downloadURI).toURL().openStream());
-                     var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) {
-                    is.transferTo(os);
+                HttpURLConnection urlConnection = (HttpURLConnection) new URI(downloadURI).toURL().openConnection();
+
+                try (var hb = heartbeat.createServiceAdHocTaskHeartbeat("Downloading sample")) {
+                    long size = urlConnection.getContentLengthLong();
+                    byte[] buffer = new byte[8192];
+
+                    try (var is = new BufferedInputStream(urlConnection.getInputStream());
+                         var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) {
+                        long copiedSize = 0;
+
+                        while (copiedSize < size) {
+                            int read = is.read(buffer);
+
+                            if (read < 0) // We've been promised a file of length 'size'
+                                throw new IOException("Unexpected end of stream");
+
+                            os.write(buffer, 0, read);
+                            copiedSize += read;
+
+                            // Update progress bar
+                            hb.progress(String.format("%d MB", copiedSize / 1024 / 1024), (int) (copiedSize / 1024), (int) (size / 1024));
+                        }
+                    }
                 }
                 catch (Exception ex) {
                     eventLog.logEvent(DownloadSampleActor.class, "Error downloading sample");
                     logger.error("Error downloading sample", ex);
                     yield new Error();
                 }
+                finally {
+                    urlConnection.disconnect();
+                }
 
                 eventLog.logEvent(DownloadSampleActor.class, "Download complete");
                 yield new Extract(fileStorageId, tarFileName);
@@ -170,11 +197,12 @@ public class DownloadSampleActor extends RecordActorPrototype {
     @Inject
     public DownloadSampleActor(Gson gson,
                                FileStorageService storageService,
-                               ServiceEventLog eventLog)
+                               ServiceEventLog eventLog, ServiceHeartbeat heartbeat)
     {
         super(gson);
         this.storageService = storageService;
         this.eventLog = eventLog;
+        this.heartbeat = heartbeat;
     }
 
 }

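Note: the change above swaps a single is.transferTo(os) call for a manual copy loop so progress can be reported while the sample tarball streams in. A minimal standalone sketch of the same pattern follows; the Progress callback is a hypothetical stand-in for the heartbeat's progress() call, and the loop assumes the server reports a Content-Length.

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ChunkedDownloadSketch {
    /** Hypothetical progress callback, standing in for the heartbeat API. */
    interface Progress { void update(long copiedBytes, long totalBytes); }

    static void download(String uri, Path dest, Progress progress) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URI(uri).toURL().openConnection();
        try (var is = new BufferedInputStream(conn.getInputStream());
             var os = new BufferedOutputStream(Files.newOutputStream(dest, StandardOpenOption.CREATE))) {
            long size = conn.getContentLengthLong(); // assumes the server sends Content-Length
            byte[] buffer = new byte[8192];
            long copied = 0;
            while (copied < size) {
                int read = is.read(buffer);
                if (read < 0) // stream ended before the promised length
                    throw new IOException("Unexpected end of stream");
                os.write(buffer, 0, read);
                copied += read;
                progress.update(copied, size); // report after every chunk instead of one transferTo()
            }
        }
        finally {
            conn.disconnect();
        }
    }
}
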
CrawlerMain.java

@@ -43,6 +43,7 @@ import java.nio.file.StandardCopyOption;
 import java.security.Security;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -66,6 +67,8 @@ public class CrawlerMain extends ProcessMainClass {
 
     private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
 
+    private final LinkedBlockingQueue<CrawlTask> retryQueue = new LinkedBlockingQueue<>();
+
     private final AtomicInteger tasksDone = new AtomicInteger(0);
     private final HttpFetcherImpl fetcher;
 
@@ -277,12 +280,29 @@ public class CrawlerMain extends ProcessMainClass {
             }
 
             // Schedule viable tasks for execution until list is empty
-            while (!taskList.isEmpty()) {
-                taskList.removeIf(this::trySubmitDeferredTask);
+            for (int emptyRuns = 0; emptyRuns < 300;) {
+                boolean hasTasks = !taskList.isEmpty();
 
-                // Add a small pause here to avoid busy looping toward the end of the execution cycle when
-                // we might have no new viable tasks to run for hours on end
-                TimeUnit.MILLISECONDS.sleep(50);
+                // The order of these checks very important to avoid a race condition
+                // where we miss a task that is put into the retry queue
+                boolean hasRunningTasks = pool.getActiveCount() > 0;
+                boolean hasRetryTasks = !retryQueue.isEmpty();
+
+                if (hasTasks || hasRetryTasks || hasRunningTasks) {
+                    retryQueue.drainTo(taskList);
+
+                    // Try to submit any tasks that are in the retry queue (this will block if the pool is full)
+                    taskList.removeIf(this::trySubmitDeferredTask);
+
+                    // Add a small pause here to avoid busy looping toward the end of the execution cycle when
+                    // we might have no new viable tasks to run for hours on end
+                    TimeUnit.MILLISECONDS.sleep(5);
+                } else {
+                    // We have no tasks to run, and no tasks in the retry queue
+                    // but we wait a bit to see if any new tasks come in via the retry queue
+                    emptyRuns++;
+                    TimeUnit.SECONDS.sleep(1);
+                }
             }
 
             logger.info("Shutting down the pool, waiting for tasks to complete...");
@@ -414,7 +434,7 @@ public class CrawlerMain extends ProcessMainClass {
         /** Best effort indicator whether we could start this now without getting stuck in
          * DomainLocks purgatory */
         public boolean canRun() {
-            return domainLocks.canLock(new EdgeDomain(domain));
+            return domainLocks.isLockableHint(new EdgeDomain(domain));
         }
 
         @Override
@@ -425,66 +445,82 @@ public class CrawlerMain extends ProcessMainClass {
                 return;
             }
 
-            Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
-            Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
-            Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
-
-            // Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
-            // while writing to the same file name as before
-            if (Files.exists(newWarcFile)) {
-                Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
-            }
-            else {
-                Files.deleteIfExists(tempFile);
-            }
-
-            try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
-                 var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
-                 CrawlDataReference reference = getReference()
-            )
-            {
-                // Resume the crawl if it was aborted
-                if (Files.exists(tempFile)) {
-                    retriever.syncAbortedRun(tempFile);
-                    Files.delete(tempFile);
-                }
-
-                DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
-
-                int size;
-                try (var lock = domainLocks.lockDomain(new EdgeDomain(domain))) {
-                    size = retriever.crawlDomain(domainLinks, reference);
-                }
-
-                // Delete the reference crawl data if it's not the same as the new one
-                // (mostly a case when migrating from legacy->warc)
-                reference.delete();
-
-                // Convert the WARC file to Parquet
-                SlopCrawlDataRecord
-                        .convertWarc(domain, userAgent, newWarcFile, slopFile);
-
-                // Optionally archive the WARC file if full retention is enabled,
-                // otherwise delete it:
-                warcArchiver.consumeWarc(newWarcFile, domain);
-
-                // Mark the domain as finished in the work log
-                workLog.setJobToFinished(domain, slopFile.toString(), size);
-
-                // Update the progress bar
-                heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
-
-                logger.info("Fetched {}", domain);
-            } catch (Exception e) {
-                logger.error("Error fetching domain " + domain, e);
-            }
-            finally {
-                // We don't need to double-count these; it's also kept in the workLog
-                pendingCrawlTasks.remove(domain);
-                Thread.currentThread().setName("[idle]");
-
-                Files.deleteIfExists(newWarcFile);
-                Files.deleteIfExists(tempFile);
+            Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
+            // We don't have a lock, so we can't run this task
+            // we return to avoid blocking the pool for too long
+            if (lock.isEmpty()) {
+                if (retryQueue.remainingCapacity() > 0) {
+                    // Sleep a moment to avoid busy looping via the retry queue
+                    // in the case when few tasks remain and almost all are ineligible for
+                    // immediate restart
+                    Thread.sleep(5);
+                }
+
+                retryQueue.put(this);
+                return;
+            }
+            DomainLocks.DomainLock domainLock = lock.get();
+
+            try (domainLock) {
+                Thread.currentThread().setName("crawling:" + domain);
+
+                Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
+                Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
+                Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
+
+                // Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
+                // while writing to the same file name as before
+                if (Files.exists(newWarcFile)) {
+                    Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
+                }
+                else {
+                    Files.deleteIfExists(tempFile);
+                }
+
+                try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
+                     var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
+                     CrawlDataReference reference = getReference())
+                {
+                    // Resume the crawl if it was aborted
+                    if (Files.exists(tempFile)) {
+                        retriever.syncAbortedRun(tempFile);
+                        Files.delete(tempFile);
+                    }
+
+                    DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
+
+                    int size = retriever.crawlDomain(domainLinks, reference);
+
+                    // Delete the reference crawl data if it's not the same as the new one
+                    // (mostly a case when migrating from legacy->warc)
+                    reference.delete();
+
+                    // Convert the WARC file to Slop
+                    SlopCrawlDataRecord
+                            .convertWarc(domain, userAgent, newWarcFile, slopFile);
+
+                    // Optionally archive the WARC file if full retention is enabled,
+                    // otherwise delete it:
+                    warcArchiver.consumeWarc(newWarcFile, domain);
+
+                    // Mark the domain as finished in the work log
+                    workLog.setJobToFinished(domain, slopFile.toString(), size);
+
+                    // Update the progress bar
+                    heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
+
+                    logger.info("Fetched {}", domain);
+                } catch (Exception e) {
+                    logger.error("Error fetching domain " + domain, e);
+                }
+                finally {
+                    // We don't need to double-count these; it's also kept in the workLog
+                    pendingCrawlTasks.remove(domain);
+                    Thread.currentThread().setName("[idle]");
+
+                    Files.deleteIfExists(newWarcFile);
+                    Files.deleteIfExists(tempFile);
+                }
             }
         }
 

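Note: the scheduler change above replaces the simple while (!taskList.isEmpty()) loop with one that also drains a retry queue and keeps running while the pool still has active tasks, only exiting after roughly 300 consecutive idle one-second rounds. A rough standalone sketch of that loop shape, assuming a plain ThreadPoolExecutor and with trySubmit as a hypothetical stand-in for trySubmitDeferredTask:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SchedulingLoopSketch {
    private final LinkedBlockingQueue<Runnable> retryQueue = new LinkedBlockingQueue<>();

    /** Hypothetical stand-in for trySubmitDeferredTask(): true when the pool accepted the task. */
    private boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void scheduleUntilQuiet(List<Runnable> taskList, ThreadPoolExecutor pool) throws InterruptedException {
        for (int emptyRuns = 0; emptyRuns < 300; ) {
            boolean hasTasks = !taskList.isEmpty();
            // Check running tasks and the retry queue before deciding to idle, so a task
            // that re-queues itself between the checks is not missed.
            boolean hasRunningTasks = pool.getActiveCount() > 0;
            boolean hasRetryTasks = !retryQueue.isEmpty();

            if (hasTasks || hasRetryTasks || hasRunningTasks) {
                retryQueue.drainTo(taskList);               // pick up tasks that asked to be retried
                taskList.removeIf(t -> trySubmit(pool, t)); // submit whatever can start right now
                TimeUnit.MILLISECONDS.sleep(5);             // short pause to avoid busy-looping
            } else {
                emptyRuns++;                                // count idle rounds before giving up
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}
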
HttpFetcherImpl.java

@@ -53,6 +53,7 @@ import java.net.SocketTimeoutException;
 import java.net.URISyntaxException;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -393,25 +394,31 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
         if (probeType == HttpFetcher.ProbeType.FULL) {
             try {
                 var probeResult = probeContentType(url, cookies, timer, contentTags);
-                logger.info(crawlerAuditMarker, "Probe result {} for {}", probeResult.getClass().getSimpleName(), url);
+
                 switch (probeResult) {
                     case HttpFetcher.ContentTypeProbeResult.NoOp():
                         break; //
                     case HttpFetcher.ContentTypeProbeResult.Ok(EdgeUrl resolvedUrl):
+                        logger.info(crawlerAuditMarker, "Probe result OK for {}", url);
                         url = resolvedUrl; // If we were redirected while probing, use the final URL for fetching
                         break;
                     case ContentTypeProbeResult.BadContentType badContentType:
                         warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
+                        logger.info(crawlerAuditMarker, "Probe result Bad ContenType ({}) for {}", badContentType.contentType(), url);
                         return new HttpFetchResult.ResultNone();
                     case ContentTypeProbeResult.BadContentType.Timeout(Exception ex):
+                        logger.info(crawlerAuditMarker, "Probe result Timeout for {}", url);
                         warcRecorder.flagAsTimeout(url);
                         return new HttpFetchResult.ResultException(ex);
                     case ContentTypeProbeResult.Exception(Exception ex):
+                        logger.info(crawlerAuditMarker, "Probe result Exception({}) for {}", ex.getClass().getSimpleName(), url);
                         warcRecorder.flagAsError(url, ex);
                         return new HttpFetchResult.ResultException(ex);
                     case ContentTypeProbeResult.HttpError httpError:
+                        logger.info(crawlerAuditMarker, "Probe result HTTP Error ({}) for {}", httpError.statusCode(), url);
                         return new HttpFetchResult.ResultException(new HttpException("HTTP status code " + httpError.statusCode() + ": " + httpError.message()));
                     case ContentTypeProbeResult.Redirect redirect:
+                        logger.info(crawlerAuditMarker, "Probe result redirect for {} -> {}", url, redirect.location());
                         return new HttpFetchResult.ResultRedirect(redirect.location());
                 }
             } catch (Exception ex) {
@@ -430,27 +437,32 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
         contentTags.paint(request);
 
         try (var sl = new SendLock()) {
+            Instant start = Instant.now();
             HttpFetchResult result = warcRecorder.fetch(client, cookies, request);
+
+            Duration fetchDuration = Duration.between(start, Instant.now());
+
             if (result instanceof HttpFetchResult.ResultOk ok) {
                 if (ok.statusCode() == 304) {
-                    return new HttpFetchResult.Result304Raw();
+                    result = new HttpFetchResult.Result304Raw();
                 }
             }
 
             switch (result) {
-                case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {}", ok.statusCode(), url);
+                case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {} ({} ms)", ok.statusCode(), url, fetchDuration.toMillis());
                 case HttpFetchResult.ResultRedirect redirect -> logger.info(crawlerAuditMarker, "Fetch result redirect: {} for {}", redirect.url(), url);
                 case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
-                case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for " + url + ": {}", ex.ex());
+                case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex.ex());
                 case HttpFetchResult.Result304Raw raw -> logger.info(crawlerAuditMarker, "Fetch result: 304 Raw for {}", url);
                 case HttpFetchResult.Result304ReplacedWithReference ref -> logger.info(crawlerAuditMarker, "Fetch result: 304 With reference for {}", url);
             }
 
             return result;
         }
         }
         catch (Exception ex) {
-            ex.printStackTrace();
+            logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex);
+
             return new HttpFetchResult.ResultException(ex);
         }
 

WarcRecorder.java

@@ -41,7 +41,7 @@ public class WarcRecorder implements AutoCloseable {
     static final int MAX_TIME = 30_000;
 
     /** Maximum (decompressed) size we'll save */
-    static final int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 10 * 1024 * 1024);
+    static final int MAX_SIZE = Integer.getInteger("crawler.maxFetchSize", 32 * 1024 * 1024);
 
     private final WarcWriter writer;
     private final Path warcFile;

DomainLocks.java

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.logic;
 import nu.marginalia.model.EdgeDomain;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 
@@ -19,8 +20,22 @@ public class DomainLocks {
      * and may be held by another thread. The caller is responsible for locking and releasing the lock.
      */
     public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
-        return new DomainLock(domain.toString(),
-                locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
+        var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
+
+        sem.acquire();
+
+        return new DomainLock(sem);
+    }
+
+    public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
+        var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
+        if (sem.tryAcquire(1)) {
+            return Optional.of(new DomainLock(sem));
+        }
+        else {
+            // We don't have a lock, so we return an empty optional
+            return Optional.empty();
+        }
     }
 
     private Semaphore defaultPermits(String topDomain) {
@@ -28,23 +43,27 @@ public class DomainLocks {
             return new Semaphore(16);
         if (topDomain.equals("blogspot.com"))
             return new Semaphore(8);
+        if (topDomain.equals("tumblr.com"))
+            return new Semaphore(8);
         if (topDomain.equals("neocities.org"))
-            return new Semaphore(4);
+            return new Semaphore(8);
         if (topDomain.equals("github.io"))
-            return new Semaphore(4);
+            return new Semaphore(8);
 
+        // Substack really dislikes broad-scale crawlers, so we need to be careful
+        // to not get blocked.
         if (topDomain.equals("substack.com")) {
             return new Semaphore(1);
         }
-        if (topDomain.endsWith(".edu")) {
-            return new Semaphore(1);
-        }
 
         return new Semaphore(2);
     }
 
-    public boolean canLock(EdgeDomain domain) {
+    /** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
+     * (this is just a hint, and does not guarantee that the domain is actually lockable any time
+     * after this method returns true)
+     */
+    public boolean isLockableHint(EdgeDomain domain) {
         Semaphore sem = locks.get(domain.topDomain.toLowerCase());
         if (null == sem)
             return true;
@@ -53,22 +72,16 @@ public class DomainLocks {
     }
 
     public static class DomainLock implements AutoCloseable {
-        private final String domainName;
         private final Semaphore semaphore;
 
-        DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
-            this.domainName = domainName;
+        DomainLock(Semaphore semaphore) {
             this.semaphore = semaphore;
-
-            Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
-            semaphore.acquire();
-            Thread.currentThread().setName("crawling:" + domainName);
         }
 
         @Override
         public void close() throws Exception {
             semaphore.release();
-            Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]");
+            Thread.currentThread().setName("[idle]");
         }
     }
 }

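Note: the new tryLockDomain gives callers a non-blocking way to take the per-domain semaphore; when no permit is free, the caller gets an empty Optional and can requeue itself instead of parking a pool thread. A compact sketch of this keyed-semaphore pattern, where the permit count of 2 mirrors the default above and AutoCloseable stands in for the DomainLock wrapper:

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class KeyedLockSketch {
    private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();

    /** Blocking variant: waits until a permit for this key is available. */
    AutoCloseable lock(String key) throws InterruptedException {
        Semaphore sem = locks.computeIfAbsent(key, k -> new Semaphore(2));
        sem.acquire();
        return sem::release; // releasing the permit is the only cleanup needed
    }

    /** Non-blocking variant: returns empty instead of waiting, so the caller can retry later. */
    Optional<AutoCloseable> tryLock(String key) {
        Semaphore sem = locks.computeIfAbsent(key, k -> new Semaphore(2));
        if (sem.tryAcquire(1)) {
            return Optional.of((AutoCloseable) sem::release);
        }
        return Optional.empty();
    }
}
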
ContentTypes.java

@@ -6,6 +6,7 @@ public class ContentTypes {
     public static final Set<String> acceptedContentTypes = Set.of("application/xhtml+xml",
             "application/xhtml",
             "text/html",
+            "application/pdf",
             "image/x-icon",
             "text/plain");
 
@@ -19,4 +20,9 @@ public class ContentTypes {
         return false;
     }
 
+    public static boolean isBinary(String contentTypeHeader) {
+        String lcHeader = contentTypeHeader.toLowerCase();
+        return lcHeader.startsWith("application/pdf");
+    }
+
 }

SlopSerializableCrawlDataStream.java

@@ -37,8 +37,12 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, Serializa
     public boolean filter(String url, int status, String contentType) {
         String ctLc = contentType.toLowerCase();
 
+        // Permit all plain text content types
         if (ctLc.startsWith("text/"))
             return true;
+        // PDF
+        else if (ctLc.startsWith("application/pdf"))
+            return true;
         else if (ctLc.startsWith("x-marginalia/"))
             return true;
 

ContentTypeLogic.java

@@ -10,7 +10,7 @@ import java.util.regex.Pattern;
 
 public class ContentTypeLogic {
 
-    private static final Predicate<String> probableHtmlPattern = Pattern.compile("^.*\\.(htm|html|php|txt|md)$").asMatchPredicate();
+    private static final Predicate<String> probableGoodPattern = Pattern.compile("^.*\\.(htm|html|php|txt|md|pdf)$").asMatchPredicate();
     private static final Predicate<String> probableBinaryPattern = Pattern.compile("^.*\\.[a-z]+$").asMatchPredicate();
     private static final Set<String> blockedContentTypes = Set.of("text/css", "text/javascript");
     private static final List<String> acceptedContentTypePrefixes = List.of(
@@ -22,6 +22,7 @@ public class ContentTypeLogic {
             "application/rss+xml",
             "application/x-rss+xml",
             "application/rdf+xml",
+            "application/pdf",
             "x-rss+xml"
     );
     private boolean allowAllContentTypes = false;
@@ -34,7 +35,7 @@ public class ContentTypeLogic {
     public boolean isUrlLikeBinary(EdgeUrl url) {
         String pathLowerCase = url.path.toLowerCase();
 
-        if (probableHtmlPattern.test(pathLowerCase))
+        if (probableGoodPattern.test(pathLowerCase))
             return false;
 
         return probableBinaryPattern.test(pathLowerCase);

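Note: with pdf added to the "probably good" extension pattern, a URL ending in .pdf is no longer classified as binary by the extension heuristic, so such documents become eligible for fetching. A small illustration of the heuristic's behavior, using the same two patterns as above:

import java.util.function.Predicate;
import java.util.regex.Pattern;

class UrlExtensionHeuristicSketch {
    static final Predicate<String> probableGood =
            Pattern.compile("^.*\\.(htm|html|php|txt|md|pdf)$").asMatchPredicate();
    static final Predicate<String> probableBinary =
            Pattern.compile("^.*\\.[a-z]+$").asMatchPredicate();

    static boolean isUrlLikeBinary(String pathLowerCase) {
        if (probableGood.test(pathLowerCase))
            return false;                          // known-good extension: not binary
        return probableBinary.test(pathLowerCase); // any other extension: assume binary
    }

    public static void main(String[] args) {
        System.out.println(isUrlLikeBinary("/docs/report.pdf"));   // false since this change
        System.out.println(isUrlLikeBinary("/files/archive.zip")); // true
        System.out.println(isUrlLikeBinary("/blog/post"));         // false (no extension)
    }
}
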
SlopCrawlDataRecord.java

@@ -216,6 +216,11 @@ public record SlopCrawlDataRecord(String domain,
             return false;
         }
 
+        // If the format is binary, we don't want to translate it if the response is truncated
+        if (response.truncated() != WarcTruncationReason.NOT_TRUNCATED && ContentTypes.isBinary(contentType)) {
+            return false;
+        }
+
         return true;
     }
 

HttpFetcherImplFetchTest.java

@@ -40,6 +40,8 @@ class HttpFetcherImplFetchTest {
     private static EdgeUrl badHttpStatusUrl;
     private static EdgeUrl keepAliveUrl;
 
+    private static EdgeUrl pdfUrl;
+
     @BeforeAll
     public static void setupAll() throws URISyntaxException {
         wireMockServer =
@@ -133,6 +135,13 @@ class HttpFetcherImplFetchTest {
         ));
 
+        pdfUrl = new EdgeUrl("http://localhost:18089/test.pdf");
+        wireMockServer.stubFor(WireMock.get(WireMock.urlEqualTo(pdfUrl.path))
+                .willReturn(WireMock.aResponse()
+                        .withHeader("Content-Type", "application/pdf")
+                        .withStatus(200)
+                        .withBody("Hello World")));
+
         wireMockServer.start();
 
     }
@@ -352,6 +361,14 @@ class HttpFetcherImplFetchTest {
         Assertions.assertTrue(result.isOk());
     }
 
+    @Test
+    public void testPdf() {
+        var result = fetcher.fetchContent(pdfUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
+
+        Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
+        Assertions.assertTrue(result.isOk());
+    }
+
     private List<WarcRecord> getWarcRecords() throws IOException {
         List<WarcRecord> records = new ArrayList<>();

WarcRecorderTest.java

@@ -4,9 +4,9 @@ import nu.marginalia.UserAgent;
 import nu.marginalia.crawl.fetcher.ContentTags;
 import nu.marginalia.crawl.fetcher.DomainCookies;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeUrl;
-import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileReader;
-import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
+import nu.marginalia.slop.SlopCrawlDataRecord;
 import org.apache.hc.client5.http.classic.HttpClient;
 import org.apache.hc.client5.http.classic.methods.HttpGet;
 import org.apache.hc.client5.http.impl.classic.HttpClients;
@@ -24,13 +24,14 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 class WarcRecorderTest {
     Path fileNameWarc;
-    Path fileNameParquet;
+    Path fileNameSlop;
     WarcRecorder client;
 
     HttpClient httpClient;
@@ -39,7 +40,7 @@ class WarcRecorderTest {
         httpClient = HttpClients.createDefault();
 
         fileNameWarc = Files.createTempFile("test", ".warc");
-        fileNameParquet = Files.createTempFile("test", ".parquet");
+        fileNameSlop = Files.createTempFile("test", ".slop.zip");
 
         client = new WarcRecorder(fileNameWarc);
     }
@@ -159,17 +160,28 @@ class WarcRecorderTest {
 
         client.fetch(httpClient, new DomainCookies(), request3);
 
-        CrawledDocumentParquetRecordFileWriter.convertWarc(
+        HttpGet request4 = new HttpGet("https://downloads.marginalia.nu/test.pdf");
+        request4.addHeader("User-agent", "test.marginalia.nu");
+        request4.addHeader("Accept-Encoding", "gzip");
+
+        client.fetch(httpClient, new DomainCookies(), request4);
+
+        SlopCrawlDataRecord.convertWarc(
                 "www.marginalia.nu",
                 new UserAgent("test", "test"),
                 fileNameWarc,
-                fileNameParquet);
+                fileNameSlop);
 
-        var urls = CrawledDocumentParquetRecordFileReader.stream(fileNameParquet).map(doc -> doc.url).toList();
-        assertEquals(2, urls.size());
+        List<String> urls;
+        try (var stream = SerializableCrawlDataStream.openDataStream(fileNameSlop)) {
+            urls = stream.docsAsList().stream().map(doc -> doc.url.toString()).toList();
+        }
+
+        assertEquals(3, urls.size());
         assertEquals("https://www.marginalia.nu/", urls.get(0));
         assertEquals("https://www.marginalia.nu/log/", urls.get(1));
         // sanic.jpg gets filtered out for its bad mime type
+        assertEquals("https://downloads.marginalia.nu/test.pdf", urls.get(2));
 
     }

Search form templates (placeholder text):

@@ -13,7 +13,7 @@
        class="shadow-inner flex-1 dark:bg-black dark:text-gray-100 bg-gray-50 border dark:border-gray-600 border-gray-300 text-gray-900 text-sm rounded-sm block w-full p-2.5"
        value="${query}"
        autofocus
-       placeholder="Search..."
+       placeholder="Search the web!"
        autocomplete="off"
        name="query"
        id="searchInput" />
@@ -21,7 +21,7 @@
 <input type="text"
        class="shadow-inner flex-1 dark:bg-black dark:text-gray-100 bg-gray-50 border dark:border-gray-600 border-gray-300 text-gray-900 text-sm rounded-sm block w-full p-2.5"
        value="${query}"
-       placeholder="Search..."
+       placeholder="Search the web!"
        autocomplete="off"
        name="query"
        id="searchInput" />

Sample crawl data download form (size estimates):

@@ -24,25 +24,25 @@ This is a sample of real crawl data. It is intended for demo, testing and devel
     <tr>
         <td><input id="sample-s" value="sample-s" name="sample" class="form-check-input" type="radio"></td>
         <td><label for="sample-s">Small</label></td>
-        <td>1000 Domains. About 2 GB. </td>
+        <td>1000 Domains. About 1 GB. </td>
     </tr>
 
     <tr>
         <td><input id="sample-m" value="sample-m" name="sample" class="form-check-input" type="radio"></td>
         <td><label for="sample-m">Medium</label></td>
-        <td>2000 Domains. About 6 GB. Recommended.</td>
+        <td>2000 Domains. About 2 GB. Recommended.</td>
     </tr>
 
     <tr>
         <td><input id="sample-l" value="sample-l" name="sample" class="form-check-input" type="radio"></td>
         <td><label for="sample-l">Large</label></td>
-        <td>5000 Domains. About 20 GB.</td>
+        <td>5000 Domains. About 7 GB.</td>
     </tr>
 
     <tr>
         <td><input id="sample-xl" value="sample-xl" name="sample" class="form-check-input" type="radio"></td>
         <td><label for="sample-xl">Huge</label></td>
-        <td>50,000 Domains. Around 180 GB. Primarily intended for pre-production like testing environments.
+        <td>50,000 Domains. Around 80 GB. Primarily intended for pre-production like testing environments.
         Expect hours of processing time. </td>
     </tr>
 </table>