1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

20 Commits

Author SHA1 Message Date
Viktor Lofgren
a7d91c8527 (crawler) Clean up fetcher detailed logging 2025-04-21 12:53:52 +02:00
Viktor Lofgren
7151602124 (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Cleaning up after changes.
2025-04-21 12:47:03 +02:00
Viktor Lofgren
884e33bd4a (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change back to an unbounded queue, tighten sleep times a bit.
2025-04-21 11:48:15 +02:00
Viktor Lofgren
e84d5c497a (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change to a bounded queue and adding a sleep to reduce the amount of effectively busy looping threads.
2025-04-21 00:39:26 +02:00
Viktor Lofgren
2d2d3e2466 (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready
Change to a bounded queue and adding a sleep to reduce the amount of effectively busy looping threads.
2025-04-21 00:36:48 +02:00
Viktor Lofgren
647dd9b12f (crawler) Reduce the likelihood of crawler tasks locking on domains before they are ready 2025-04-21 00:24:30 +02:00
Viktor Lofgren
de4e2849ce (crawler) Tweak request retry counts
Increase the default number of tries to 3, but don't retry on SSL errors as they are unlikely to fix themselves in the short term.
2025-04-19 00:19:48 +02:00
Viktor Lofgren
3c43f1954e (crawler) Add custom cookie store implementation
Apache HttpClient's cookie implementation builds an enormous concurrent hashmap with every cookie for every domain ever crawled.  This is a big waste of resources.

Replacing it with a fairly crude domain-isolated instance, as we are primarily interested in answering whether a cookie is set, and we will never retain cookies long term.
2025-04-18 13:04:22 +02:00
Viktor Lofgren
fa2462ec39 (crawler) Re-enable aborts on timeout 2025-04-18 12:59:34 +02:00
Viktor Lofgren
f4ad7145db (crawler) Disable SO_LINGER 2025-04-18 01:42:02 +02:00
Viktor Lofgren
068b450180 (crawler) Temporarily disable request.abort() 2025-04-18 01:25:56 +02:00
Viktor Lofgren
05b909a21f (crawler) Add logging to get more info about connection leak 2025-04-18 01:06:52 +02:00
Viktor Lofgren
3d179cddce (crawler) Correctly consume entity in sitemap retrieval 2025-04-18 00:32:21 +02:00
Viktor Lofgren
1a2aae496a (crawler) Correct handling and abortion of HttpClient's requests
There was a resource leak in the initial implementation of the Apache HttpClient WarcInputBuffer that failed to free up resources.

Using HttpGet objects instead of the Classic...Request objects, as the latter fail to expose an abort()-method.
2025-04-18 00:16:26 +02:00
Viktor Lofgren
353cdffb3f (crawler) Increase connection request timeout, restore congestion timeout 2025-04-17 21:32:06 +02:00
Viktor Lofgren
2e3f1313c7 (crawler) Log exceptions while crawling in crawler audit log 2025-04-17 21:18:09 +02:00
Viktor Lofgren
58e6f141ce (crawler) Reduce congestion throttle go-rate 2025-04-17 20:36:58 +02:00
Viktor Lofgren
500f63e921 (crawler) Lower max conn per route 2025-04-17 18:36:16 +02:00
Viktor Lofgren
6dfbedda1e (crawler) Increase max conn per route and connection timeout 2025-04-17 18:31:46 +02:00
Viktor Lofgren
9715ddb105 (crawler) Increase max pool size to a large value 2025-04-17 18:22:58 +02:00
23 changed files with 524 additions and 332 deletions

View File

@@ -20,7 +20,6 @@ import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain; import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData; import nu.marginalia.model.crawldata.SerializableCrawlData;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter; import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -247,7 +246,7 @@ public class CrawlingThenConvertingIntegrationTest {
private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws Exception { private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws Exception {
List<SerializableCrawlData> data = new ArrayList<>(); List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder(fileName, new BasicCookieStore()); try (var recorder = new WarcRecorder(fileName);
var db = new DomainStateDb(dbTempFile)) var db = new DomainStateDb(dbTempFile))
{ {
new CrawlerRetreiver(httpFetcher, new DomainProber(domainBlacklist), specs, db, recorder).crawlDomain(); new CrawlerRetreiver(httpFetcher, new DomainProber(domainBlacklist), specs, db, recorder).crawlDomain();

View File

@@ -43,6 +43,7 @@ import java.nio.file.StandardCopyOption;
import java.security.Security; import java.security.Security;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +67,8 @@ public class CrawlerMain extends ProcessMainClass {
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>(); private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
private final LinkedBlockingQueue<CrawlTask> retryQueue = new LinkedBlockingQueue<>();
private final AtomicInteger tasksDone = new AtomicInteger(0); private final AtomicInteger tasksDone = new AtomicInteger(0);
private final HttpFetcherImpl fetcher; private final HttpFetcherImpl fetcher;
@@ -277,12 +280,29 @@ public class CrawlerMain extends ProcessMainClass {
} }
// Schedule viable tasks for execution until list is empty // Schedule viable tasks for execution until list is empty
while (!taskList.isEmpty()) { for (int emptyRuns = 0;emptyRuns < 300;) {
taskList.removeIf(this::trySubmitDeferredTask); boolean hasTasks = !taskList.isEmpty();
// Add a small pause here to avoid busy looping toward the end of the execution cycle when // The order of these checks very important to avoid a race condition
// we might have no new viable tasks to run for hours on end // where we miss a task that is put into the retry queue
TimeUnit.MILLISECONDS.sleep(50); boolean hasRunningTasks = pool.getActiveCount() > 0;
boolean hasRetryTasks = !retryQueue.isEmpty();
if (hasTasks || hasRetryTasks || hasRunningTasks) {
retryQueue.drainTo(taskList);
// Try to submit any tasks that are in the retry queue (this will block if the pool is full)
taskList.removeIf(this::trySubmitDeferredTask);
// Add a small pause here to avoid busy looping toward the end of the execution cycle when
// we might have no new viable tasks to run for hours on end
TimeUnit.MILLISECONDS.sleep(5);
} else {
// We have no tasks to run, and no tasks in the retry queue
// but we wait a bit to see if any new tasks come in via the retry queue
emptyRuns++;
TimeUnit.SECONDS.sleep(1);
}
} }
logger.info("Shutting down the pool, waiting for tasks to complete..."); logger.info("Shutting down the pool, waiting for tasks to complete...");
@@ -414,7 +434,7 @@ public class CrawlerMain extends ProcessMainClass {
/** Best effort indicator whether we could start this now without getting stuck in /** Best effort indicator whether we could start this now without getting stuck in
* DomainLocks purgatory */ * DomainLocks purgatory */
public boolean canRun() { public boolean canRun() {
return domainLocks.canLock(new EdgeDomain(domain)); return domainLocks.isLockableHint(new EdgeDomain(domain));
} }
@Override @Override
@@ -425,66 +445,82 @@ public class CrawlerMain extends ProcessMainClass {
return; return;
} }
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); Optional<DomainLocks.DomainLock> lock = domainLocks.tryLockDomain(new EdgeDomain(domain));
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); // We don't have a lock, so we can't run this task
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain); // we return to avoid blocking the pool for too long
if (lock.isEmpty()) {
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data if (retryQueue.remainingCapacity() > 0) {
// while writing to the same file name as before // Sleep a moment to avoid busy looping via the retry queue
if (Files.exists(newWarcFile)) { // in the case when few tasks remain and almost all are ineligible for
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING); // immediate restart
} Thread.sleep(5);
else {
Files.deleteIfExists(tempFile);
}
try (var warcRecorder = new WarcRecorder(newWarcFile, fetcher); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
CrawlDataReference reference = getReference()
)
{
// Resume the crawl if it was aborted
if (Files.exists(tempFile)) {
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
} }
DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain); retryQueue.put(this);
return;
}
DomainLocks.DomainLock domainLock = lock.get();
int size; try (domainLock) {
try (var lock = domainLocks.lockDomain(new EdgeDomain(domain))) { Thread.currentThread().setName("crawling:" + domain);
size = retriever.crawlDomain(domainLinks, reference);
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
// while writing to the same file name as before
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
}
else {
Files.deleteIfExists(tempFile);
} }
// Delete the reference crawl data if it's not the same as the new one try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
// (mostly a case when migrating from legacy->warc) var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
reference.delete(); CrawlDataReference reference = getReference())
{
// Resume the crawl if it was aborted
if (Files.exists(tempFile)) {
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
}
// Convert the WARC file to Parquet DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
SlopCrawlDataRecord
.convertWarc(domain, userAgent, newWarcFile, slopFile);
// Optionally archive the WARC file if full retention is enabled, int size = retriever.crawlDomain(domainLinks, reference);
// otherwise delete it:
warcArchiver.consumeWarc(newWarcFile, domain);
// Mark the domain as finished in the work log // Delete the reference crawl data if it's not the same as the new one
workLog.setJobToFinished(domain, slopFile.toString(), size); // (mostly a case when migrating from legacy->warc)
reference.delete();
// Update the progress bar // Convert the WARC file to Parquet
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); SlopCrawlDataRecord
.convertWarc(domain, userAgent, newWarcFile, slopFile);
logger.info("Fetched {}", domain); // Optionally archive the WARC file if full retention is enabled,
} catch (Exception e) { // otherwise delete it:
logger.error("Error fetching domain " + domain, e); warcArchiver.consumeWarc(newWarcFile, domain);
}
finally {
// We don't need to double-count these; it's also kept in the workLog
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");
Files.deleteIfExists(newWarcFile); // Mark the domain as finished in the work log
Files.deleteIfExists(tempFile); workLog.setJobToFinished(domain, slopFile.toString(), size);
// Update the progress bar
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
}
finally {
// We don't need to double-count these; it's also kept in the workLog
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");
Files.deleteIfExists(newWarcFile);
Files.deleteIfExists(tempFile);
}
} }
} }

