1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

12 Commits

Author SHA1 Message Date
Viktor Lofgren
c91af247e9 (rate-limit) Fix rate limiting logic
The rate limiter was misconfigured to regenerate tokens at a fixed rate of 1 token per refillRate seconds, rather than refillRate tokens per minute. Additionally, the default bucket size is increased to 4x the refill rate.
2025-04-05 12:26:26 +02:00
Viktor Lofgren
7a31227de1 (crawler) Filter out robots.txt-sitemaps that belong to different domains 2025-04-02 13:35:37 +02:00
Viktor Lofgren
4f477604c5 (crawler) Improve error handling in parquet->slop conversion
Parquet code throws a RuntimeException, which was not correctly caught, leading to a failure to crawl.
2025-04-02 13:16:01 +02:00
Viktor Lofgren
2970f4395b (minor) Test code cleanup 2025-04-02 13:16:01 +02:00
Viktor Lofgren
d1ec909b36 (crawler) Improve handling of timeouts to prevent crawler from getting stuck 2025-04-02 12:57:21 +02:00
Viktor Lofgren
c67c5bbf42 (crawler) Experimentally drop to HTTP 1.1 for crawler to see if this solves stuck send()s 2025-04-01 12:05:21 +02:00
Viktor Lofgren
ecb0e57a1a (crawler) Make the use of virtual threads in the crawler configurable via system properties 2025-03-27 21:26:05 +01:00
Viktor Lofgren
8c61f61b46 (crawler) Add crawling metadata to domainstate db 2025-03-27 16:38:37 +01:00
Viktor Lofgren
662a18c933 Revert "(crawler) Further rearrange crawl order"
This reverts commit 1c2426a052.

The change does not appear necessary to avoid problems.
2025-03-27 11:25:08 +01:00
Viktor Lofgren
1c2426a052 (crawler) Further rearrange crawl order
Limit crawl order preference to edu domains, to avoid hitting stuff like medium and wordpress with shotgun requests.
2025-03-27 11:19:20 +01:00
Viktor Lofgren
34df7441ac (crawler) Add some jitter to crawl delay to avoid accidentally synchronized requests 2025-03-27 11:15:16 +01:00
Viktor Lofgren
5387e2bd80 (crawler) Adjust crawl order to get a better mixture of domains 2025-03-27 11:12:48 +01:00
11 changed files with 351 additions and 52 deletions

View File

@@ -35,21 +35,8 @@ public class RateLimiter {
}
public static RateLimiter forExpensiveRequest() {
return new RateLimiter(5, 10);
}
public static RateLimiter custom(int perMinute) {
return new RateLimiter(perMinute, 60);
}
public static RateLimiter forSpamBots() {
return new RateLimiter(120, 3600);
}
public static RateLimiter forLogin() {
return new RateLimiter(3, 15);
return new RateLimiter(4 * perMinute, perMinute);
}
private void cleanIdleBuckets() {
@@ -62,7 +49,7 @@ public class RateLimiter {
}
private Bucket createBucket() {
var refill = Refill.greedy(1, Duration.ofSeconds(refillRate));
var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
var bw = Bandwidth.classic(capacity, refill);
return Bucket.builder().addLimit(bw).build();
}

View File

@@ -103,10 +103,18 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist;
this.node = processConfiguration.node();
SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
threadType = SimpleBlockingThreadPool.ThreadType.VIRTUAL;
}
else {
threadType = SimpleBlockingThreadPool.ThreadType.PLATFORM;
}
pool = new SimpleBlockingThreadPool("CrawlerPool",
Integer.getInteger("crawler.poolSize", 256),
1,
SimpleBlockingThreadPool.ThreadType.VIRTUAL);
threadType);
// Wait for the blacklist to be loaded before starting the crawl
@@ -319,7 +327,7 @@ public class CrawlerMain extends ProcessMainClass {
randomOrder.put(spec.domain, r.nextInt());
}
return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0))
return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0) >= 8)
.reversed()
.thenComparing(spec -> randomOrder.get(spec.domain))
.thenComparing(Record::hashCode); // non-deterministic tie-breaker to
@@ -493,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
return new CrawlDataReference(slopPath);
}
} catch (IOException e) {
} catch (Exception e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain());
}

View File

