1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

5 Commits

Author SHA1 Message Date
Viktor Lofgren
b6265cee11 (feeds) Add timeout code to send()
Due to the unique way Java's HttpClient implements timeouts, we must always wrap it in an executor to catch the scenario that a server stops sending data mid-response, which would otherwise hang the send method forever.
2025-04-08 22:09:59 +02:00
Viktor Lofgren
c91af247e9 (rate-limit) Fix rate limiting logic
The rate limiter was misconfigured to regenerate tokens at a fixed rate of 1 per refillRate, not refillRate per minute.  Additionally increasing the default bucket size to 4x refill rate.
2025-04-05 12:26:26 +02:00
Viktor Lofgren
7a31227de1 (crawler) Filter out robots.txt-sitemaps that belong to different domains 2025-04-02 13:35:37 +02:00
Viktor Lofgren
4f477604c5 (crawler) Improve error handling in parquet->slop conversion
Parquet code throws a RuntimeException, which was not correctly caught, leading to a failure to crawl.
2025-04-02 13:16:01 +02:00
Viktor Lofgren
2970f4395b (minor) Test code cleanup 2025-04-02 13:16:01 +02:00
5 changed files with 34 additions and 27 deletions

View File

@@ -35,21 +35,8 @@ public class RateLimiter {
} }
public static RateLimiter forExpensiveRequest() {
return new RateLimiter(5, 10);
}
public static RateLimiter custom(int perMinute) { public static RateLimiter custom(int perMinute) {
return new RateLimiter(perMinute, 60); return new RateLimiter(4 * perMinute, perMinute);
}
public static RateLimiter forSpamBots() {
return new RateLimiter(120, 3600);
}
public static RateLimiter forLogin() {
return new RateLimiter(3, 15);
} }
private void cleanIdleBuckets() { private void cleanIdleBuckets() {
@@ -62,7 +49,7 @@ public class RateLimiter {
} }
private Bucket createBucket() { private Bucket createBucket() {
var refill = Refill.greedy(1, Duration.ofSeconds(refillRate)); var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
var bw = Bandwidth.classic(capacity, refill); var bw = Bandwidth.classic(capacity, refill);
return Bucket.builder().addLimit(bw).build(); return Bucket.builder().addLimit(bw).build();
} }

View File

@@ -33,6 +33,7 @@ import java.sql.SQLException;
import java.time.*; import java.time.*;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +72,7 @@ public class FeedFetcherService {
public enum UpdateMode { public enum UpdateMode {
CLEAN, CLEAN,
REFRESH REFRESH
}; }
public void updateFeeds(UpdateMode updateMode) throws IOException { public void updateFeeds(UpdateMode updateMode) throws IOException {
if (updating) // Prevent concurrent updates if (updating) // Prevent concurrent updates
@@ -87,6 +88,7 @@ public class FeedFetcherService {
.followRedirects(HttpClient.Redirect.NORMAL) .followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_2) .version(HttpClient.Version.HTTP_2)
.build(); .build();
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
FeedJournal feedJournal = FeedJournal.create(); FeedJournal feedJournal = FeedJournal.create();
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds") var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
) { ) {
@@ -131,7 +133,7 @@ public class FeedFetcherService {
FetchResult feedData; FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) { try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client, ifModifiedSinceDate, ifNoneMatchTag); feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
} catch (Exception ex) { } catch (Exception ex) {
feedData = new FetchResult.TransientError(); feedData = new FetchResult.TransientError();
} }
@@ -211,6 +213,7 @@ public class FeedFetcherService {
private FetchResult fetchFeedData(FeedDefinition feed, private FetchResult fetchFeedData(FeedDefinition feed,
HttpClient client, HttpClient client,
ExecutorService executorService,
@Nullable String ifModifiedSinceDate, @Nullable String ifModifiedSinceDate,
@Nullable String ifNoneMatchTag) @Nullable String ifNoneMatchTag)
{ {
@@ -237,7 +240,14 @@ public class FeedFetcherService {
HttpRequest getRequest = requestBuilder.build(); HttpRequest getRequest = requestBuilder.build();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
HttpResponse<byte[]> rs = client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray());
/* Note we need to use an executor to time-limit the send() method in HttpClient, as
* its support for timeouts only applies to the time until response starts to be received,
* and does not catch the case when the server starts to send data but then hangs.
*/
HttpResponse<byte[]> rs = executorService.submit(
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
.get(15, TimeUnit.SECONDS);
if (rs.statusCode() == 429) { // Too Many Requests if (rs.statusCode() == 429) { // Too Many Requests
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2")); int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));

View File

@@ -501,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
return new CrawlDataReference(slopPath); return new CrawlDataReference(slopPath);
} }
} catch (IOException e) { } catch (Exception e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain()); logger.debug("Failed to read previous crawl data for {}", specification.domain());
} }

View File

@@ -160,7 +160,14 @@ public class CrawlerRetreiver implements AutoCloseable {
// Fetch sitemaps // Fetch sitemaps
for (var sitemap : robotsRules.getSitemaps()) { for (var sitemap : robotsRules.getSitemaps()) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
// Validate the sitemap URL and check if it belongs to the same domain as the root URL
if (EdgeUrl.parse(sitemap)
.map(url -> url.getDomain().equals(rootUrl.domain))
.orElse(false)) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
}
} }
int crawlerAdditions = 0; int crawlerAdditions = 0;

View File

@@ -10,12 +10,10 @@ import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.net.http.HttpRequest; import java.net.http.HttpRequest;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
@@ -29,6 +27,8 @@ class WarcRecorderFakeServerTest {
@BeforeAll @BeforeAll
public static void setUpAll() throws IOException { public static void setUpAll() throws IOException {
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10); server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
// This endpoint will finish sending the response immediately
server.createContext("/fast", exchange -> { server.createContext("/fast", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html"); exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length()); exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
@@ -40,6 +40,8 @@ class WarcRecorderFakeServerTest {
exchange.close(); exchange.close();
}); });
// This endpoint will take 10 seconds to finish sending the response,
// which should trigger a timeout in the client
server.createContext("/slow", exchange -> { server.createContext("/slow", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html"); exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length()); exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
@@ -88,7 +90,7 @@ class WarcRecorderFakeServerTest {
} }
@Test @Test
void fetchFast() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { public void fetchFast() throws Exception {
client.fetch(httpClient, client.fetch(httpClient,
HttpRequest.newBuilder() HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/fast")) .uri(new java.net.URI("http://localhost:14510/fast"))
@@ -114,7 +116,7 @@ class WarcRecorderFakeServerTest {
} }
@Test @Test
void fetchSlow() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { public void fetchSlow() throws Exception {
Instant start = Instant.now(); Instant start = Instant.now();
client.fetch(httpClient, client.fetch(httpClient,
HttpRequest.newBuilder() HttpRequest.newBuilder()
@@ -141,9 +143,10 @@ class WarcRecorderFakeServerTest {
System.out.println(sampleData); System.out.println(sampleData);
// Timeout is set to 1 second, but the server will take 5 seconds to respond, so we expect the request to take 1s and change // Timeout is set to 1 second, but the server will take 5 seconds to respond,
// before it times out. // so we expect the request to take 1s and change before it times out.
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000, "Request should take less than 2 seconds");
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000);
} }
} }