View File

@@ -1,6 +1,6 @@
package nu.marginalia.crawl.fetcher; package nu.marginalia.crawl.fetcher;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.apache.hc.client5.http.classic.methods.HttpGet;
/** Encapsulates request modifiers; the ETag and Last-Modified tags for a resource */ /** Encapsulates request modifiers; the ETag and Last-Modified tags for a resource */
public record ContentTags(String etag, String lastMod) { public record ContentTags(String etag, String lastMod) {
@@ -17,14 +17,14 @@ public record ContentTags(String etag, String lastMod) {
} }
/** Paints the tags onto the request builder. */ /** Paints the tags onto the request builder. */
public void paint(ClassicRequestBuilder getBuilder) { public void paint(HttpGet request) {
if (etag != null) { if (etag != null) {
getBuilder.addHeader("If-None-Match", etag); request.addHeader("If-None-Match", etag);
} }
if (lastMod != null) { if (lastMod != null) {
getBuilder.addHeader("If-Modified-Since", lastMod); request.addHeader("If-Modified-Since", lastMod);
} }
} }
} }

View File

@@ -1,34 +0,0 @@
package nu.marginalia.crawl.fetcher;
import java.io.IOException;
import java.net.CookieHandler;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Cookies extends CookieHandler {
final ThreadLocal<ConcurrentHashMap<String, List<String>>> cookieJar = ThreadLocal.withInitial(ConcurrentHashMap::new);
public void clear() {
cookieJar.get().clear();
}
public boolean hasCookies() {
return !cookieJar.get().isEmpty();
}
public List<String> getCookies() {
return cookieJar.get().values().stream().flatMap(List::stream).toList();
}
@Override
public Map<String, List<String>> get(URI uri, Map<String, List<String>> requestHeaders) throws IOException {
return cookieJar.get();
}
@Override
public void put(URI uri, Map<String, List<String>> responseHeaders) throws IOException {
cookieJar.get().putAll(responseHeaders);
}
}

View File

@@ -0,0 +1,56 @@
package nu.marginalia.crawl.fetcher;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
public class DomainCookies {
private final Map<String, String> cookies = new HashMap<>();
public boolean hasCookies() {
return !cookies.isEmpty();
}
public void updateCookieStore(HttpResponse response) {
for (var header : response.getHeaders()) {
if (header.getName().equalsIgnoreCase("Set-Cookie")) {
parseCookieHeader(header.getValue());
}
}
}
private void parseCookieHeader(String value) {
// Parse the Set-Cookie header value and extract the cookies
String[] parts = value.split(";");
String cookie = parts[0].trim();
if (cookie.contains("=")) {
String[] cookieParts = cookie.split("=");
String name = cookieParts[0].trim();
String val = cookieParts[1].trim();
cookies.put(name, val);
}
}
public void paintRequest(HttpUriRequestBase request) {
request.addHeader("Cookie", createCookieHeader());
}
public void paintRequest(ClassicHttpRequest request) {
request.addHeader("Cookie", createCookieHeader());
}
private String createCookieHeader() {
StringJoiner sj = new StringJoiner("; ");
for (var cookie : cookies.entrySet()) {
sj.add(cookie.getKey() + "=" + cookie.getValue());
}
return sj.toString();
}
}

View File

