1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

14 Commits

Author SHA1 Message Date
Viktor Lofgren
d1ec909b36 (crawler) Improve handling of timeouts to prevent crawler from getting stuck 2025-04-02 12:57:21 +02:00
Viktor Lofgren
c67c5bbf42 (crawler) Experimentally drop to HTTP 1.1 for crawler to see if this solves stuck send()s 2025-04-01 12:05:21 +02:00
Viktor Lofgren
ecb0e57a1a (crawler) Make the use of virtual threads in the crawler configurable via system properties 2025-03-27 21:26:05 +01:00
Viktor Lofgren
8c61f61b46 (crawler) Add crawling metadata to domainstate db 2025-03-27 16:38:37 +01:00
Viktor Lofgren
662a18c933 Revert "(crawler) Further rearrange crawl order"
This reverts commit 1c2426a052.

The change does not appear necessary to avoid problems.
2025-03-27 11:25:08 +01:00
Viktor Lofgren
1c2426a052 (crawler) Further rearrange crawl order
Limit crawl order preferrence to edu domains, to avoid hitting stuff like medium and wordpress with shotgun requests.
2025-03-27 11:19:20 +01:00
Viktor Lofgren
34df7441ac (crawler) Add some jitter to crawl delay to avoid accidentally synchronized requests 2025-03-27 11:15:16 +01:00
Viktor Lofgren
5387e2bd80 (crawler) Adjust crawl order to get a better mixture of domains 2025-03-27 11:12:48 +01:00
Viktor Lofgren
0f3b24d0f8 (crawler) Evaluate virtual threads for the crawler
The change also alters SimpleBlockingThreadPool to add the option to use virtual threads instead of platform threads.
2025-03-27 11:02:21 +01:00
Viktor Lofgren
a732095d2a (crawler) Improve crawl task ordering
Further improve the ordering of the crawl tasks in order to ensure that potentially blocking tasks are enqueued as soon as possible.
2025-03-26 16:51:37 +01:00
Viktor Lofgren
6607f0112f (crawler) Improve how the crawler deals with interruptions
In some cases, it threads would previously fail to terminate when interrupted.
2025-03-26 16:19:57 +01:00
Viktor Lofgren
4913730de9 (jdk) Upgrade to Java 24 2025-03-26 13:26:06 +01:00
Viktor Lofgren
1db64f9d56 (chore) Fix zookeeper test by upgrading zk image version.
Test suddenly broke due to the increasing entropy of the universe.
2025-03-26 11:47:14 +01:00
Viktor Lofgren
4dcff14498 (search) Improve contrast with light mode 2025-03-25 13:15:31 +01:00
15 changed files with 405 additions and 70 deletions

View File

