mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits


10 Commits

Author SHA1 Message Date
Viktor Lofgren
b6265cee11 (feeds) Add timeout code to send()
Due to the unique way Java's HttpClient implements timeouts, we must always wrap send() in an executor to handle the case where a server stops sending data mid-response, which would otherwise hang the send method forever.
2025-04-08 22:09:59 +02:00
Viktor Lofgren
c91af247e9 (rate-limit) Fix rate limiting logic
The rate limiter was misconfigured to regenerate tokens at a fixed rate of one token per refillRate seconds, rather than refillRate tokens per minute. Additionally, the default bucket size is increased to 4x the refill rate.
2025-04-05 12:26:26 +02:00
Viktor Lofgren
7a31227de1 (crawler) Filter out robots.txt-sitemaps that belong to different domains 2025-04-02 13:35:37 +02:00
Viktor Lofgren
4f477604c5 (crawler) Improve error handling in parquet->slop conversion
Parquet code throws a RuntimeException, which was not correctly caught, leading to a failure to crawl.
2025-04-02 13:16:01 +02:00
Viktor Lofgren
2970f4395b (minor) Test code cleanup 2025-04-02 13:16:01 +02:00
Viktor Lofgren
d1ec909b36 (crawler) Improve handling of timeouts to prevent crawler from getting stuck 2025-04-02 12:57:21 +02:00
Viktor Lofgren
c67c5bbf42 (crawler) Experimentally drop to HTTP 1.1 for crawler to see if this solves stuck send()s 2025-04-01 12:05:21 +02:00
Viktor Lofgren
ecb0e57a1a (crawler) Make the use of virtual threads in the crawler configurable via system properties 2025-03-27 21:26:05 +01:00
Viktor Lofgren
8c61f61b46 (crawler) Add crawling metadata to domainstate db 2025-03-27 16:38:37 +01:00
Viktor Lofgren
662a18c933 Revert "(crawler) Further rearrange crawl order"
This reverts commit 1c2426a052.

The change does not appear necessary to avoid problems.
2025-03-27 11:25:08 +01:00
11 changed files with 363 additions and 54 deletions

View File

@@ -35,21 +35,8 @@ public class RateLimiter {
}
public static RateLimiter forExpensiveRequest() {
return new RateLimiter(5, 10);
}
public static RateLimiter custom(int perMinute) {
return new RateLimiter(perMinute, 60);
}
public static RateLimiter forSpamBots() {
return new RateLimiter(120, 3600);
}
public static RateLimiter forLogin() {
return new RateLimiter(3, 15);
return new RateLimiter(4 * perMinute, perMinute);
}
private void cleanIdleBuckets() {
@@ -62,7 +49,7 @@ public class RateLimiter {
}
private Bucket createBucket() {
var refill = Refill.greedy(1, Duration.ofSeconds(refillRate));
var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
var bw = Bandwidth.classic(capacity, refill);
return Bucket.builder().addLimit(bw).build();
}
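
A minimal, self-contained sketch of the corrected refill behaviour, assuming the Bucket4j API (io.github.bucket4j) used in the hunk above; the method signature and example numbers are illustrative, not the project's exact code:

```java
// Before the fix, Refill.greedy(1, Duration.ofSeconds(refillRate)) granted one token
// every refillRate seconds; the fix grants refillRate tokens per minute and sizes the
// bucket at four times the refill rate to allow short bursts.
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill;

import java.time.Duration;

class RateLimiterRefillSketch {
    static Bucket createBucket(int capacity, int refillRate) {
        Refill refill = Refill.greedy(refillRate, Duration.ofSeconds(60)); // refillRate tokens/minute
        Bandwidth bandwidth = Bandwidth.classic(capacity, refill);
        return Bucket.builder().addLimit(bandwidth).build();
    }

    public static void main(String[] args) {
        // custom(60) now maps to capacity = 4 * 60, refillRate = 60
        Bucket bucket = createBucket(4 * 60, 60);
        System.out.println("first request allowed: " + bucket.tryConsume(1));
    }
}
```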

View File

@@ -33,6 +33,7 @@ import java.sql.SQLException;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +72,7 @@ public class FeedFetcherService {
public enum UpdateMode {
CLEAN,
REFRESH
};
}
public void updateFeeds(UpdateMode updateMode) throws IOException {
if (updating) // Prevent concurrent updates
@@ -87,6 +88,7 @@ public class FeedFetcherService {
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_2)
.build();
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
FeedJournal feedJournal = FeedJournal.create();
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
) {
@@ -131,7 +133,7 @@ public class FeedFetcherService {
FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, ifModifiedSinceDate, ifNoneMatchTag);
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) {
feedData = new FetchResult.TransientError();
}
@@ -211,6 +213,7 @@ public class FeedFetcherService {
private FetchResult fetchFeedData(FeedDefinition feed,
HttpClient client,
ExecutorService executorService,
@Nullable String ifModifiedSinceDate,
@Nullable String ifNoneMatchTag)
{
@@ -237,7 +240,14 @@ public class FeedFetcherService {
HttpRequest getRequest = requestBuilder.build();
for (int i = 0; i < 3; i++) {
HttpResponse<byte[]> rs = client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray());
/* Note we need to use an executor to time-limit the send() method in HttpClient, as
* its support for timeouts only applies to the time until response starts to be received,
* and does not catch the case when the server starts to send data but then hangs.
*/
HttpResponse<byte[]> rs = executorService.submit(
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
.get(15, TimeUnit.SECONDS);
if (rs.statusCode() == 429) { // Too Many Requests
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
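
A stripped-down, standalone sketch of the executor-bounded send() pattern added above (the URL is a placeholder, and this is not the project's exact code):

```java
// HttpClient's request timeout only covers the wait for the response to start, so the
// Future.get() deadline is what protects against a server that goes silent mid-response.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedSendSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        ExecutorService fetchExecutor = Executors.newCachedThreadPool();
        HttpRequest getRequest = HttpRequest.newBuilder(URI.create("https://example.com/feed.xml"))
                .GET()
                .build();
        try {
            HttpResponse<byte[]> rs = fetchExecutor.submit(
                    () -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
                .get(15, TimeUnit.SECONDS); // hard cap on the whole exchange
            System.out.println("status: " + rs.statusCode() + ", bytes: " + rs.body().length);
        } finally {
            fetchExecutor.shutdownNow();
        }
    }
}
```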

View File

@@ -103,10 +103,18 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist;
this.node = processConfiguration.node();
SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
threadType = SimpleBlockingThreadPool.ThreadType.VIRTUAL;
}
else {
threadType = SimpleBlockingThreadPool.ThreadType.PLATFORM;
}
pool = new SimpleBlockingThreadPool("CrawlerPool",
Integer.getInteger("crawler.poolSize", 256),
1,
SimpleBlockingThreadPool.ThreadType.VIRTUAL);
threadType);
// Wait for the blacklist to be loaded before starting the crawl
@@ -302,8 +310,8 @@ public class CrawlerMain extends ProcessMainClass {
}
/** Create a comparator that sorts the crawl specs in a way that is beneficial for the crawl,
* we want to enqueue domains that tend ro be large and have common top domains first,
* but otherwise have a random order.
* we want to enqueue domains that have common top domains first, but otherwise have a random
* order.
* <p></p>
* Note, we can't use hash codes for randomization as it is not desirable to have the same order
* every time the process is restarted (and CrawlSpecRecord is a record, which defines equals and
@@ -311,13 +319,15 @@ public class CrawlerMain extends ProcessMainClass {
* */
private Comparator<CrawlSpecRecord> crawlSpecArrangement(List<CrawlSpecRecord> records) {
Random r = new Random();
Map<String, Integer> topDomainCounts = new HashMap<>(4 + (int) Math.sqrt(records.size()));
Map<String, Integer> randomOrder = new HashMap<>(records.size());
for (var spec : records) {
topDomainCounts.merge(EdgeDomain.getTopDomain(spec.domain), 1, Integer::sum);
randomOrder.put(spec.domain, r.nextInt());
}
return Comparator.comparing((CrawlSpecRecord spec) -> spec.domain.contains(".edu"))
return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0) >= 8)
.reversed()
.thenComparing(spec -> randomOrder.get(spec.domain))
.thenComparing(Record::hashCode); // non-deterministic tie-breaker to
@@ -491,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
return new CrawlDataReference(slopPath);
}
} catch (IOException e) {
} catch (Exception e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain());
}
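
A hedged sketch of the thread-type selection introduced above, using the crawler.useVirtualThreads and crawler.poolSize properties from the hunk; standard JDK executors stand in here for the project's SimpleBlockingThreadPool:

```java
// Choosing platform vs. virtual threads via JVM system properties, as the CrawlerMain
// change above does for its pool. Property names are taken from the diff; the executor
// types are illustrative substitutes for SimpleBlockingThreadPool.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CrawlerThreadTypeSketch {
    static ExecutorService createPool() {
        // Enable with: java -Dcrawler.useVirtualThreads=true -Dcrawler.poolSize=256 ...
        if (Boolean.getBoolean("crawler.useVirtualThreads")) {
            return Executors.newVirtualThreadPerTaskExecutor();
        }
        return Executors.newFixedThreadPool(Integer.getInteger("crawler.poolSize", 256));
    }

    public static void main(String[] args) {
        try (ExecutorService pool = createPool()) {
            pool.submit(() -> System.out.println("running on " + Thread.currentThread()));
        }
    }
}
```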

View File

@@ -11,6 +11,7 @@ import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
@@ -24,6 +25,17 @@ public class DomainStateDb implements AutoCloseable {
private final Connection connection;
public record CrawlMeta(
String domainName,
Instant lastFullCrawl,
Duration recrawlTime,
Duration crawlTime,
int recrawlErrors,
int crawlChanges,
int totalCrawlSize
) {}
public record SummaryRecord(
String domainName,
Instant lastUpdated,
@@ -102,6 +114,17 @@ public class DomainStateDb implements AutoCloseable {
feedUrl TEXT
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS crawl_meta (
domain TEXT PRIMARY KEY,
lastFullCrawlEpochMs LONG NOT NULL,
recrawlTimeMs LONG NOT NULL,
recrawlErrors INTEGER NOT NULL,
crawlTimeMs LONG NOT NULL,
crawlChanges INTEGER NOT NULL,
totalCrawlSize INTEGER NOT NULL
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS favicon (
domain TEXT PRIMARY KEY,
@@ -164,6 +187,26 @@ public class DomainStateDb implements AutoCloseable {
return Optional.empty();
}
public void save(CrawlMeta crawlMeta) {
if (connection == null) throw new IllegalStateException("No connection to domainstate db");
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO crawl_meta (domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")) {
stmt.setString(1, crawlMeta.domainName());
stmt.setLong(2, crawlMeta.lastFullCrawl.toEpochMilli());
stmt.setLong(3, crawlMeta.recrawlTime.toMillis());
stmt.setInt(4, crawlMeta.recrawlErrors);
stmt.setLong(5, crawlMeta.crawlTime.toMillis());
stmt.setInt(6, crawlMeta.crawlChanges);
stmt.setInt(7, crawlMeta.totalCrawlSize);
stmt.executeUpdate();
} catch (SQLException e) {
logger.error("Failed to insert crawl meta record", e);
}
}
public void save(SummaryRecord record) {
if (connection == null) throw new IllegalStateException("No connection to domainstate db");
@@ -182,7 +225,35 @@ public class DomainStateDb implements AutoCloseable {
}
}
public Optional<SummaryRecord> get(String domainName) {
public Optional<CrawlMeta> getMeta(String domainName) {
if (connection == null)
return Optional.empty();
try (var stmt = connection.prepareStatement("""
SELECT domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize
FROM crawl_meta
WHERE domain = ?
""")) {
stmt.setString(1, domainName);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(new CrawlMeta(
rs.getString("domain"),
Instant.ofEpochMilli(rs.getLong("lastFullCrawlEpochMs")),
Duration.ofMillis(rs.getLong("recrawlTimeMs")),
Duration.ofMillis(rs.getLong("crawlTimeMs")),
rs.getInt("recrawlErrors"),
rs.getInt("crawlChanges"),
rs.getInt("totalCrawlSize")
));
}
} catch (SQLException ex) {
logger.error("Failed to get crawl meta record", ex);
}
return Optional.empty();
}
public Optional<SummaryRecord> getSummary(String domainName) {
if (connection == null)
return Optional.empty();

View File

@@ -29,6 +29,7 @@ import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.zip.GZIPInputStream;
@@ -56,12 +57,22 @@ public class HttpFetcherImpl implements HttpFetcher {
private final HttpClient client;
private HttpClient createClient() {
final ExecutorService executorService;
if (Boolean.getBoolean("crawler.httpclient.useVirtualThreads")) {
executorService = Executors.newVirtualThreadPerTaskExecutor();
}
else {
executorService = Executors.newCachedThreadPool();
}
return HttpClient.newBuilder()
.sslContext(NoSecuritySSL.buildSslContext())
.cookieHandler(cookies)
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(8))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.executor(executorService)
.build();
}
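
A condensed sketch of the client construction above: the executor becomes configurable (virtual vs. platform threads via crawler.httpclient.useVirtualThreads) and the client is pinned to HTTP/1.1 with an 8 second connect timeout. The project's SSL context and cookie handler are omitted here for brevity.

```java
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CrawlerHttpClientSketch {
    static HttpClient createClient() {
        // Thread type is selected with -Dcrawler.httpclient.useVirtualThreads=true
        ExecutorService executorService = Boolean.getBoolean("crawler.httpclient.useVirtualThreads")
                ? Executors.newVirtualThreadPerTaskExecutor()
                : Executors.newCachedThreadPool();

        return HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NORMAL)
                .version(HttpClient.Version.HTTP_1_1) // experimentally dropped from HTTP/2
                .connectTimeout(Duration.ofSeconds(8))
                .executor(executorService)
                .build();
    }

    public static void main(String[] args) {
        System.out.println(createClient().version());
    }
}
```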

View File

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
/** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
* and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status.
*/
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
if (rsp == null)
return new ErrorBuffer();
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(headers, is, contentLength);
return new MemoryBuffer(headers, timeLimit, is, contentLength);
}
else {
// Otherwise, we unpack it into a file and read it from there
return new FileBuffer(headers, is);
return new FileBuffer(headers, timeLimit, is);
}
}
catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
}
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
private Future<Integer> readAsync(InputStream is, byte[] out) {
return virtualExecutorService.submit(() -> is.read(out));
}
/** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os) {
long startTime = System.currentTimeMillis();
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
Instant start = Instant.now();
Instant timeout = start.plus(timeLimit);
long size = 0;
byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
while (true) {
try {
int n = is.read(buffer);
Duration remaining = Duration.between(Instant.now(), timeout);
if (remaining.isNegative()) {
truncationReason = WarcTruncationReason.TIME;
break;
}
Future<Integer> readAsync = readAsync(is, buffer);
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
if (n < 0) break;
size += n;
os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
}
} catch (IOException e) {
} catch (IOException|ExecutionException e) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
} catch (TimeoutException e) {
truncationReason = WarcTruncationReason.TIME;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
/** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer {
byte[] data;
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
super(headers);
var outputStream = new ByteArrayOutputStream(size);
copy(responseStream, outputStream);
copy(responseStream, outputStream, timeLimit);
data = outputStream.toByteArray();
}
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
class FileBuffer extends WarcInputBuffer {
private final Path tempFile;
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
super(suppressContentEncoding(headers));
this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
try (var out = Files.newOutputStream(tempFile)) {
copy(new GZIPInputStream(responseStream), out);
copy(new GZIPInputStream(responseStream), out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
}
else {
try (var out = Files.newOutputStream(tempFile)) {
copy(responseStream, out);
copy(responseStream, out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
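
A self-contained sketch of the per-read timeout technique used in the copy() change above: each blocking read runs on a virtual-thread executor and is bounded by whatever remains of the overall time budget, so a server that stalls mid-body cannot hang the loop. This is an illustrative reduction, not the project's exact code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeBoundedCopySketch {
    private static final ExecutorService readExecutor = Executors.newVirtualThreadPerTaskExecutor();

    static void copy(InputStream is, OutputStream os, Duration timeLimit) throws Exception {
        Instant deadline = Instant.now().plus(timeLimit);
        byte[] buffer = new byte[8192];
        while (true) {
            Duration remaining = Duration.between(Instant.now(), deadline);
            if (remaining.isNegative()) break; // overall budget exhausted -> truncate

            Future<Integer> pendingRead = readExecutor.submit(() -> is.read(buffer));
            int n;
            try {
                n = pendingRead.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                pendingRead.cancel(true); // the read itself stalled -> truncate
                break;
            }
            if (n < 0) break; // end of stream
            os.write(buffer, 0, n);
        }
    }

    public static void main(String[] args) throws Exception {
        var out = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream("hello".getBytes()), out, Duration.ofSeconds(2));
        System.out.println(out);
    }
}
```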

View File

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
}
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
InputStream inputStream = inputBuffer.read())
{
if (cookies.hasCookies()) {

View File

@@ -26,6 +26,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -113,15 +115,19 @@ public class CrawlerRetreiver implements AutoCloseable {
throw new InterruptedException();
}
Instant recrawlStart = Instant.now();
CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
if (recrawlMetadata.size() > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks, recrawlMetadata, recrawlTime);
}
case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
@@ -143,16 +149,29 @@ public class CrawlerRetreiver implements AutoCloseable {
private int crawlDomain(EdgeUrl rootUrl,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer,
DomainLinks domainLinks) {
DomainLinks domainLinks,
CrawlerRevisitor.RecrawlMetadata recrawlMetadata,
Duration recrawlTime) {
Instant crawlStart = Instant.now();
// Add external links to the crawl frontier
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
// Fetch sitemaps
for (var sitemap : robotsRules.getSitemaps()) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
// Validate the sitemap URL and check if it belongs to the domain as the root URL
if (EdgeUrl.parse(sitemap)
.map(url -> url.getDomain().equals(rootUrl.domain))
.orElse(false)) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
}
}
int crawlerAdditions = 0;
while (!crawlFrontier.isEmpty()
&& !crawlFrontier.isCrawlDepthReached()
&& errorCount < MAX_ERRORS
@@ -184,7 +203,11 @@ public class CrawlerRetreiver implements AutoCloseable {
continue;
try {
fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
var result = fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
if (result.isOk()) {
crawlerAdditions++;
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -192,6 +215,17 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
Duration crawlTime = Duration.between(crawlStart, Instant.now());
domainStateDb.save(new DomainStateDb.CrawlMeta(
domain,
Instant.now(),
recrawlTime,
crawlTime,
recrawlMetadata.errors(),
crawlerAdditions,
recrawlMetadata.size() + crawlerAdditions
));
return crawlFrontier.visitedSize();
}
@@ -324,7 +358,7 @@ public class CrawlerRetreiver implements AutoCloseable {
);
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
var oldDomainStateRecord = domainStateDb.get(domain);
var oldDomainStateRecord = domainStateDb.getSummary(domain);
// If we are already aware of an old feed URL, then we can just revalidate it
if (oldDomainStateRecord.isPresent()) {
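
A hedged sketch of the sitemap filtering step added in the hunk above: only sitemap URLs whose host matches the domain being crawled are enqueued. Here java.net.URI stands in for the project's EdgeUrl, and the comparison is simplified to an exact, case-insensitive host match.

```java
import java.net.URI;
import java.util.List;

class SitemapDomainFilterSketch {
    static boolean belongsToDomain(String sitemapUrl, String crawlHost) {
        try {
            String host = URI.create(sitemapUrl).getHost();
            return host != null && host.equalsIgnoreCase(crawlHost);
        } catch (IllegalArgumentException e) {
            return false; // unparseable sitemap URLs are ignored
        }
    }

    public static void main(String[] args) {
        List<String> sitemaps = List.of(
                "https://example.com/sitemap.xml",       // same domain -> kept
                "https://cdn.other-host.net/sitemap.xml" // different domain -> dropped
        );
        sitemaps.stream()
                .filter(url -> belongsToDomain(url, "example.com"))
                .forEach(System.out::println);
    }
}
```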

View File

@@ -31,7 +31,7 @@ public class CrawlerRevisitor {
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
public int recrawl(CrawlDataReference oldCrawlData,
public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer)
throws InterruptedException {
@@ -39,6 +39,7 @@ public class CrawlerRevisitor {
int retained = 0;
int errors = 0;
int skipped = 0;
int size = 0;
for (CrawledDocument doc : oldCrawlData) {
if (errors > 20) {
@@ -82,6 +83,7 @@ public class CrawlerRevisitor {
continue;
}
size++;
double skipProb;
@@ -154,6 +156,8 @@ public class CrawlerRevisitor {
}
}
return recrawled;
return new RecrawlMetadata(size, errors, skipped);
}
public record RecrawlMetadata(int size, int errors, int skipped) {}
}

View File

@@ -8,6 +8,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.*;
@@ -47,8 +48,8 @@ class DomainStateDbTest {
db.save(allFields);
db.save(minFields);
assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
assertEquals(allFields, db.getSummary("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.getSummary("min.marginalia.nu").orElseThrow());
var updatedAllFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
@@ -59,7 +60,19 @@ class DomainStateDbTest {
);
db.save(updatedAllFields);
assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(updatedAllFields, db.getSummary("all.marginalia.nu").orElseThrow());
}
}
@Test
public void testMetadata() throws SQLException {
try (var db = new DomainStateDb(tempFile)) {
var original = new DomainStateDb.CrawlMeta("example.com", Instant.ofEpochMilli(12345), Duration.ofMillis(30), Duration.ofMillis(300), 1, 2, 3);
db.save(original);
var maybeMeta = db.getMeta("example.com");
assertTrue(maybeMeta.isPresent());
assertEquals(original, maybeMeta.get());
}
}

View File

@@ -0,0 +1,152 @@
package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.Cookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tag("slow")
class WarcRecorderFakeServerTest {
static HttpServer server;
@BeforeAll
public static void setUpAll() throws IOException {
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
// This endpoint will finish sending the response immediately
server.createContext("/fast", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
}
exchange.close();
});
// This endpoint will take 10 seconds to finish sending the response,
// which should trigger a timeout in the client
server.createContext("/slow", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
os.write(":D".getBytes());
os.flush();
}
exchange.close();
});
server.start();
}
@AfterAll
public static void tearDownAll() {
server.stop(0);
}
Path fileNameWarc;
Path fileNameParquet;
WarcRecorder client;
HttpClient httpClient;
@BeforeEach
public void setUp() throws Exception {
httpClient = HttpClient.newBuilder().build();
fileNameWarc = Files.createTempFile("test", ".warc");
fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc, new Cookies());
}
@AfterEach
public void tearDown() throws Exception {
client.close();
Files.delete(fileNameWarc);
}
@Test
public void fetchFast() throws Exception {
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/fast"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
}
});
}
System.out.println(sampleData);
}
@Test
public void fetchSlow() throws Exception {
Instant start = Instant.now();
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/slow"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Instant end = Instant.now();
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
System.out.println(rsp.target());
}
});
}
System.out.println(sampleData);
// Timeout is set to 1 second, but the server will take 10 seconds to respond,
// so we expect the request to take 1s and change before it times out.
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000);
}
}