@@ -23,6 +23,7 @@ public interface HttpFetcher extends AutoCloseable {
HttpFetchResult fetchContent(EdgeUrl url, HttpFetchResult fetchContent(EdgeUrl url,
WarcRecorder recorder, WarcRecorder recorder,
DomainCookies cookies,
CrawlDelayTimer timer, CrawlDelayTimer timer,
ContentTags tags, ContentTags tags,
ProbeType probeType); ProbeType probeType);

View File

@@ -17,6 +17,7 @@ import nu.marginalia.model.crawldata.CrawlerDomainStatus;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy; import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.HttpRequestRetryStrategy; import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.cookie.BasicCookieStore;
@@ -34,6 +35,7 @@ import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.message.MessageSupport; import org.apache.hc.core5.http.message.MessageSupport;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.Timeout;
import org.jsoup.Jsoup; import org.jsoup.Jsoup;
@@ -45,11 +47,13 @@ import org.slf4j.Marker;
import org.slf4j.MarkerFactory; import org.slf4j.MarkerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -76,29 +80,48 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
} }
private final CloseableHttpClient client; private final CloseableHttpClient client;
private PoolingHttpClientConnectionManager connectionManager;
public PoolStats getPoolStats() {
return connectionManager.getTotalStats();
}
private CloseableHttpClient createClient() throws NoSuchAlgorithmException { private CloseableHttpClient createClient() throws NoSuchAlgorithmException {
final ConnectionConfig connectionConfig = ConnectionConfig.custom() final ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setSocketTimeout(10, TimeUnit.SECONDS) .setSocketTimeout(10, TimeUnit.SECONDS)
.setConnectTimeout(10, TimeUnit.SECONDS) .setConnectTimeout(30, TimeUnit.SECONDS)
.setValidateAfterInactivity(TimeValue.ofSeconds(5))
.build(); .build();
final PoolingHttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create() connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(4) .setMaxConnPerRoute(2)
.setMaxConnTotal(5000)
.setDefaultConnectionConfig(connectionConfig) .setDefaultConnectionConfig(connectionConfig)
.setTlsSocketStrategy(new DefaultClientTlsStrategy(SSLContext.getDefault())) .setTlsSocketStrategy(new DefaultClientTlsStrategy(SSLContext.getDefault()))
.build(); .build();
connectionManager.setDefaultSocketConfig(SocketConfig.custom() connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoLinger(TimeValue.ofSeconds(15)) .setSoLinger(TimeValue.ofSeconds(-1))
.setSoTimeout(Timeout.ofSeconds(10)) .setSoTimeout(Timeout.ofSeconds(10))
.build() .build()
); );
Thread.ofPlatform().daemon(true).start(() -> {
try {
for (;;) {
TimeUnit.SECONDS.sleep(15);
logger.info("Connection pool stats: {}", connectionManager.getTotalStats());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
final RequestConfig defaultRequestConfig = RequestConfig.custom() final RequestConfig defaultRequestConfig = RequestConfig.custom()
.setCookieSpec(StandardCookieSpec.RELAXED) .setCookieSpec(StandardCookieSpec.RELAXED)
.setResponseTimeout(10, TimeUnit.SECONDS) .setResponseTimeout(10, TimeUnit.SECONDS)
.setConnectionRequestTimeout(8, TimeUnit.SECONDS) .setConnectionRequestTimeout(5, TimeUnit.MINUTES)
.build(); .build();
return HttpClients.custom() return HttpClients.custom()
@@ -286,6 +309,7 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
* recorded in the WARC file on failure. * recorded in the WARC file on failure.
*/ */
public ContentTypeProbeResult probeContentType(EdgeUrl url, public ContentTypeProbeResult probeContentType(EdgeUrl url,
DomainCookies cookies,
CrawlDelayTimer timer, CrawlDelayTimer timer,
ContentTags tags) { ContentTags tags) {
if (!tags.isEmpty() || !contentTypeLogic.isUrlLikeBinary(url)) { if (!tags.isEmpty() || !contentTypeLogic.isUrlLikeBinary(url)) {
@@ -298,9 +322,11 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
.addHeader("Accept-Encoding", "gzip") .addHeader("Accept-Encoding", "gzip")
.build(); .build();
var result = SendLock.wrapSend(client, head, (rsp) -> { cookies.paintRequest(head);
EntityUtils.consume(rsp.getEntity());
return SendLock.wrapSend(client, head, (rsp) -> {
cookies.updateCookieStore(rsp);
EntityUtils.consume(rsp.getEntity());
int statusCode = rsp.getCode(); int statusCode = rsp.getCode();
// Handle redirects // Handle redirects
@@ -338,8 +364,6 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
return new ContentTypeProbeResult.BadContentType(contentType, statusCode); return new ContentTypeProbeResult.BadContentType(contentType, statusCode);
} }
}); });
return result;
} }
catch (SocketTimeoutException ex) { catch (SocketTimeoutException ex) {
@@ -361,6 +385,7 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
@Override @Override
public HttpFetchResult fetchContent(EdgeUrl url, public HttpFetchResult fetchContent(EdgeUrl url,
WarcRecorder warcRecorder, WarcRecorder warcRecorder,
DomainCookies cookies,
CrawlDelayTimer timer, CrawlDelayTimer timer,
ContentTags contentTags, ContentTags contentTags,
ProbeType probeType) ProbeType probeType)
@@ -368,26 +393,32 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
try { try {
if (probeType == HttpFetcher.ProbeType.FULL) { if (probeType == HttpFetcher.ProbeType.FULL) {
try { try {
var probeResult = probeContentType(url, timer, contentTags); var probeResult = probeContentType(url, cookies, timer, contentTags);
logger.info(crawlerAuditMarker, "Probe result {} for {}", probeResult.getClass().getSimpleName(), url);
switch (probeResult) { switch (probeResult) {
case HttpFetcher.ContentTypeProbeResult.NoOp(): case HttpFetcher.ContentTypeProbeResult.NoOp():
break; // break; //
case HttpFetcher.ContentTypeProbeResult.Ok(EdgeUrl resolvedUrl): case HttpFetcher.ContentTypeProbeResult.Ok(EdgeUrl resolvedUrl):
logger.info(crawlerAuditMarker, "Probe result OK for {}", url);
url = resolvedUrl; // If we were redirected while probing, use the final URL for fetching url = resolvedUrl; // If we were redirected while probing, use the final URL for fetching
break; break;
case ContentTypeProbeResult.BadContentType badContentType: case ContentTypeProbeResult.BadContentType badContentType:
warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode()); warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
logger.info(crawlerAuditMarker, "Probe result Bad ContenType ({}) for {}", badContentType.contentType(), url);
return new HttpFetchResult.ResultNone(); return new HttpFetchResult.ResultNone();
case ContentTypeProbeResult.BadContentType.Timeout(Exception ex): case ContentTypeProbeResult.BadContentType.Timeout(Exception ex):
logger.info(crawlerAuditMarker, "Probe result Timeout for {}", url);
warcRecorder.flagAsTimeout(url); warcRecorder.flagAsTimeout(url);
return new HttpFetchResult.ResultException(ex); return new HttpFetchResult.ResultException(ex);
case ContentTypeProbeResult.Exception(Exception ex): case ContentTypeProbeResult.Exception(Exception ex):
logger.info(crawlerAuditMarker, "Probe result Exception({}) for {}", ex.getClass().getSimpleName(), url);
warcRecorder.flagAsError(url, ex); warcRecorder.flagAsError(url, ex);
return new HttpFetchResult.ResultException(ex); return new HttpFetchResult.ResultException(ex);
case ContentTypeProbeResult.HttpError httpError: case ContentTypeProbeResult.HttpError httpError:
logger.info(crawlerAuditMarker, "Probe result HTTP Error ({}) for {}", httpError.statusCode(), url);
return new HttpFetchResult.ResultException(new HttpException("HTTP status code " + httpError.statusCode() + ": " + httpError.message())); return new HttpFetchResult.ResultException(new HttpException("HTTP status code " + httpError.statusCode() + ": " + httpError.message()));
case ContentTypeProbeResult.Redirect redirect: case ContentTypeProbeResult.Redirect redirect:
logger.info(crawlerAuditMarker, "Probe result redirect for {} -> {}", url, redirect.location());
return new HttpFetchResult.ResultRedirect(redirect.location()); return new HttpFetchResult.ResultRedirect(redirect.location());
} }
} catch (Exception ex) { } catch (Exception ex) {
@@ -397,36 +428,41 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
} }
ClassicRequestBuilder getBuilder = ClassicRequestBuilder.get(url.asURI()) HttpGet request = new HttpGet(url.asURI());
.addHeader("User-Agent", userAgentString) request.addHeader("User-Agent", userAgentString);
.addHeader("Accept-Encoding", "gzip") request.addHeader("Accept-Encoding", "gzip");
.addHeader("Accept-Language", "en,*;q=0.5") request.addHeader("Accept-Language", "en,*;q=0.5");
.addHeader("Accept", "text/html, application/xhtml+xml, text/*;q=0.8"); request.addHeader("Accept", "text/html, application/xhtml+xml, text/*;q=0.8");
contentTags.paint(getBuilder); contentTags.paint(request);
try (var sl = new SendLock()) { try (var sl = new SendLock()) {
HttpFetchResult result = warcRecorder.fetch(client, getBuilder.build()); Instant start = Instant.now();
HttpFetchResult result = warcRecorder.fetch(client, cookies, request);
Duration fetchDuration = Duration.between(start, Instant.now());
if (result instanceof HttpFetchResult.ResultOk ok) { if (result instanceof HttpFetchResult.ResultOk ok) {
if (ok.statusCode() == 304) { if (ok.statusCode() == 304) {
return new HttpFetchResult.Result304Raw(); result = new HttpFetchResult.Result304Raw();
} }
} }
switch (result) { switch (result) {
case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {}", ok.statusCode(), url); case HttpFetchResult.ResultOk ok -> logger.info(crawlerAuditMarker, "Fetch result OK {} for {} ({} ms)", ok.statusCode(), url, fetchDuration.toMillis());
case HttpFetchResult.ResultRedirect redirect -> logger.info(crawlerAuditMarker, "Fetch result redirect: {} for {}", redirect.url(), url); case HttpFetchResult.ResultRedirect redirect -> logger.info(crawlerAuditMarker, "Fetch result redirect: {} for {}", redirect.url(), url);
case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url); case HttpFetchResult.ResultNone none -> logger.info(crawlerAuditMarker, "Fetch result none for {}", url);
case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception: {} for {}", ex.getClass().getSimpleName(), url); case HttpFetchResult.ResultException ex -> logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex.ex());
case HttpFetchResult.Result304Raw raw -> logger.info(crawlerAuditMarker, "Fetch result: 304 Raw for {}", url); case HttpFetchResult.Result304Raw raw -> logger.info(crawlerAuditMarker, "Fetch result: 304 Raw for {}", url);
case HttpFetchResult.Result304ReplacedWithReference ref -> logger.info(crawlerAuditMarker, "Fetch result: 304 With reference for {}", url); case HttpFetchResult.Result304ReplacedWithReference ref -> logger.info(crawlerAuditMarker, "Fetch result: 304 With reference for {}", url);
} }
return result; return result;
} }
} }
catch (Exception ex) { catch (Exception ex) {
ex.printStackTrace(); logger.error(crawlerAuditMarker, "Fetch result exception for {}", url, ex);
return new HttpFetchResult.ResultException(ex); return new HttpFetchResult.ResultException(ex);
} }
@@ -493,56 +529,61 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
} }
private SitemapResult fetchSingleSitemap(EdgeUrl sitemapUrl) throws URISyntaxException, IOException, InterruptedException { private SitemapResult fetchSingleSitemap(EdgeUrl sitemapUrl) throws URISyntaxException {
ClassicHttpRequest getRequest = ClassicRequestBuilder.get(sitemapUrl.asURI()) HttpGet getRequest = new HttpGet(sitemapUrl.asURI());
.addHeader("User-Agent", userAgentString)
.addHeader("Accept-Encoding", "gzip") getRequest.addHeader("User-Agent", userAgentString);
.addHeader("Accept", "text/*, */*;q=0.9") getRequest.addHeader("Accept-Encoding", "gzip");
.addHeader("User-Agent", userAgentString) getRequest.addHeader("Accept", "text/*, */*;q=0.9");
.build(); getRequest.addHeader("User-Agent", userAgentString);
try (var sl = new SendLock()) { try (var sl = new SendLock()) {
return client.execute(getRequest, response -> { return client.execute(getRequest, response -> {
if (response.getCode() != 200) { try {
return new SitemapResult.SitemapError(); if (response.getCode() != 200) {
return new SitemapResult.SitemapError();
}
Document parsedSitemap = Jsoup.parse(
EntityUtils.toString(response.getEntity()),
sitemapUrl.toString(),
Parser.xmlParser()
);
if (parsedSitemap.childrenSize() == 0) {
return new SitemapResult.SitemapError();
}
String rootTagName = parsedSitemap.child(0).tagName();
return switch (rootTagName.toLowerCase()) {
case "sitemapindex" -> {
List<String> references = new ArrayList<>();
for (var locTag : parsedSitemap.getElementsByTag("loc")) {
references.add(locTag.text().trim());
}
yield new SitemapResult.SitemapReferences(Collections.unmodifiableList(references));
}
case "urlset" -> {
List<String> urls = new ArrayList<>();
for (var locTag : parsedSitemap.select("url > loc")) {
urls.add(locTag.text().trim());
}
yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
}
case "rss", "atom" -> {
List<String> urls = new ArrayList<>();
for (var locTag : parsedSitemap.select("link, url")) {
urls.add(locTag.text().trim());
}
yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
}
default -> new SitemapResult.SitemapError();
};
} }
finally {
Document parsedSitemap = Jsoup.parse( EntityUtils.consume(response.getEntity());
EntityUtils.toString(response.getEntity()),
sitemapUrl.toString(),
Parser.xmlParser()
);
if (parsedSitemap.childrenSize() == 0) {
return new SitemapResult.SitemapError();
} }
String rootTagName = parsedSitemap.child(0).tagName();
return switch (rootTagName.toLowerCase()) {
case "sitemapindex" -> {
List<String> references = new ArrayList<>();
for (var locTag : parsedSitemap.getElementsByTag("loc")) {
references.add(locTag.text().trim());
}
yield new SitemapResult.SitemapReferences(Collections.unmodifiableList(references));
}
case "urlset" -> {
List<String> urls = new ArrayList<>();
for (var locTag : parsedSitemap.select("url > loc")) {
urls.add(locTag.text().trim());
}
yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
}
case "rss", "atom" -> {
List<String> urls = new ArrayList<>();
for (var locTag : parsedSitemap.select("link, url")) {
urls.add(locTag.text().trim());
}
yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
}
default -> new SitemapResult.SitemapError();
};
}); });
} }
catch (Exception ex) { catch (Exception ex) {
@@ -573,13 +614,12 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
private Optional<SimpleRobotRules> fetchAndParseRobotsTxt(EdgeUrl url, WarcRecorder recorder) { private Optional<SimpleRobotRules> fetchAndParseRobotsTxt(EdgeUrl url, WarcRecorder recorder) {
try (var sl = new SendLock()) { try (var sl = new SendLock()) {
ClassicHttpRequest request = ClassicRequestBuilder.get(url.asURI()) HttpGet request = new HttpGet(url.asURI());
.addHeader("User-Agent", userAgentString) request.addHeader("User-Agent", userAgentString);
.addHeader("Accept-Encoding", "gzip") request.addHeader("Accept-Encoding", "gzip");
.addHeader("Accept", "text/*, */*;q=0.9") request.addHeader("Accept", "text/*, */*;q=0.9");
.build();
HttpFetchResult result = recorder.fetch(client, request); HttpFetchResult result = recorder.fetch(client, new DomainCookies(), request);
return DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) -> return DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) ->
robotsParser.parseContent(url.toString(), robotsParser.parseContent(url.toString(),
@@ -595,18 +635,21 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
@Override @Override
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) { public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
if (exception instanceof SocketTimeoutException ex) { if (exception instanceof SocketTimeoutException) { // Timeouts are not recoverable
return false;
}
if (exception instanceof SSLException) { // SSL exceptions are unlikely to be recoverable
return false; return false;
} }
return executionCount < 3; return executionCount <= 3;
} }
@Override @Override
public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) { public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
return switch (response.getCode()) { return switch (response.getCode()) {
case 500, 503 -> executionCount < 2; case 500, 503 -> executionCount <= 2;
case 429 -> executionCount < 3; case 429 -> executionCount <= 3;
default -> false; default -> false;
}; };
} }

View File

@@ -2,6 +2,7 @@ package nu.marginalia.crawl.fetcher.warc;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.io.input.BOMInputStream;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.Header;
import org.netpreserve.jwarc.WarcTruncationReason; import org.netpreserve.jwarc.WarcTruncationReason;
@@ -43,7 +44,9 @@ public abstract class WarcInputBuffer implements AutoCloseable {
* and suppressed from the headers. * and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status. * If an error occurs, a buffer will be created with no content and an error status.
*/ */
static WarcInputBuffer forResponse(ClassicHttpResponse response, Duration timeLimit) throws IOException { static WarcInputBuffer forResponse(ClassicHttpResponse response,
HttpGet request,
Duration timeLimit) throws IOException {
if (response == null) if (response == null)
return new ErrorBuffer(); return new ErrorBuffer();
@@ -54,16 +57,29 @@ public abstract class WarcInputBuffer implements AutoCloseable {
return new ErrorBuffer(); return new ErrorBuffer();
} }
InputStream is = entity.getContent(); InputStream is = null;
long length = entity.getContentLength(); try {
is = entity.getContent();
long length = entity.getContentLength();
try (response) {
if (length > 0 && length < 8192) { if (length > 0 && length < 8192) {
// If the content is small and not compressed, we can just read it into memory // If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(response.getHeaders(), timeLimit, is, (int) length); return new MemoryBuffer(response.getHeaders(), request, timeLimit, is, (int) length);
} else { } else {
// Otherwise, we unpack it into a file and read it from there // Otherwise, we unpack it into a file and read it from there
return new FileBuffer(response.getHeaders(), timeLimit, is); return new FileBuffer(response.getHeaders(), request, timeLimit, is);
}
}
finally {
try {
is.skip(Long.MAX_VALUE);
}
catch (IOException e) {
// Ignore the exception
}
finally {
// Close the input stream
IOUtils.closeQuietly(is);
} }
} }
@@ -71,7 +87,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
} }
/** Copy an input stream to an output stream, with a maximum size and time limit */ /** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os, Duration timeLimit) { protected void copy(InputStream is, HttpGet request, OutputStream os, Duration timeLimit) {
Instant start = Instant.now(); Instant start = Instant.now();
Instant timeout = start.plus(timeLimit); Instant timeout = start.plus(timeLimit);
long size = 0; long size = 0;
@@ -86,6 +102,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
Duration remaining = Duration.between(Instant.now(), timeout); Duration remaining = Duration.between(Instant.now(), timeout);
if (remaining.isNegative()) { if (remaining.isNegative()) {
truncationReason = WarcTruncationReason.TIME; truncationReason = WarcTruncationReason.TIME;
// Abort the request if the time limit is exceeded
// so we don't keep the connection open forever or are forced to consume
// the stream to the end
request.abort();
break; break;
} }
@@ -104,6 +125,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
} }
else if (truncationReason != WarcTruncationReason.LENGTH) { else if (truncationReason != WarcTruncationReason.LENGTH) {
truncationReason = WarcTruncationReason.LENGTH; truncationReason = WarcTruncationReason.LENGTH;
break;
} }
} catch (IOException e) { } catch (IOException e) {
@@ -111,13 +133,6 @@ public abstract class WarcInputBuffer implements AutoCloseable {
} }
} }
// Try to close the connection as long as we haven't timed out.
// As per Apache HttpClient's semantics, this will reset the connection
// and close the stream if we have timed out.
if (truncationReason != WarcTruncationReason.TIME) {
IOUtils.closeQuietly(is);
}
} }
/** Takes a Content-Range header and checks if it is complete. /** Takes a Content-Range header and checks if it is complete.
@@ -218,7 +233,7 @@ class ErrorBuffer extends WarcInputBuffer {
/** Buffer for when we have the response in memory */ /** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer { class MemoryBuffer extends WarcInputBuffer {
byte[] data; byte[] data;
public MemoryBuffer(Header[] headers, Duration timeLimit, InputStream responseStream, int size) { public MemoryBuffer(Header[] headers, HttpGet request, Duration timeLimit, InputStream responseStream, int size) {
super(suppressContentEncoding(headers)); super(suppressContentEncoding(headers));
if (!isRangeComplete(headers)) { if (!isRangeComplete(headers)) {
@@ -229,7 +244,7 @@ class MemoryBuffer extends WarcInputBuffer {
var outputStream = new ByteArrayOutputStream(size); var outputStream = new ByteArrayOutputStream(size);
copy(responseStream, outputStream, timeLimit); copy(responseStream, request, outputStream, timeLimit);
data = outputStream.toByteArray(); data = outputStream.toByteArray();
} }
@@ -253,7 +268,7 @@ class MemoryBuffer extends WarcInputBuffer {
class FileBuffer extends WarcInputBuffer { class FileBuffer extends WarcInputBuffer {
private final Path tempFile; private final Path tempFile;
public FileBuffer(Header[] headers, Duration timeLimit, InputStream responseStream) throws IOException { public FileBuffer(Header[] headers, HttpGet request, Duration timeLimit, InputStream responseStream) throws IOException {
super(suppressContentEncoding(headers)); super(suppressContentEncoding(headers));
if (!isRangeComplete(headers)) { if (!isRangeComplete(headers)) {
@@ -265,7 +280,7 @@ class FileBuffer extends WarcInputBuffer {
this.tempFile = Files.createTempFile("rsp", ".html"); this.tempFile = Files.createTempFile("rsp", ".html");
try (var out = Files.newOutputStream(tempFile)) { try (var out = Files.newOutputStream(tempFile)) {
copy(responseStream, out, timeLimit); copy(responseStream, request, out, timeLimit);
} }
catch (Exception ex) { catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED; truncationReason = WarcTruncationReason.UNSPECIFIED;

View File

@@ -1,6 +1,7 @@
package nu.marginalia.crawl.fetcher.warc; package nu.marginalia.crawl.fetcher.warc;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.link_parser.LinkParser; import nu.marginalia.link_parser.LinkParser;
@@ -8,9 +9,7 @@ import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.HttpFetchResult; import nu.marginalia.model.body.HttpFetchResult;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.NameValuePair; import org.apache.hc.core5.http.NameValuePair;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.netpreserve.jwarc.*; import org.netpreserve.jwarc.*;
@@ -53,23 +52,15 @@ public class WarcRecorder implements AutoCloseable {
// Affix a version string in case we need to change the format in the future // Affix a version string in case we need to change the format in the future
// in some way // in some way
private final String warcRecorderVersion = "1.0"; private final String warcRecorderVersion = "1.0";
private final CookieStore cookies;
private final LinkParser linkParser = new LinkParser(); private final LinkParser linkParser = new LinkParser();
/** /**
* Create a new WarcRecorder that will write to the given file * Create a new WarcRecorder that will write to the given file
* *
* @param warcFile The file to write to * @param warcFile The file to write to
*/ */
public WarcRecorder(Path warcFile, HttpFetcherImpl fetcher) throws IOException { public WarcRecorder(Path warcFile) throws IOException {
this.warcFile = warcFile; this.warcFile = warcFile;
this.writer = new WarcWriter(warcFile); this.writer = new WarcWriter(warcFile);
this.cookies = fetcher.getCookies();
}
public WarcRecorder(Path warcFile, CookieStore cookies) throws IOException {
this.warcFile = warcFile;
this.writer = new WarcWriter(warcFile);
this.cookies = cookies;
} }
/** /**
@@ -79,24 +70,21 @@ public class WarcRecorder implements AutoCloseable {
public WarcRecorder() throws IOException { public WarcRecorder() throws IOException {
this.warcFile = Files.createTempFile("warc", ".warc.gz"); this.warcFile = Files.createTempFile("warc", ".warc.gz");
this.writer = new WarcWriter(this.warcFile); this.writer = new WarcWriter(this.warcFile);
this.cookies = new BasicCookieStore();
temporaryFile = true; temporaryFile = true;
} }
private boolean hasCookies() {
return !cookies.getCookies().isEmpty();
}
public HttpFetchResult fetch(HttpClient client, public HttpFetchResult fetch(HttpClient client,
ClassicHttpRequest request) DomainCookies cookies,
HttpGet request)
throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException
{ {
return fetch(client, request, Duration.ofMillis(MAX_TIME)); return fetch(client, cookies, request, Duration.ofMillis(MAX_TIME));
} }
public HttpFetchResult fetch(HttpClient client, public HttpFetchResult fetch(HttpClient client,
ClassicHttpRequest request, DomainCookies cookies,
HttpGet request,
Duration timeout) Duration timeout)
throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException
{ {
@@ -113,13 +101,15 @@ public class WarcRecorder implements AutoCloseable {
// Inject a range header to attempt to limit the size of the response // Inject a range header to attempt to limit the size of the response
// to the maximum size we want to store, if the server supports it. // to the maximum size we want to store, if the server supports it.
request.addHeader("Range", "bytes=0-"+MAX_SIZE); request.addHeader("Range", "bytes=0-"+MAX_SIZE);
cookies.paintRequest(request);
try { try {
return client.execute(request, response -> { return client.execute(request,response -> {
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, timeout); try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request, timeout);
InputStream inputStream = inputBuffer.read()) { InputStream inputStream = inputBuffer.read()) {
cookies.updateCookieStore(response);
// Build and write the request // Build and write the request
WarcDigestBuilder requestDigestBuilder = new WarcDigestBuilder(); WarcDigestBuilder requestDigestBuilder = new WarcDigestBuilder();
@@ -143,8 +133,9 @@ public class WarcRecorder implements AutoCloseable {
warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcRequest); writer.write(warcRequest);
if (hasCookies()) {
extraHeaders.put("X-Has-Cookies", List.of("1")); if (cookies.hasCookies()) {
response.addHeader("X-Has-Cookies", 1);
} }
byte[] responseHeaders = WarcProtocolReconstructor.getResponseHeader(response, inputBuffer.size()).getBytes(StandardCharsets.UTF_8); byte[] responseHeaders = WarcProtocolReconstructor.getResponseHeader(response, inputBuffer.size()).getBytes(StandardCharsets.UTF_8);
@@ -259,7 +250,7 @@ public class WarcRecorder implements AutoCloseable {
writer.write(item); writer.write(item);
} }
private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags contentTags) { private void saveOldResponse(EdgeUrl url, DomainCookies domainCookies, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags contentTags) {
try { try {
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder(); WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder(); WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
@@ -320,7 +311,7 @@ public class WarcRecorder implements AutoCloseable {
.date(Instant.now()) .date(Instant.now())
.body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes()); .body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes());
if (hasCookies()) { if (domainCookies.hasCookies() || (headers != null && headers.contains("Set-Cookie:"))) {
builder.addHeader("X-Has-Cookies", "1"); builder.addHeader("X-Has-Cookies", "1");
} }
@@ -340,8 +331,8 @@ public class WarcRecorder implements AutoCloseable {
* an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this * an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this
* scenario we want to record the data as it was in the previous crawl, but not re-fetch it. * scenario we want to record the data as it was in the previous crawl, but not re-fetch it.
*/ */
public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags ctags) { public void writeReferenceCopy(EdgeUrl url, DomainCookies cookies, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags ctags) {
saveOldResponse(url, contentType, statusCode, documentBody, headers, ctags); saveOldResponse(url, cookies, contentType, statusCode, documentBody, headers, ctags);
} }
public void writeWarcinfoHeader(String ip, EdgeDomain domain, HttpFetcherImpl.DomainProbeResult result) throws IOException { public void writeWarcinfoHeader(String ip, EdgeDomain domain, HttpFetcherImpl.DomainProbeResult result) throws IOException {

View File

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.logic;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@@ -19,8 +20,22 @@ public class DomainLocks {
* and may be held by another thread. The caller is responsible for locking and releasing the lock. * and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/ */
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException { public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(), var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
sem.acquire();
return new DomainLock(sem);
}
public Optional<DomainLock> tryLockDomain(EdgeDomain domain) {
var sem = locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
if (sem.tryAcquire(1)) {
return Optional.of(new DomainLock(sem));
}
else {
// We don't have a lock, so we return an empty optional
return Optional.empty();
}
} }
private Semaphore defaultPermits(String topDomain) { private Semaphore defaultPermits(String topDomain) {
@@ -44,7 +59,11 @@ public class DomainLocks {
return new Semaphore(2); return new Semaphore(2);
} }
public boolean canLock(EdgeDomain domain) { /** Returns true if the domain is lockable, i.e. if it is not already locked by another thread.
* (this is just a hint, and does not guarantee that the domain is actually lockable any time
* after this method returns true)
*/
public boolean isLockableHint(EdgeDomain domain) {
Semaphore sem = locks.get(domain.topDomain.toLowerCase()); Semaphore sem = locks.get(domain.topDomain.toLowerCase());
if (null == sem) if (null == sem)
return true; return true;
@@ -53,22 +72,16 @@ public class DomainLocks {
} }
public static class DomainLock implements AutoCloseable { public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore; private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException { DomainLock(Semaphore semaphore) {
this.domainName = domainName;
this.semaphore = semaphore; this.semaphore = semaphore;
Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("crawling:" + domainName);
} }
@Override @Override
public void close() throws Exception { public void close() throws Exception {
semaphore.release(); semaphore.release();
Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]"); Thread.currentThread().setName("[idle]");
} }
} }
} }

View File

@@ -6,6 +6,7 @@ import nu.marginalia.contenttype.ContentType;
import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.DomainStateDb; import nu.marginalia.crawl.DomainStateDb;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.LinkFilterSelector; import nu.marginalia.crawl.logic.LinkFilterSelector;
@@ -51,9 +52,10 @@ public class CrawlerRetreiver implements AutoCloseable {
private final DomainStateDb domainStateDb; private final DomainStateDb domainStateDb;
private final WarcRecorder warcRecorder; private final WarcRecorder warcRecorder;
private final CrawlerRevisitor crawlerRevisitor; private final CrawlerRevisitor crawlerRevisitor;
private final DomainCookies cookies = new DomainCookies();
private static final CrawlerConnectionThrottle connectionThrottle = new CrawlerConnectionThrottle( private static final CrawlerConnectionThrottle connectionThrottle = new CrawlerConnectionThrottle(
Duration.ofSeconds(1) // pace the connections to avoid network congestion by waiting 1 second between establishing them Duration.ofSeconds(1) // pace the connections to avoid network congestion at startup
); );
int errorCount = 0; int errorCount = 0;
@@ -124,7 +126,7 @@ public class CrawlerRetreiver implements AutoCloseable {
} }
Instant recrawlStart = Instant.now(); Instant recrawlStart = Instant.now();
CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer); CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, cookies, robotsRules, delayTimer);
Duration recrawlTime = Duration.between(recrawlStart, Instant.now()); Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
@@ -274,7 +276,7 @@ public class CrawlerRetreiver implements AutoCloseable {
try { try {
var url = rootUrl.withPathAndParam("/", null); var url = rootUrl.withPathAndParam("/", null);
HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); HttpFetchResult result = fetcher.fetchContent(url, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
timer.waitFetchDelay(0); timer.waitFetchDelay(0);
if (result instanceof HttpFetchResult.ResultRedirect(EdgeUrl location)) { if (result instanceof HttpFetchResult.ResultRedirect(EdgeUrl location)) {
@@ -337,7 +339,7 @@ public class CrawlerRetreiver implements AutoCloseable {
// Grab the favicon if it exists // Grab the favicon if it exists
if (fetcher.fetchContent(faviconUrl, warcRecorder, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) { if (fetcher.fetchContent(faviconUrl, warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED) instanceof HttpFetchResult.ResultOk iconResult) {
String contentType = iconResult.header("Content-Type"); String contentType = iconResult.header("Content-Type");
byte[] iconData = iconResult.getBodyBytes(); byte[] iconData = iconResult.getBodyBytes();
@@ -407,7 +409,7 @@ public class CrawlerRetreiver implements AutoCloseable {
if (parsedOpt.isEmpty()) if (parsedOpt.isEmpty())
return false; return false;
HttpFetchResult result = fetcher.fetchContent(parsedOpt.get(), warcRecorder, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); HttpFetchResult result = fetcher.fetchContent(parsedOpt.get(), warcRecorder, cookies, timer, ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
timer.waitFetchDelay(0); timer.waitFetchDelay(0);
if (!(result instanceof HttpFetchResult.ResultOk ok)) { if (!(result instanceof HttpFetchResult.ResultOk ok)) {
@@ -435,7 +437,7 @@ public class CrawlerRetreiver implements AutoCloseable {
{ {
var contentTags = reference.getContentTags(); var contentTags = reference.getContentTags();
HttpFetchResult fetchedDoc = fetcher.fetchContent(top, warcRecorder, timer, contentTags, HttpFetcher.ProbeType.FULL); HttpFetchResult fetchedDoc = fetcher.fetchContent(top, warcRecorder, cookies, timer, contentTags, HttpFetcher.ProbeType.FULL);
timer.waitFetchDelay(); timer.waitFetchDelay();
if (Thread.interrupted()) { if (Thread.interrupted()) {
@@ -461,7 +463,7 @@ public class CrawlerRetreiver implements AutoCloseable {
{ {
var doc = reference.doc(); var doc = reference.doc();
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBodyBytes, doc.headers, contentTags); warcRecorder.writeReferenceCopy(top, cookies, doc.contentType, doc.httpStatus, doc.documentBodyBytes, doc.headers, contentTags);
fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url, fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url,
new ContentType(doc.contentType, "UTF-8"), new ContentType(doc.contentType, "UTF-8"),

View File

@@ -2,6 +2,7 @@ package nu.marginalia.crawl.retreival.revisit;
import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
@@ -37,6 +38,7 @@ public class CrawlerRevisitor {
/** Performs a re-crawl of old documents, comparing etags and last-modified */ /** Performs a re-crawl of old documents, comparing etags and last-modified */
public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData, public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
DomainCookies cookies,
SimpleRobotRules robotsRules, SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer) CrawlDelayTimer delayTimer)
throws InterruptedException { throws InterruptedException {
@@ -132,6 +134,7 @@ public class CrawlerRevisitor {
} }
// Add a WARC record so we don't repeat this // Add a WARC record so we don't repeat this
warcRecorder.writeReferenceCopy(url, warcRecorder.writeReferenceCopy(url,
cookies,
doc.contentType, doc.contentType,
doc.httpStatus, doc.httpStatus,
doc.documentBodyBytes, doc.documentBodyBytes,

View File

@@ -11,6 +11,8 @@ import org.junit.jupiter.api.*;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("slow") @Tag("slow")
class HttpFetcherImplContentTypeProbeTest { class HttpFetcherImplContentTypeProbeTest {
@@ -85,55 +87,59 @@ class HttpFetcherImplContentTypeProbeTest {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
fetcher.close(); fetcher.close();
} }
@Test @Test
public void testProbeContentTypeHtmlShortcircuitPath() throws URISyntaxException { public void testProbeContentTypeHtmlShortcircuitPath() throws URISyntaxException {
var result = fetcher.probeContentType(new EdgeUrl("https://localhost/test.html"), new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(new EdgeUrl("https://localhost/test.html"), new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.Ok.class, result); Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.NoOp.class, result);
} }
@Test @Test
public void testProbeContentTypeHtmlShortcircuitTags() { public void testProbeContentTypeHtmlShortcircuitTags() {
var result = fetcher.probeContentType(contentTypeBinaryUrl, new CrawlDelayTimer(50), new ContentTags("a", "b")); var result = fetcher.probeContentType(contentTypeBinaryUrl, new DomainCookies(), new CrawlDelayTimer(50), new ContentTags("a", "b"));
Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.Ok.class, result); Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.NoOp.class, result);
} }
@Test @Test
public void testProbeContentTypeHtml() { public void testProbeContentTypeHtml() {
var result = fetcher.probeContentType(contentTypeHtmlUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(contentTypeHtmlUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Ok(contentTypeHtmlUrl), result); Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Ok(contentTypeHtmlUrl), result);
} }
@Test @Test
public void testProbeContentTypeBinary() { public void testProbeContentTypeBinary() {
var result = fetcher.probeContentType(contentTypeBinaryUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(contentTypeBinaryUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.BadContentType("application/octet-stream", 200), result); Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.BadContentType("application/octet-stream", 200), result);
} }
@Test @Test
public void testProbeContentTypeRedirect() { public void testProbeContentTypeRedirect() {
var result = fetcher.probeContentType(redirectUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(redirectUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Redirect(contentTypeHtmlUrl), result); Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Redirect(contentTypeHtmlUrl), result);
} }
@Test @Test
public void testProbeContentTypeBadHttpStatus() { public void testProbeContentTypeBadHttpStatus() {
var result = fetcher.probeContentType(badHttpStatusUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(badHttpStatusUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.HttpError(500, "Bad status code"), result); Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.HttpError(500, "Bad status code"), result);
} }
@Test @Test
public void testOnlyGetAllowed() { public void testOnlyGetAllowed() {
var result = fetcher.probeContentType(onlyGetAllowedUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(onlyGetAllowedUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Ok(onlyGetAllowedUrl), result); Assertions.assertEquals(new HttpFetcher.ContentTypeProbeResult.Ok(onlyGetAllowedUrl), result);
} }
@Test @Test
public void testTimeout() { public void testTimeout() {
var result = fetcher.probeContentType(timeoutUrl, new CrawlDelayTimer(50), ContentTags.empty()); var result = fetcher.probeContentType(timeoutUrl, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.Timeout.class, result); Assertions.assertInstanceOf(HttpFetcher.ContentTypeProbeResult.Timeout.class, result);
} }

View File

@@ -12,6 +12,8 @@ import org.junit.jupiter.api.*;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("slow") @Tag("slow")
class HttpFetcherImplDomainProbeTest { class HttpFetcherImplDomainProbeTest {
@@ -47,6 +49,10 @@ class HttpFetcherImplDomainProbeTest {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
fetcher.close(); fetcher.close();
} }

View File

@@ -31,6 +31,7 @@ class HttpFetcherImplFetchTest {
private static String lastModified = "Wed, 21 Oct 2024 07:28:00 GMT"; private static String lastModified = "Wed, 21 Oct 2024 07:28:00 GMT";
private static EdgeUrl okUrl; private static EdgeUrl okUrl;
private static EdgeUrl okUrlSetsCookie;
private static EdgeUrl okRangeResponseUrl; private static EdgeUrl okRangeResponseUrl;
private static EdgeUrl okUrlWith304; private static EdgeUrl okUrlWith304;
@@ -88,6 +89,19 @@ class HttpFetcherImplFetchTest {
.withStatus(200) .withStatus(200)
.withBody("Hello World"))); .withBody("Hello World")));
okUrlSetsCookie = new EdgeUrl("http://localhost:18089/okSetCookie.bin");
wireMockServer.stubFor(WireMock.head(WireMock.urlEqualTo(okUrlSetsCookie.path))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "text/html")
.withHeader("Set-Cookie", "test=1")
.withStatus(200)));
wireMockServer.stubFor(WireMock.get(WireMock.urlEqualTo(okUrlSetsCookie.path))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "text/html")
.withHeader("Set-Cookie", "test=1")
.withStatus(200)
.withBody("Hello World")));
okUrlWith304 = new EdgeUrl("http://localhost:18089/ok304.bin"); okUrlWith304 = new EdgeUrl("http://localhost:18089/ok304.bin");
wireMockServer.stubFor(WireMock.head(WireMock.urlEqualTo(okUrlWith304.path)) wireMockServer.stubFor(WireMock.head(WireMock.urlEqualTo(okUrlWith304.path))
.willReturn(WireMock.aResponse() .willReturn(WireMock.aResponse()
@@ -117,6 +131,8 @@ class HttpFetcherImplFetchTest {
.withHeader("Keep-Alive", "max=4, timeout=30") .withHeader("Keep-Alive", "max=4, timeout=30")
.withBody("Hello") .withBody("Hello")
)); ));
wireMockServer.start(); wireMockServer.start();
} }
@@ -134,20 +150,31 @@ class HttpFetcherImplFetchTest {
public void setUp() throws IOException { public void setUp() throws IOException {
fetcher = new HttpFetcherImpl(new UserAgent("test.marginalia.nu", "test.marginalia.nu")); fetcher = new HttpFetcherImpl(new UserAgent("test.marginalia.nu", "test.marginalia.nu"));
warcFile = Files.createTempFile(getClass().getSimpleName(), ".warc"); warcFile = Files.createTempFile(getClass().getSimpleName(), ".warc");
warcRecorder = new WarcRecorder(warcFile, fetcher); warcRecorder = new WarcRecorder(warcFile);
} }
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
var stats = fetcher.getPoolStats();
assertEquals(0, stats.getLeased());
assertEquals(0, stats.getPending());
System.out.println(stats);
fetcher.close(); fetcher.close();
warcRecorder.close(); warcRecorder.close();
Files.deleteIfExists(warcFile); Files.deleteIfExists(warcFile);
} }
@Test
public void testFoo() {
fetcher.fetchSitemapUrls("https://www.marginalia.nu/sitemap.xml", new CrawlDelayTimer(100));
}
@Test @Test
public void testOk_NoProbe() throws IOException { public void testOk_NoProbe() throws IOException {
var result = fetcher.fetchContent(okUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(okUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertTrue(result.isOk()); Assertions.assertTrue(result.isOk());
@@ -158,12 +185,29 @@ class HttpFetcherImplFetchTest {
Assertions.assertInstanceOf(WarcResponse.class, warcRecords.get(1)); Assertions.assertInstanceOf(WarcResponse.class, warcRecords.get(1));
WarcResponse response = (WarcResponse) warcRecords.get(1); WarcResponse response = (WarcResponse) warcRecords.get(1);
assertEquals("0", response.headers().first("X-Has-Cookies").orElse("0")); assertEquals("0", response.http().headers().first("X-Has-Cookies").orElse("0"));
}
@Test
public void testOkSetsCookie() throws IOException {
var cookies = new DomainCookies();
var result = fetcher.fetchContent(okUrlSetsCookie, warcRecorder, cookies, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertTrue(result.isOk());
List<WarcRecord> warcRecords = getWarcRecords();
assertEquals(2, warcRecords.size());
Assertions.assertInstanceOf(WarcRequest.class, warcRecords.get(0));
Assertions.assertInstanceOf(WarcResponse.class, warcRecords.get(1));
WarcResponse response = (WarcResponse) warcRecords.get(1);
assertEquals("1", response.http().headers().first("X-Has-Cookies").orElse("0"));
} }
@Test @Test
public void testOk_FullProbe() { public void testOk_FullProbe() {
var result = fetcher.fetchContent(okUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(okUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertTrue(result.isOk()); Assertions.assertTrue(result.isOk());
@@ -171,7 +215,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testOk304_NoProbe() { public void testOk304_NoProbe() {
var result = fetcher.fetchContent(okUrlWith304, warcRecorder, new CrawlDelayTimer(1000), new ContentTags(etag, lastModified), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(okUrlWith304, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), new ContentTags(etag, lastModified), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.Result304Raw.class, result); Assertions.assertInstanceOf(HttpFetchResult.Result304Raw.class, result);
System.out.println(result); System.out.println(result);
@@ -180,7 +224,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testOk304_FullProbe() { public void testOk304_FullProbe() {
var result = fetcher.fetchContent(okUrlWith304, warcRecorder, new CrawlDelayTimer(1000), new ContentTags(etag, lastModified), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(okUrlWith304, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), new ContentTags(etag, lastModified), HttpFetcher.ProbeType.FULL);
Assertions.assertInstanceOf(HttpFetchResult.Result304Raw.class, result); Assertions.assertInstanceOf(HttpFetchResult.Result304Raw.class, result);
System.out.println(result); System.out.println(result);
@@ -188,7 +232,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testBadStatus_NoProbe() throws IOException { public void testBadStatus_NoProbe() throws IOException {
var result = fetcher.fetchContent(badHttpStatusUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(badHttpStatusUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertFalse(result.isOk()); Assertions.assertFalse(result.isOk());
@@ -202,7 +246,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testBadStatus_FullProbe() { public void testBadStatus_FullProbe() {
var result = fetcher.fetchContent(badHttpStatusUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(badHttpStatusUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertFalse(result.isOk()); Assertions.assertFalse(result.isOk());
@@ -212,7 +256,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testRedirect_NoProbe() throws URISyntaxException, IOException { public void testRedirect_NoProbe() throws URISyntaxException, IOException {
var result = fetcher.fetchContent(redirectUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(redirectUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultRedirect.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultRedirect.class, result);
assertEquals(new EdgeUrl("http://localhost:18089/test.html.bin"), ((HttpFetchResult.ResultRedirect) result).url()); assertEquals(new EdgeUrl("http://localhost:18089/test.html.bin"), ((HttpFetchResult.ResultRedirect) result).url());
@@ -225,7 +269,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testRedirect_FullProbe() throws URISyntaxException { public void testRedirect_FullProbe() throws URISyntaxException {
var result = fetcher.fetchContent(redirectUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(redirectUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
Assertions.assertInstanceOf(HttpFetchResult.ResultRedirect.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultRedirect.class, result);
assertEquals(new EdgeUrl("http://localhost:18089/test.html.bin"), ((HttpFetchResult.ResultRedirect) result).url()); assertEquals(new EdgeUrl("http://localhost:18089/test.html.bin"), ((HttpFetchResult.ResultRedirect) result).url());
@@ -238,7 +282,7 @@ class HttpFetcherImplFetchTest {
public void testFetchTimeout_NoProbe() throws IOException, URISyntaxException { public void testFetchTimeout_NoProbe() throws IOException, URISyntaxException {
Instant requestStart = Instant.now(); Instant requestStart = Instant.now();
var result = fetcher.fetchContent(timeoutUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(timeoutUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultException.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultException.class, result);
@@ -262,7 +306,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testRangeResponse() throws IOException { public void testRangeResponse() throws IOException {
var result = fetcher.fetchContent(okRangeResponseUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(okRangeResponseUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertTrue(result.isOk()); Assertions.assertTrue(result.isOk());
@@ -279,7 +323,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testFetchTimeout_Probe() throws IOException, URISyntaxException { public void testFetchTimeout_Probe() throws IOException, URISyntaxException {
Instant requestStart = Instant.now(); Instant requestStart = Instant.now();
var result = fetcher.fetchContent(timeoutUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(timeoutUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
Instant requestEnd = Instant.now(); Instant requestEnd = Instant.now();
Assertions.assertInstanceOf(HttpFetchResult.ResultException.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultException.class, result);
@@ -302,7 +346,7 @@ class HttpFetcherImplFetchTest {
@Test @Test
public void testKeepaliveUrl() { public void testKeepaliveUrl() {
// mostly for smoke testing and debugger utility // mostly for smoke testing and debugger utility
var result = fetcher.fetchContent(keepAliveUrl, warcRecorder, new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED); var result = fetcher.fetchContent(keepAliveUrl, warcRecorder, new DomainCookies(), new CrawlDelayTimer(1000), ContentTags.empty(), HttpFetcher.ProbeType.DISABLED);
Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result); Assertions.assertInstanceOf(HttpFetchResult.ResultOk.class, result);
Assertions.assertTrue(result.isOk()); Assertions.assertTrue(result.isOk());
@@ -319,6 +363,13 @@ class HttpFetcherImplFetchTest {
WarcXEntityRefused.register(reader); WarcXEntityRefused.register(reader);
for (var record : reader) { for (var record : reader) {
// Load the body, we need to do this before we close the reader to have access to the content.
if (record instanceof WarcRequest req) {
req.http();
} else if (record instanceof WarcResponse rsp) {
rsp.http();
}
records.add(record); records.add(record);
} }
} }

View File

@@ -1,12 +1,12 @@
package nu.marginalia.crawl.retreival; package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -45,7 +45,7 @@ class CrawlerWarcResynchronizerTest {
@Test @Test
void run() throws IOException, URISyntaxException { void run() throws IOException, URISyntaxException {
try (var oldRecorder = new WarcRecorder(fileName, new BasicCookieStore())) { try (var oldRecorder = new WarcRecorder(fileName)) {
fetchUrl(oldRecorder, "https://www.marginalia.nu/"); fetchUrl(oldRecorder, "https://www.marginalia.nu/");
fetchUrl(oldRecorder, "https://www.marginalia.nu/log/"); fetchUrl(oldRecorder, "https://www.marginalia.nu/log/");
fetchUrl(oldRecorder, "https://www.marginalia.nu/feed/"); fetchUrl(oldRecorder, "https://www.marginalia.nu/feed/");
@@ -55,7 +55,7 @@ class CrawlerWarcResynchronizerTest {
var crawlFrontier = new DomainCrawlFrontier(new EdgeDomain("www.marginalia.nu"), List.of(), 100); var crawlFrontier = new DomainCrawlFrontier(new EdgeDomain("www.marginalia.nu"), List.of(), 100);
try (var newRecorder = new WarcRecorder(outputFile, new BasicCookieStore())) { try (var newRecorder = new WarcRecorder(outputFile)) {
new CrawlerWarcResynchronizer(crawlFrontier, newRecorder).run(fileName); new CrawlerWarcResynchronizer(crawlFrontier, newRecorder).run(fileName);
} }
@@ -78,10 +78,10 @@ class CrawlerWarcResynchronizerTest {
} }
void fetchUrl(WarcRecorder recorder, String url) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { void fetchUrl(WarcRecorder recorder, String url) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
var req = ClassicRequestBuilder.get(new java.net.URI(url)) HttpGet request = new HttpGet(url);
.addHeader("User-agent", "test.marginalia.nu") request.addHeader("User-agent", "test.marginalia.nu");
.addHeader("Accept-Encoding", "gzip") request.addHeader("Accept-Encoding", "gzip");
.build();
recorder.fetch(httpClient, req); recorder.fetch(httpClient, new DomainCookies(), request);
} }
} }

View File

@@ -2,6 +2,7 @@ package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
@@ -88,7 +89,7 @@ class ContentTypeProberTest {
@Test @Test
void probeContentTypeOk() throws Exception { void probeContentTypeOk() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlEndpoint, new CrawlDelayTimer(50), ContentTags.empty()); HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlEndpoint, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
System.out.println(result); System.out.println(result);
@@ -97,7 +98,7 @@ class ContentTypeProberTest {
@Test @Test
void probeContentTypeRedir() throws Exception { void probeContentTypeRedir() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlRedirEndpoint, new CrawlDelayTimer(50), ContentTags.empty()); HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlRedirEndpoint, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
System.out.println(result); System.out.println(result);
@@ -106,7 +107,7 @@ class ContentTypeProberTest {
@Test @Test
void probeContentTypeBad() throws Exception { void probeContentTypeBad() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(binaryEndpoint, new CrawlDelayTimer(50), ContentTags.empty()); HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(binaryEndpoint, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
System.out.println(result); System.out.println(result);
@@ -115,7 +116,7 @@ class ContentTypeProberTest {
@Test @Test
void probeContentTypeTimeout() throws Exception { void probeContentTypeTimeout() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(timeoutEndpoint, new CrawlDelayTimer(50), ContentTags.empty()); HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(timeoutEndpoint, new DomainCookies(), new CrawlDelayTimer(50), ContentTags.empty());
System.out.println(result); System.out.println(result);

View File

@@ -1,11 +1,11 @@
package nu.marginalia.crawl.retreival.fetcher; package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader; import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest; import org.netpreserve.jwarc.WarcRequest;
@@ -51,14 +51,14 @@ class WarcRecorderFakeServerTest {
os.write("<html><body>hello</body></html>".getBytes()); os.write("<html><body>hello</body></html>".getBytes());
os.flush(); os.flush();
try { try {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
os.write(":".getBytes()); os.write(":".getBytes());
os.flush(); os.flush();
try { try {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -89,24 +89,22 @@ class WarcRecorderFakeServerTest {
fileNameWarc = Files.createTempFile("test", ".warc"); fileNameWarc = Files.createTempFile("test", ".warc");
fileNameParquet = Files.createTempFile("test", ".parquet"); fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc, new BasicCookieStore()); client = new WarcRecorder(fileNameWarc);
} }
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
client.close(); client.close();
Files.delete(fileNameWarc); Files.delete(fileNameWarc);
} }
@Test @Test
public void fetchFast() throws Exception { public void fetchFast() throws Exception {
client.fetch(httpClient, HttpGet request = new HttpGet("http://localhost:14510/fast");
ClassicRequestBuilder request.addHeader("User-agent", "test.marginalia.nu");
.get(new java.net.URI("http://localhost:14510/fast")) request.addHeader("Accept-Encoding", "gzip");
.addHeader("User-agent", "test.marginalia.nu") client.fetch(httpClient, new DomainCookies(), request);
.addHeader("Accept-Encoding", "gzip")
.build()
);
Map<String, String> sampleData = new HashMap<>(); Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) { try (var warcReader = new WarcReader(fileNameWarc)) {
@@ -127,11 +125,13 @@ class WarcRecorderFakeServerTest {
public void fetchSlow() throws Exception { public void fetchSlow() throws Exception {
Instant start = Instant.now(); Instant start = Instant.now();
HttpGet request = new HttpGet("http://localhost:14510/slow");
request.addHeader("User-agent", "test.marginalia.nu");
request.addHeader("Accept-Encoding", "gzip");
client.fetch(httpClient, client.fetch(httpClient,
ClassicRequestBuilder.get(new java.net.URI("http://localhost:14510/slow")) new DomainCookies(),
.addHeader("User-agent", "test.marginalia.nu") request,
.addHeader("Accept-Encoding", "gzip")
.build(),
Duration.ofSeconds(1) Duration.ofSeconds(1)
); );
Instant end = Instant.now(); Instant end = Instant.now();
@@ -149,6 +149,8 @@ class WarcRecorderFakeServerTest {
}); });
} }
System.out.println(
Files.readString(fileNameWarc));
System.out.println(sampleData); System.out.println(sampleData);
// Timeout is set to 1 second, but the server will take 5 seconds to respond, // Timeout is set to 1 second, but the server will take 5 seconds to respond,

View File

@@ -2,14 +2,14 @@ package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.UserAgent; import nu.marginalia.UserAgent;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileReader; import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileReader;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter; import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -41,7 +41,7 @@ class WarcRecorderTest {
fileNameWarc = Files.createTempFile("test", ".warc"); fileNameWarc = Files.createTempFile("test", ".warc");
fileNameParquet = Files.createTempFile("test", ".parquet"); fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc, new BasicCookieStore()); client = new WarcRecorder(fileNameWarc);
} }
@AfterEach @AfterEach
@@ -52,12 +52,12 @@ class WarcRecorderTest {
@Test @Test
void fetch() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { void fetch() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
client.fetch(httpClient,
ClassicRequestBuilder.get(new java.net.URI("https://www.marginalia.nu/")) HttpGet request = new HttpGet("https://www.marginalia.nu/");
.addHeader("User-agent", "test.marginalia.nu") request.addHeader("User-agent", "test.marginalia.nu");
.addHeader("Accept-Encoding", "gzip") request.addHeader("Accept-Encoding", "gzip");
.build()
); client.fetch(httpClient, new DomainCookies(), request);
Map<String, String> sampleData = new HashMap<>(); Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) { try (var warcReader = new WarcReader(fileNameWarc)) {
@@ -78,8 +78,9 @@ class WarcRecorderTest {
@Test @Test
public void flagAsSkipped() throws IOException, URISyntaxException { public void flagAsSkipped() throws IOException, URISyntaxException {
try (var recorder = new WarcRecorder(fileNameWarc, new BasicCookieStore())) { try (var recorder = new WarcRecorder(fileNameWarc)) {
recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"), recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"),
new DomainCookies(),
"text/html", "text/html",
200, 200,
"<?doctype html><html><body>test</body></html>".getBytes(), "<?doctype html><html><body>test</body></html>".getBytes(),
@@ -102,8 +103,9 @@ class WarcRecorderTest {
@Test @Test
public void flagAsSkippedNullBody() throws IOException, URISyntaxException { public void flagAsSkippedNullBody() throws IOException, URISyntaxException {
try (var recorder = new WarcRecorder(fileNameWarc, new BasicCookieStore())) { try (var recorder = new WarcRecorder(fileNameWarc)) {
recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"), recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"),
new DomainCookies(),
"text/html", "text/html",
200, 200,
null, null,
@@ -114,8 +116,9 @@ class WarcRecorderTest {
@Test @Test
public void testSaveImport() throws URISyntaxException, IOException { public void testSaveImport() throws URISyntaxException, IOException {
try (var recorder = new WarcRecorder(fileNameWarc, new BasicCookieStore())) { try (var recorder = new WarcRecorder(fileNameWarc)) {
recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"), recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"),
new DomainCookies(),
"text/html", "text/html",
200, 200,
"<?doctype html><html><body>test</body></html>".getBytes(), "<?doctype html><html><body>test</body></html>".getBytes(),
@@ -138,23 +141,23 @@ class WarcRecorderTest {
@Test @Test
public void testConvertToParquet() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { public void testConvertToParquet() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
client.fetch(httpClient, ClassicRequestBuilder HttpGet request1 = new HttpGet("https://www.marginalia.nu/");
.get(new java.net.URI("https://www.marginalia.nu/")) request1.addHeader("User-agent", "test.marginalia.nu");
.addHeader("User-agent", "test.marginalia.nu") request1.addHeader("Accept-Encoding", "gzip");
.addHeader("Accept-Encoding", "gzip")
.build());
client.fetch(httpClient, ClassicRequestBuilder client.fetch(httpClient, new DomainCookies(), request1);
.get(new java.net.URI("https://www.marginalia.nu/log/"))
.addHeader("User-agent", "test.marginalia.nu")
.addHeader("Accept-Encoding", "gzip")
.build());
client.fetch(httpClient, ClassicRequestBuilder HttpGet request2 = new HttpGet("https://www.marginalia.nu/log/");
.get(new java.net.URI("https://www.marginalia.nu/sanic.png")) request2.addHeader("User-agent", "test.marginalia.nu");
.addHeader("User-agent", "test.marginalia.nu") request2.addHeader("Accept-Encoding", "gzip");
.addHeader("Accept-Encoding", "gzip")
.build()); client.fetch(httpClient, new DomainCookies(), request2);
HttpGet request3 = new HttpGet("https://www.marginalia.nu/sanic.png");
request3.addHeader("User-agent", "test.marginalia.nu");
request3.addHeader("Accept-Encoding", "gzip");
client.fetch(httpClient, new DomainCookies(), request3);
CrawledDocumentParquetRecordFileWriter.convertWarc( CrawledDocumentParquetRecordFileWriter.convertWarc(
"www.marginalia.nu", "www.marginalia.nu",

View File

@@ -1,6 +1,7 @@
package nu.marginalia.crawling; package nu.marginalia.crawling;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
@@ -31,7 +32,7 @@ class HttpFetcherTest {
void fetchUTF8() throws Exception { void fetchUTF8() throws Exception {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
try (var recorder = new WarcRecorder()) { try (var recorder = new WarcRecorder()) {
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, new CrawlDelayTimer(100), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, new DomainCookies(), new CrawlDelayTimer(100), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) { if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType()); System.out.println(bodyOk.contentType());
} }
@@ -49,7 +50,7 @@ class HttpFetcherTest {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
try (var recorder = new WarcRecorder()) { try (var recorder = new WarcRecorder()) {
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, new CrawlDelayTimer(100), ContentTags.empty(), HttpFetcher.ProbeType.FULL); var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, new DomainCookies(), new CrawlDelayTimer(100), ContentTags.empty(), HttpFetcher.ProbeType.FULL);
if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) { if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType()); System.out.println(bodyOk.contentType());
} }

View File

@@ -3,10 +3,7 @@ package nu.marginalia.crawling.retreival;
import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.DomainStateDb; import nu.marginalia.crawl.DomainStateDb;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.*;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.SitemapRetriever;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.CrawlerRetreiver;
@@ -137,7 +134,7 @@ public class CrawlerMockFetcherTest {
} }
@Override @Override
public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, CrawlDelayTimer timer, ContentTags tags, ProbeType probeType) { public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, DomainCookies cookies, CrawlDelayTimer timer, ContentTags tags, ProbeType probeType) {
logger.info("Fetching {}", url); logger.info("Fetching {}", url);
if (mockData.containsKey(url)) { if (mockData.containsKey(url)) {
byte[] bodyBytes = mockData.get(url).documentBodyBytes; byte[] bodyBytes = mockData.get(url).documentBodyBytes;

View File

@@ -16,7 +16,6 @@ import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain; import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData; import nu.marginalia.model.crawldata.SerializableCrawlData;
import nu.marginalia.slop.SlopCrawlDataRecord; import nu.marginalia.slop.SlopCrawlDataRecord;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.*; import org.netpreserve.jwarc.*;
@@ -180,7 +179,7 @@ class CrawlerRetreiverTest {
new EdgeDomain("www.marginalia.nu"), new EdgeDomain("www.marginalia.nu"),
List.of(), 100); List.of(), 100);
var resync = new CrawlerWarcResynchronizer(revisitCrawlFrontier, var resync = new CrawlerWarcResynchronizer(revisitCrawlFrontier,
new WarcRecorder(tempFileWarc2, new BasicCookieStore()) new WarcRecorder(tempFileWarc2)
); );
// truncate the size of the file to simulate a crash // truncate the size of the file to simulate a crash
@@ -456,7 +455,7 @@ class CrawlerRetreiverTest {
List.of(), 100); List.of(), 100);
var resync = new CrawlerWarcResynchronizer(revisitCrawlFrontier, var resync = new CrawlerWarcResynchronizer(revisitCrawlFrontier,
new WarcRecorder(tempFileWarc3, new BasicCookieStore()) new WarcRecorder(tempFileWarc3)
); );
// truncate the size of the file to simulate a crash // truncate the size of the file to simulate a crash
@@ -507,7 +506,7 @@ class CrawlerRetreiverTest {
} }
private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) { private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
try (var recorder = new WarcRecorder(tempFileWarc2, new BasicCookieStore()); try (var recorder = new WarcRecorder(tempFileWarc2);
var db = new DomainStateDb(tempFileDb) var db = new DomainStateDb(tempFileDb)
) { ) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference); new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
@@ -519,7 +518,7 @@ class CrawlerRetreiverTest {
@NotNull @NotNull
private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlerMain.CrawlSpecRecord specs) { private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlerMain.CrawlSpecRecord specs) {
try (var recorder = new WarcRecorder(tempFileWarc1, new BasicCookieStore()); try (var recorder = new WarcRecorder(tempFileWarc1);
var db = new DomainStateDb(tempFileDb) var db = new DomainStateDb(tempFileDb)
) { ) {
var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder); var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder);

View File

@@ -10,6 +10,7 @@ import nu.marginalia.api.searchquery.model.results.PrototypeRankingParameters;
import nu.marginalia.converting.processor.DomainProcessor; import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.functions.searchquery.QueryFactory; import nu.marginalia.functions.searchquery.QueryFactory;
@@ -43,7 +44,6 @@ import nu.marginalia.process.control.FakeProcessHeartbeat;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.test.IntegrationTestModule; import nu.marginalia.test.IntegrationTestModule;
import nu.marginalia.test.TestUtil; import nu.marginalia.test.TestUtil;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -121,11 +121,12 @@ public class IntegrationTest {
public void run() throws Exception { public void run() throws Exception {
/** CREATE WARC */ /** CREATE WARC */
try (WarcRecorder warcRecorder = new WarcRecorder(warcData, new BasicCookieStore())) { try (WarcRecorder warcRecorder = new WarcRecorder(warcData)) {
warcRecorder.writeWarcinfoHeader("127.0.0.1", new EdgeDomain("www.example.com"), warcRecorder.writeWarcinfoHeader("127.0.0.1", new EdgeDomain("www.example.com"),
new HttpFetcherImpl.DomainProbeResult.Ok(new EdgeUrl("https://www.example.com/"))); new HttpFetcherImpl.DomainProbeResult.Ok(new EdgeUrl("https://www.example.com/")));
warcRecorder.writeReferenceCopy(new EdgeUrl("https://www.example.com/"), warcRecorder.writeReferenceCopy(new EdgeUrl("https://www.example.com/"),
new DomainCookies(),
"text/html", 200, "text/html", 200,
""" """
<html> <html>