@@ -11,6 +11,7 @@ import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
@@ -24,6 +25,17 @@ public class DomainStateDb implements AutoCloseable {
private final Connection connection;
/** Per-domain crawl metadata, stored as one row of the {@code crawl_meta} table
 * keyed by domain name.
 * <p>
 * The instant and both durations are persisted as millisecond values, so any
 * sub-millisecond precision is lost when a record is saved and read back.
 *
 * @param domainName     domain the metadata belongs to (the table's primary key)
 * @param lastFullCrawl  timestamp of the crawl, persisted as epoch milliseconds
 * @param recrawlTime    time spent in the recrawl phase, persisted as milliseconds
 * @param crawlTime      time spent in the crawl phase, persisted as milliseconds
 * @param recrawlErrors  error count reported for the recrawl phase
 * @param crawlChanges   change count for the crawl (presumably newly fetched documents; confirm against the caller)
 * @param totalCrawlSize total size of the crawl (presumably a document count; confirm against the caller)
 */
public record CrawlMeta(
String domainName,
Instant lastFullCrawl,
Duration recrawlTime,
Duration crawlTime,
int recrawlErrors,
int crawlChanges,
int totalCrawlSize
) {}
public record SummaryRecord(
String domainName,
Instant lastUpdated,
@@ -102,6 +114,17 @@ public class DomainStateDb implements AutoCloseable {
feedUrl TEXT
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS crawl_meta (
domain TEXT PRIMARY KEY,
lastFullCrawlEpochMs LONG NOT NULL,
recrawlTimeMs LONG NOT NULL,
recrawlErrors INTEGER NOT NULL,
crawlTimeMs LONG NOT NULL,
crawlChanges INTEGER NOT NULL,
totalCrawlSize INTEGER NOT NULL
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS favicon (
domain TEXT PRIMARY KEY,
@@ -164,6 +187,26 @@ public class DomainStateDb implements AutoCloseable {
return Optional.empty();
}
/** Inserts the given crawl metadata, replacing any existing row for the same domain.
 * <p>
 * SQL failures are logged and not propagated to the caller.
 *
 * @param crawlMeta the metadata to persist
 * @throws IllegalStateException if there is no database connection
 */
public void save(CrawlMeta crawlMeta) {
    if (connection == null) {
        throw new IllegalStateException("No connection to domainstate db");
    }

    final String upsertSql = """
            INSERT OR REPLACE INTO crawl_meta (domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """;

    try (var upsert = connection.prepareStatement(upsertSql)) {
        // Parameter indexes follow the column list in the statement above
        upsert.setString(1, crawlMeta.domainName());
        upsert.setLong(2, crawlMeta.lastFullCrawl().toEpochMilli());
        upsert.setLong(3, crawlMeta.recrawlTime().toMillis());
        upsert.setInt(4, crawlMeta.recrawlErrors());
        upsert.setLong(5, crawlMeta.crawlTime().toMillis());
        upsert.setInt(6, crawlMeta.crawlChanges());
        upsert.setInt(7, crawlMeta.totalCrawlSize());

        upsert.executeUpdate();
    }
    catch (SQLException e) {
        logger.error("Failed to insert crawl meta record", e);
    }
}
public void save(SummaryRecord record) {
if (connection == null) throw new IllegalStateException("No connection to domainstate db");
@@ -182,7 +225,35 @@ public class DomainStateDb implements AutoCloseable {
}
}
public Optional<SummaryRecord> get(String domainName) {
/** Looks up the stored crawl metadata for a domain.
 *
 * @param domainName the domain to look up
 * @return the stored metadata, or an empty Optional if there is no database
 *         connection, no row exists for the domain, or the query fails
 *         (query failures are logged, not thrown)
 */
public Optional<CrawlMeta> getMeta(String domainName) {
    if (connection == null) {
        return Optional.empty();
    }

    final String selectSql = """
            SELECT domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize
            FROM crawl_meta
            WHERE domain = ?
            """;

    try (var query = connection.prepareStatement(selectSql)) {
        query.setString(1, domainName);

        var row = query.executeQuery();
        if (!row.next()) {
            return Optional.empty();
        }

        // Columns are read in record-component order, which differs from the SELECT order
        String domain = row.getString("domain");
        Instant lastFullCrawl = Instant.ofEpochMilli(row.getLong("lastFullCrawlEpochMs"));
        Duration recrawlTime = Duration.ofMillis(row.getLong("recrawlTimeMs"));
        Duration crawlTime = Duration.ofMillis(row.getLong("crawlTimeMs"));
        int recrawlErrors = row.getInt("recrawlErrors");
        int crawlChanges = row.getInt("crawlChanges");
        int totalCrawlSize = row.getInt("totalCrawlSize");

        return Optional.of(new CrawlMeta(
                domain,
                lastFullCrawl,
                recrawlTime,
                crawlTime,
                recrawlErrors,
                crawlChanges,
                totalCrawlSize));
    }
    catch (SQLException ex) {
        logger.error("Failed to get crawl meta record", ex);
        return Optional.empty();
    }
}
public Optional<SummaryRecord> getSummary(String domainName) {
if (connection == null)
return Optional.empty();

View File

@@ -29,6 +29,7 @@ import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.zip.GZIPInputStream;
@@ -56,12 +57,22 @@ public class HttpFetcherImpl implements HttpFetcher {
private final HttpClient client;
private HttpClient createClient() {
final ExecutorService executorService;
if (Boolean.getBoolean("crawler.httpclient.useVirtualThreads")) {
executorService = Executors.newVirtualThreadPerTaskExecutor();
}
else {
executorService = Executors.newCachedThreadPool();
}
return HttpClient.newBuilder()
.sslContext(NoSecuritySSL.buildSslContext())
.cookieHandler(cookies)
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(8))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.executor(executorService)
.build();
}

View File

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
/** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
* and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status.
*/
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
if (rsp == null)
return new ErrorBuffer();
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(headers, is, contentLength);
return new MemoryBuffer(headers, timeLimit, is, contentLength);
}
else {
// Otherwise, we unpack it into a file and read it from there
return new FileBuffer(headers, is);
return new FileBuffer(headers, timeLimit, is);
}
}
catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
}
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
/** Submits a single {@code is.read(out)} call to the shared virtual-thread executor.
 *
 * @param is  the stream to read from
 * @param out the buffer the read fills
 * @return a future holding the value returned by {@code read}: the number of
 *         bytes read, or -1 at end of stream
 */
private Future<Integer> readAsync(InputStream is, byte[] out) {
    Callable<Integer> readOnce = () -> is.read(out);
    return virtualExecutorService.submit(readOnce);
}
/** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os) {
long startTime = System.currentTimeMillis();
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
Instant start = Instant.now();
Instant timeout = start.plus(timeLimit);
long size = 0;
byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
while (true) {
try {
int n = is.read(buffer);
Duration remaining = Duration.between(Instant.now(), timeout);
if (remaining.isNegative()) {
truncationReason = WarcTruncationReason.TIME;
break;
}
Future<Integer> readAsync = readAsync(is, buffer);
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
if (n < 0) break;
size += n;
os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
} catch (IOException|ExecutionException e) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
} catch (TimeoutException e) {
truncationReason = WarcTruncationReason.TIME;
break;
}
} catch (IOException e) {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
/** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer {
byte[] data;
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
super(headers);
var outputStream = new ByteArrayOutputStream(size);
copy(responseStream, outputStream);
copy(responseStream, outputStream, timeLimit);
data = outputStream.toByteArray();
}
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
class FileBuffer extends WarcInputBuffer {
private final Path tempFile;
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
super(suppressContentEncoding(headers));
this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
try (var out = Files.newOutputStream(tempFile)) {
copy(new GZIPInputStream(responseStream), out);
copy(new GZIPInputStream(responseStream), out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
}
else {
try (var out = Files.newOutputStream(tempFile)) {
copy(responseStream, out);
copy(responseStream, out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;

View File

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
}
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
InputStream inputStream = inputBuffer.read())
{
if (cookies.hasCookies()) {

View File

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -53,12 +54,13 @@ public class CrawlDelayTimer {
public void waitFetchDelay(long spentTime) {
long sleepTime = delayTime;
long jitter = ThreadLocalRandom.current().nextLong(0, 150);
try {
if (sleepTime >= 1) {
if (spentTime > sleepTime)
return;
Thread.sleep(min(sleepTime - spentTime, 5000));
Thread.sleep(min(sleepTime - spentTime, 5000) + jitter);
} else {
// When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster
@@ -71,12 +73,12 @@ public class CrawlDelayTimer {
if (spentTime > sleepTime)
return;
Thread.sleep(sleepTime - spentTime);
Thread.sleep(sleepTime - spentTime + jitter);
}
if (slowDown) {
// Additional delay when the server is signalling it wants slower requests
Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS);
Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS + jitter);
}
}
catch (InterruptedException e) {

View File

@@ -26,6 +26,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -113,15 +115,19 @@ public class CrawlerRetreiver implements AutoCloseable {
throw new InterruptedException();
}
Instant recrawlStart = Instant.now();
CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
if (recrawlMetadata.size() > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks, recrawlMetadata, recrawlTime);
}
case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
@@ -143,15 +149,28 @@ public class CrawlerRetreiver implements AutoCloseable {
private int crawlDomain(EdgeUrl rootUrl,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer,
DomainLinks domainLinks) {
DomainLinks domainLinks,
CrawlerRevisitor.RecrawlMetadata recrawlMetadata,
Duration recrawlTime) {
Instant crawlStart = Instant.now();
// Add external links to the crawl frontier
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
// Fetch sitemaps
for (var sitemap : robotsRules.getSitemaps()) {
// Validate the sitemap URL and check if it belongs to the domain as the root URL
if (EdgeUrl.parse(sitemap)
.map(url -> url.getDomain().equals(rootUrl.domain))
.orElse(false)) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
}
}
int crawlerAdditions = 0;
while (!crawlFrontier.isEmpty()
&& !crawlFrontier.isCrawlDepthReached()
@@ -184,7 +203,11 @@ public class CrawlerRetreiver implements AutoCloseable {
continue;
try {
fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
var result = fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
if (result.isOk()) {
crawlerAdditions++;
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -192,6 +215,17 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
Duration crawlTime = Duration.between(crawlStart, Instant.now());
domainStateDb.save(new DomainStateDb.CrawlMeta(
domain,
Instant.now(),
recrawlTime,
crawlTime,
recrawlMetadata.errors(),
crawlerAdditions,
recrawlMetadata.size() + crawlerAdditions
));
return crawlFrontier.visitedSize();
}
@@ -324,7 +358,7 @@ public class CrawlerRetreiver implements AutoCloseable {
);
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
var oldDomainStateRecord = domainStateDb.get(domain);
var oldDomainStateRecord = domainStateDb.getSummary(domain);
// If we are already aware of an old feed URL, then we can just revalidate it
if (oldDomainStateRecord.isPresent()) {

View File

@@ -31,7 +31,7 @@ public class CrawlerRevisitor {
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
public int recrawl(CrawlDataReference oldCrawlData,
public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer)
throws InterruptedException {
@@ -39,6 +39,7 @@ public class CrawlerRevisitor {
int retained = 0;
int errors = 0;
int skipped = 0;
int size = 0;
for (CrawledDocument doc : oldCrawlData) {
if (errors > 20) {
@@ -82,6 +83,7 @@ public class CrawlerRevisitor {
continue;
}
size++;
double skipProb;
@@ -154,6 +156,8 @@ public class CrawlerRevisitor {
}
}
return recrawled;
return new RecrawlMetadata(size, errors, skipped);
}
/** Counters reported by {@code recrawl}, taken from its local {@code size},
 * {@code errors} and {@code skipped} tallies at the point it returns.
 *
 * @param size    value of the {@code size} counter
 * @param errors  value of the {@code errors} counter
 * @param skipped value of the {@code skipped} counter
 */
public record RecrawlMetadata(int size, int errors, int skipped) {}
}

View File

@@ -8,6 +8,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.*;
@@ -47,8 +48,8 @@ class DomainStateDbTest {
db.save(allFields);
db.save(minFields);
assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
assertEquals(allFields, db.getSummary("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.getSummary("min.marginalia.nu").orElseThrow());
var updatedAllFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
@@ -59,7 +60,19 @@ class DomainStateDbTest {
);
db.save(updatedAllFields);
assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(updatedAllFields, db.getSummary("all.marginalia.nu").orElseThrow());
}
}
/** Round-trips a CrawlMeta record through save() and getMeta(). */
@Test
public void testMetadata() throws SQLException {
    try (var db = new DomainStateDb(tempFile)) {
        var expected = new DomainStateDb.CrawlMeta(
                "example.com",
                Instant.ofEpochMilli(12345),
                Duration.ofMillis(30),
                Duration.ofMillis(300),
                1,
                2,
                3);

        db.save(expected);

        var stored = db.getMeta("example.com");

        assertTrue(stored.isPresent());
        assertEquals(expected, stored.get());
    }
}

View File

@@ -0,0 +1,152 @@
package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.Cookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/** Exercises {@link WarcRecorder#fetch} against a local fake HTTP server.
 * <p>
 * The server exposes two endpoints: {@code /fast}, which responds immediately,
 * and {@code /slow}, which stalls in the middle of the response body for longer
 * than the client's request timeout.
 * <p>
 * The server binds the fixed port 14510, so these tests cannot run concurrently
 * with anything else using that port.
 */
@Tag("slow")
class WarcRecorderFakeServerTest {
    /** Body sent in full by /fast, and as the first part of the /slow response. */
    private static final byte[] HELLO_BODY =
            "<html><body>hello</body></html>".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Trailing bytes that /slow sends only after stalling. */
    private static final byte[] SLOW_TAIL =
            ":D".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** How long /slow stalls mid-body; must comfortably exceed the request timeout. */
    private static final int SLOW_STALL_SECONDS = 10;

    static HttpServer server;

    @BeforeAll
    public static void setUpAll() throws IOException {
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);

        // This endpoint will finish sending the response immediately
        server.createContext("/fast", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "text/html");
            exchange.sendResponseHeaders(200, HELLO_BODY.length);

            try (var os = exchange.getResponseBody()) {
                os.write(HELLO_BODY);
                os.flush();
            }
            exchange.close();
        });

        // This endpoint will take 10 seconds to finish sending the response,
        // which should trigger a timeout in the client
        server.createContext("/slow", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "text/html");
            // Declare the full length up front so the client keeps waiting for the tail
            exchange.sendResponseHeaders(200, HELLO_BODY.length + SLOW_TAIL.length);

            try (var os = exchange.getResponseBody()) {
                os.write(HELLO_BODY);
                os.flush();

                try {
                    TimeUnit.SECONDS.sleep(SLOW_STALL_SECONDS);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                os.write(SLOW_TAIL);
                os.flush();
            }
            exchange.close();
        });

        server.start();
    }

    @AfterAll
    public static void tearDownAll() {
        server.stop(0);
    }

    Path fileNameWarc;
    Path fileNameParquet;
    WarcRecorder client;

    HttpClient httpClient;

    @BeforeEach
    public void setUp() throws Exception {
        httpClient = HttpClient.newBuilder().build();

        fileNameWarc = Files.createTempFile("test", ".warc");
        fileNameParquet = Files.createTempFile("test", ".parquet");

        client = new WarcRecorder(fileNameWarc, new Cookies());
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            client.close();
        }
        finally {
            // Remove both temp files even if closing the recorder fails;
            // the parquet file is created in setUp and must not be left behind
            Files.deleteIfExists(fileNameWarc);
            Files.deleteIfExists(fileNameParquet);
        }
    }

    @Test
    public void fetchFast() throws Exception {
        fetch("/fast");

        System.out.println(readRecordTargets());
    }

    @Test
    public void fetchSlow() throws Exception {
        Instant start = Instant.now();
        fetch("/slow");
        Instant end = Instant.now();

        System.out.println(readRecordTargets());

        // Timeout is set to 1 second, but the server will take 10 seconds to respond,
        // so we expect the request to take 1s and change before it times out.
        long elapsedMs = Duration.between(start, end).toMillis();
        Assertions.assertTrue(elapsedMs < 2000,
                () -> "Expected the fetch to time out in under 2000 ms, but it took " + elapsedMs + " ms");
    }

    /** Fetches the given path from the fake server through the recorder, with a 1 second request timeout. */
    private void fetch(String path) throws Exception {
        client.fetch(httpClient,
                HttpRequest.newBuilder()
                        .uri(new java.net.URI("http://localhost:14510" + path))
                        .timeout(Duration.ofSeconds(1))
                        .header("User-agent", "test.marginalia.nu")
                        .header("Accept-Encoding", "gzip")
                        .GET().build()
        );
    }

    /** Reads the WARC file back and maps each request/response record type to its target URI. */
    private Map<String, String> readRecordTargets() throws Exception {
        Map<String, String> targetsByType = new HashMap<>();

        try (var warcReader = new WarcReader(fileNameWarc)) {
            warcReader.forEach(record -> {
                if (record instanceof WarcRequest req) {
                    targetsByType.put(record.type(), req.target());
                }
                if (record instanceof WarcResponse rsp) {
                    targetsByType.put(record.type(), rsp.target());
                }
            });
        }

        return targetsByType;
    }
}