@@ -43,12 +43,11 @@ subprojects.forEach {it ->
}
ext {
jvmVersion=23
dockerImageBase='container-registry.oracle.com/graalvm/jdk:23'
jvmVersion = 24
dockerImageBase='container-registry.oracle.com/graalvm/jdk:24'
dockerImageTag='latest'
dockerImageRegistry='marginalia'
jibVersion = '3.4.4'
}
idea {

View File

@@ -14,7 +14,7 @@ public class EdgeDomain implements Serializable {
@Nonnull
public final String topDomain;
public EdgeDomain(String host) {
public EdgeDomain(@Nonnull String host) {
Objects.requireNonNull(host, "domain name must not be null");
host = host.toLowerCase();
@@ -61,6 +61,10 @@ public class EdgeDomain implements Serializable {
this.topDomain = topDomain;
}
public static String getTopDomain(String host) {
return new EdgeDomain(host).topDomain;
}
private boolean looksLikeGovTld(String host) {
if (host.length() < 8)
return false;
@@ -116,24 +120,6 @@ public class EdgeDomain implements Serializable {
return topDomain.substring(0, cutPoint).toLowerCase();
}
public String getLongDomainKey() {
StringBuilder ret = new StringBuilder();
int cutPoint = topDomain.indexOf('.');
if (cutPoint < 0) {
ret.append(topDomain);
} else {
ret.append(topDomain, 0, cutPoint);
}
if (!subDomain.isEmpty() && !"www".equals(subDomain)) {
ret.append(":");
ret.append(subDomain);
}
return ret.toString().toLowerCase();
}
/** If possible, try to provide an alias domain,
* i.e. a domain name that is very likely to link to this one
* */

View File

@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
class ZkServiceRegistryTest {
private static final int ZOOKEEPER_PORT = 2181;
private static final GenericContainer<?> zookeeper =
new GenericContainer<>("zookeeper:3.8.0")
new GenericContainer<>("zookeeper:3.8")
.withExposedPorts(ZOOKEEPER_PORT);
List<ZkServiceRegistry> registries = new ArrayList<>();

View File

@@ -23,16 +23,33 @@ public class SimpleBlockingThreadPool {
private final Logger logger = LoggerFactory.getLogger(SimpleBlockingThreadPool.class);
public SimpleBlockingThreadPool(String name, int poolSize, int queueSize) {
this(name, poolSize, queueSize, ThreadType.PLATFORM);
}
public SimpleBlockingThreadPool(String name, int poolSize, int queueSize, ThreadType threadType) {
tasks = new ArrayBlockingQueue<>(queueSize);
for (int i = 0; i < poolSize; i++) {
Thread worker = new Thread(this::worker, name + "[" + i + "]");
worker.setDaemon(true);
worker.start();
Thread.Builder threadBuilder = switch (threadType) {
case VIRTUAL -> Thread.ofVirtual();
case PLATFORM -> Thread.ofPlatform().daemon(true);
};
Thread worker = threadBuilder
.name(name + "[" + i + "]")
.start(this::worker);
workers.add(worker);
}
}
public enum ThreadType {
VIRTUAL,
PLATFORM
}
public void submit(Task task) throws InterruptedException {
tasks.put(task);
}

View File

@@ -41,10 +41,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -106,9 +103,18 @@ public class CrawlerMain extends ProcessMainClass {
this.blacklist = blacklist;
this.node = processConfiguration.node();
SimpleBlockingThreadPool.ThreadType threadType;
if (Boolean.getBoolean("crawler.useVirtualThreads")) {
threadType = SimpleBlockingThreadPool.ThreadType.VIRTUAL;
}
else {
threadType = SimpleBlockingThreadPool.ThreadType.PLATFORM;
}
pool = new SimpleBlockingThreadPool("CrawlerPool",
Integer.getInteger("crawler.poolSize", 256),
1);
1,
threadType);
// Wait for the blacklist to be loaded before starting the crawl
@@ -224,10 +230,7 @@ public class CrawlerMain extends ProcessMainClass {
logger.info("Loaded {} domains", crawlSpecRecords.size());
// Shuffle the domains to ensure we get a good mix of domains in each crawl,
// so that e.g. the big domains don't get all crawled at once, or we end up
// crawling the same server in parallel from different subdomains...
Collections.shuffle(crawlSpecRecords);
crawlSpecRecords.sort(crawlSpecArrangement(crawlSpecRecords));
// First a validation run to ensure the file is all good to parse
if (crawlSpecRecords.isEmpty()) {
@@ -306,6 +309,30 @@ public class CrawlerMain extends ProcessMainClass {
}
}
/** Create a comparator that sorts the crawl specs in a way that is beneficial for the crawl,
* we want to enqueue domains that have common top domains first, but otherwise have a random
* order.
* <p></p>
* Note, we can't use hash codes for randomization as it is not desirable to have the same order
* every time the process is restarted (and CrawlSpecRecord is a record, which defines equals and
* hashcode based on the fields).
* */
private Comparator<CrawlSpecRecord> crawlSpecArrangement(List<CrawlSpecRecord> records) {
Random r = new Random();
Map<String, Integer> topDomainCounts = new HashMap<>(4 + (int) Math.sqrt(records.size()));
Map<String, Integer> randomOrder = new HashMap<>(records.size());
for (var spec : records) {
topDomainCounts.merge(EdgeDomain.getTopDomain(spec.domain), 1, Integer::sum);
randomOrder.put(spec.domain, r.nextInt());
}
return Comparator.comparing((CrawlSpecRecord spec) -> topDomainCounts.getOrDefault(EdgeDomain.getTopDomain(spec.domain), 0) >= 8)
.reversed()
.thenComparing(spec -> randomOrder.get(spec.domain))
.thenComparing(Record::hashCode); // non-deterministic tie-breaker to
}
/** Submit a task for execution if it can be run, returns true if it was submitted
* or if it can be discarded */
private boolean trySubmitDeferredTask(CrawlTask task) {

View File

@@ -11,6 +11,7 @@ import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
@@ -24,6 +25,17 @@ public class DomainStateDb implements AutoCloseable {
private final Connection connection;
public record CrawlMeta(
String domainName,
Instant lastFullCrawl,
Duration recrawlTime,
Duration crawlTime,
int recrawlErrors,
int crawlChanges,
int totalCrawlSize
) {}
public record SummaryRecord(
String domainName,
Instant lastUpdated,
@@ -102,6 +114,17 @@ public class DomainStateDb implements AutoCloseable {
feedUrl TEXT
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS crawl_meta (
domain TEXT PRIMARY KEY,
lastFullCrawlEpochMs LONG NOT NULL,
recrawlTimeMs LONG NOT NULL,
recrawlErrors INTEGER NOT NULL,
crawlTimeMs LONG NOT NULL,
crawlChanges INTEGER NOT NULL,
totalCrawlSize INTEGER NOT NULL
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS favicon (
domain TEXT PRIMARY KEY,
@@ -164,6 +187,26 @@ public class DomainStateDb implements AutoCloseable {
return Optional.empty();
}
public void save(CrawlMeta crawlMeta) {
if (connection == null) throw new IllegalStateException("No connection to domainstate db");
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO crawl_meta (domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")) {
stmt.setString(1, crawlMeta.domainName());
stmt.setLong(2, crawlMeta.lastFullCrawl.toEpochMilli());
stmt.setLong(3, crawlMeta.recrawlTime.toMillis());
stmt.setInt(4, crawlMeta.recrawlErrors);
stmt.setLong(5, crawlMeta.crawlTime.toMillis());
stmt.setInt(6, crawlMeta.crawlChanges);
stmt.setInt(7, crawlMeta.totalCrawlSize);
stmt.executeUpdate();
} catch (SQLException e) {
logger.error("Failed to insert crawl meta record", e);
}
}
public void save(SummaryRecord record) {
if (connection == null) throw new IllegalStateException("No connection to domainstate db");
@@ -182,7 +225,35 @@ public class DomainStateDb implements AutoCloseable {
}
}
public Optional<SummaryRecord> get(String domainName) {
public Optional<CrawlMeta> getMeta(String domainName) {
if (connection == null)
return Optional.empty();
try (var stmt = connection.prepareStatement("""
SELECT domain, lastFullCrawlEpochMs, recrawlTimeMs, recrawlErrors, crawlTimeMs, crawlChanges, totalCrawlSize
FROM crawl_meta
WHERE domain = ?
""")) {
stmt.setString(1, domainName);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(new CrawlMeta(
rs.getString("domain"),
Instant.ofEpochMilli(rs.getLong("lastFullCrawlEpochMs")),
Duration.ofMillis(rs.getLong("recrawlTimeMs")),
Duration.ofMillis(rs.getLong("crawlTimeMs")),
rs.getInt("recrawlErrors"),
rs.getInt("crawlChanges"),
rs.getInt("totalCrawlSize")
));
}
} catch (SQLException ex) {
logger.error("Failed to get crawl meta record", ex);
}
return Optional.empty();
}
public Optional<SummaryRecord> getSummary(String domainName) {
if (connection == null)
return Optional.empty();

View File

@@ -29,6 +29,7 @@ import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.zip.GZIPInputStream;
@@ -56,12 +57,22 @@ public class HttpFetcherImpl implements HttpFetcher {
private final HttpClient client;
private HttpClient createClient() {
final ExecutorService executorService;
if (Boolean.getBoolean("crawler.httpclient.useVirtualThreads")) {
executorService = Executors.newVirtualThreadPerTaskExecutor();
}
else {
executorService = Executors.newCachedThreadPool();
}
return HttpClient.newBuilder()
.sslContext(NoSecuritySSL.buildSslContext())
.cookieHandler(cookies)
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(8))
.executor(Executors.newCachedThreadPool())
.executor(executorService)
.build();
}

View File

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
/** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
* and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status.
*/
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
if (rsp == null)
return new ErrorBuffer();
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(headers, is, contentLength);
return new MemoryBuffer(headers, timeLimit, is, contentLength);
}
else {
// Otherwise, we unpack it into a file and read it from there
return new FileBuffer(headers, is);
return new FileBuffer(headers, timeLimit, is);
}
}
catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
}
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
private Future<Integer> readAsync(InputStream is, byte[] out) {
return virtualExecutorService.submit(() -> is.read(out));
}
/** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os) {
long startTime = System.currentTimeMillis();
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
Instant start = Instant.now();
Instant timeout = start.plus(timeLimit);
long size = 0;
byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
while (true) {
try {
int n = is.read(buffer);
Duration remaining = Duration.between(Instant.now(), timeout);
if (remaining.isNegative()) {
truncationReason = WarcTruncationReason.TIME;
break;
}
Future<Integer> readAsync = readAsync(is, buffer);
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
if (n < 0) break;
size += n;
os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
}
} catch (IOException e) {
} catch (IOException|ExecutionException e) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
} catch (TimeoutException e) {
truncationReason = WarcTruncationReason.TIME;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
/** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer {
byte[] data;
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
super(headers);
var outputStream = new ByteArrayOutputStream(size);
copy(responseStream, outputStream);
copy(responseStream, outputStream, timeLimit);
data = outputStream.toByteArray();
}
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
class FileBuffer extends WarcInputBuffer {
private final Path tempFile;
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
super(suppressContentEncoding(headers));
this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
try (var out = Files.newOutputStream(tempFile)) {
copy(new GZIPInputStream(responseStream), out);
copy(new GZIPInputStream(responseStream), out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
}
else {
try (var out = Files.newOutputStream(tempFile)) {
copy(responseStream, out);
copy(responseStream, out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;

View File

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
}
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
InputStream inputStream = inputBuffer.read())
{
if (cookies.hasCookies()) {

View File

@@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -53,12 +54,13 @@ public class CrawlDelayTimer {
public void waitFetchDelay(long spentTime) {
long sleepTime = delayTime;
long jitter = ThreadLocalRandom.current().nextLong(0, 150);
try {
if (sleepTime >= 1) {
if (spentTime > sleepTime)
return;
Thread.sleep(min(sleepTime - spentTime, 5000));
Thread.sleep(min(sleepTime - spentTime, 5000) + jitter);
} else {
// When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster
@@ -71,17 +73,17 @@ public class CrawlDelayTimer {
if (spentTime > sleepTime)
return;
Thread.sleep(sleepTime - spentTime);
Thread.sleep(sleepTime - spentTime + jitter);
}
if (slowDown) {
// Additional delay when the server is signalling it wants slower requests
Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS);
Thread.sleep(DEFAULT_CRAWL_DELAY_MIN_MS + jitter);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException();
throw new RuntimeException("Interrupted", e);
}
}
}

View File

@@ -26,6 +26,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -108,15 +110,24 @@ public class CrawlerRetreiver implements AutoCloseable {
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
domainStateDb.save(summaryRecord);
if (Thread.interrupted()) {
// There's a small chance we're interrupted during the sniffing portion
throw new InterruptedException();
}
Instant recrawlStart = Instant.now();
CrawlerRevisitor.RecrawlMetadata recrawlMetadata = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
Duration recrawlTime = Duration.between(recrawlStart, Instant.now());
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
if (recrawlMetadata.size() > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks, recrawlMetadata, recrawlTime);
}
case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
@@ -138,8 +149,11 @@ public class CrawlerRetreiver implements AutoCloseable {
private int crawlDomain(EdgeUrl rootUrl,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer,
DomainLinks domainLinks) {
DomainLinks domainLinks,
CrawlerRevisitor.RecrawlMetadata recrawlMetadata,
Duration recrawlTime) {
Instant crawlStart = Instant.now();
// Add external links to the crawl frontier
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
@@ -149,6 +163,8 @@ public class CrawlerRetreiver implements AutoCloseable {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
}
int crawlerAdditions = 0;
while (!crawlFrontier.isEmpty()
&& !crawlFrontier.isCrawlDepthReached()
&& errorCount < MAX_ERRORS
@@ -180,7 +196,11 @@ public class CrawlerRetreiver implements AutoCloseable {
continue;
try {
fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
var result = fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
if (result.isOk()) {
crawlerAdditions++;
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -188,6 +208,17 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
Duration crawlTime = Duration.between(crawlStart, Instant.now());
domainStateDb.save(new DomainStateDb.CrawlMeta(
domain,
Instant.now(),
recrawlTime,
crawlTime,
recrawlMetadata.errors(),
crawlerAdditions,
recrawlMetadata.size() + crawlerAdditions
));
return crawlFrontier.visitedSize();
}
@@ -289,6 +320,10 @@ public class CrawlerRetreiver implements AutoCloseable {
}
catch (Exception ex) {
logger.error("Error configuring link filter", ex);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return DomainStateDb.SummaryRecord.forError(domain, "Crawler Interrupted", ex.getMessage());
}
}
finally {
crawlFrontier.addVisited(rootUrl);
@@ -316,7 +351,7 @@ public class CrawlerRetreiver implements AutoCloseable {
);
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
var oldDomainStateRecord = domainStateDb.get(domain);
var oldDomainStateRecord = domainStateDb.getSummary(domain);
// If we are already aware of an old feed URL, then we can just revalidate it
if (oldDomainStateRecord.isPresent()) {

View File

@@ -31,7 +31,7 @@ public class CrawlerRevisitor {
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
public int recrawl(CrawlDataReference oldCrawlData,
public RecrawlMetadata recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer)
throws InterruptedException {
@@ -39,6 +39,7 @@ public class CrawlerRevisitor {
int retained = 0;
int errors = 0;
int skipped = 0;
int size = 0;
for (CrawledDocument doc : oldCrawlData) {
if (errors > 20) {
@@ -46,6 +47,10 @@ public class CrawlerRevisitor {
break;
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
var urlMaybe = EdgeUrl.parse(doc.url);
if (urlMaybe.isEmpty())
continue;
@@ -78,6 +83,7 @@ public class CrawlerRevisitor {
continue;
}
size++;
double skipProb;
@@ -150,6 +156,8 @@ public class CrawlerRevisitor {
}
}
return recrawled;
return new RecrawlMetadata(size, errors, skipped);
}
public record RecrawlMetadata(int size, int errors, int skipped) {}
}

View File

@@ -8,6 +8,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.*;
@@ -47,8 +48,8 @@ class DomainStateDbTest {
db.save(allFields);
db.save(minFields);
assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
assertEquals(allFields, db.getSummary("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.getSummary("min.marginalia.nu").orElseThrow());
var updatedAllFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
@@ -59,7 +60,19 @@ class DomainStateDbTest {
);
db.save(updatedAllFields);
assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(updatedAllFields, db.getSummary("all.marginalia.nu").orElseThrow());
}
}
@Test
public void testMetadata() throws SQLException {
try (var db = new DomainStateDb(tempFile)) {
var original = new DomainStateDb.CrawlMeta("example.com", Instant.ofEpochMilli(12345), Duration.ofMillis(30), Duration.ofMillis(300), 1, 2, 3);
db.save(original);
var maybeMeta = db.getMeta("example.com");
assertTrue(maybeMeta.isPresent());
assertEquals(original, maybeMeta.get());
}
}

View File

@@ -0,0 +1,149 @@
package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.Cookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tag("slow")
class WarcRecorderFakeServerTest {
static HttpServer server;
@BeforeAll
public static void setUpAll() throws IOException {
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
server.createContext("/fast", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
}
exchange.close();
});
server.createContext("/slow", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
os.write(":D".getBytes());
os.flush();
}
exchange.close();
});
server.start();
}
@AfterAll
public static void tearDownAll() {
server.stop(0);
}
Path fileNameWarc;
Path fileNameParquet;
WarcRecorder client;
HttpClient httpClient;
@BeforeEach
public void setUp() throws Exception {
httpClient = HttpClient.newBuilder().build();
fileNameWarc = Files.createTempFile("test", ".warc");
fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc, new Cookies());
}
@AfterEach
public void tearDown() throws Exception {
client.close();
Files.delete(fileNameWarc);
}
@Test
void fetchFast() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/fast"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
}
});
}
System.out.println(sampleData);
}
@Test
void fetchSlow() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
Instant start = Instant.now();
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/slow"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Instant end = Instant.now();
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
System.out.println(rsp.target());
}
});
}
System.out.println(sampleData);
// Timeout is set to 1 second, but the server will take 5 seconds to respond, so we expect the request to take 1s and change
// before it times out.
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000, "Request should take less than 2 seconds");
}
}

View File

@@ -38,7 +38,7 @@
<div class="space-y-2">
@for (SearchFilters.SearchOption option : filters.searchOptions())
<label class="flex items-center">
<button title="${option.name()}" onclick="document.location='$unsafe{option.getUrl()}'" class="flex-1 py-2 pl-2 rounded flex space-x-2 dark:has-[:checked]:bg-gray-950 has-[:checked]:bg-gray-100 has-[:checked]:text-slate-900 dark:has-[:checked]:text-slate-100 hover:bg-gray-50 dark:hover:bg-gray-950 bg-white dark:bg-gray-900 dark:border dark:border-gray-600 text-margeblue dark:text-slate-200 outline-1 active:outline">
<button title="${option.name()}" onclick="document.location='$unsafe{option.getUrl()}'" class="flex-1 py-2 pl-2 rounded flex space-x-2 dark:has-[:checked]:bg-gray-950 has-[:checked]:bg-gray-300 has-[:checked]:text-slate-900 dark:has-[:checked]:text-slate-100 hover:bg-gray-50 dark:hover:bg-gray-950 bg-white dark:bg-gray-900 dark:border dark:border-gray-600 text-margeblue dark:text-slate-200 outline-1 active:outline">
@if (option.isSet())
<input type="checkbox" checked class="sr-only" aria-checked="true" />
@else