mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
7 Commits
deploy-011
...
deploy-011
Author | SHA1 | Date | |
---|---|---|---|
|
b6265cee11 | ||
|
c91af247e9 | ||
|
7a31227de1 | ||
|
4f477604c5 | ||
|
2970f4395b | ||
|
d1ec909b36 | ||
|
c67c5bbf42 |
@@ -35,21 +35,8 @@ public class RateLimiter {
|
||||
}
|
||||
|
||||
|
||||
public static RateLimiter forExpensiveRequest() {
|
||||
return new RateLimiter(5, 10);
|
||||
}
|
||||
|
||||
public static RateLimiter custom(int perMinute) {
|
||||
return new RateLimiter(perMinute, 60);
|
||||
}
|
||||
|
||||
public static RateLimiter forSpamBots() {
|
||||
return new RateLimiter(120, 3600);
|
||||
}
|
||||
|
||||
|
||||
public static RateLimiter forLogin() {
|
||||
return new RateLimiter(3, 15);
|
||||
return new RateLimiter(4 * perMinute, perMinute);
|
||||
}
|
||||
|
||||
private void cleanIdleBuckets() {
|
||||
@@ -62,7 +49,7 @@ public class RateLimiter {
|
||||
}
|
||||
|
||||
private Bucket createBucket() {
|
||||
var refill = Refill.greedy(1, Duration.ofSeconds(refillRate));
|
||||
var refill = Refill.greedy(refillRate, Duration.ofSeconds(60));
|
||||
var bw = Bandwidth.classic(capacity, refill);
|
||||
return Bucket.builder().addLimit(bw).build();
|
||||
}
|
||||
|
@@ -33,6 +33,7 @@ import java.sql.SQLException;
|
||||
import java.time.*;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@@ -71,7 +72,7 @@ public class FeedFetcherService {
|
||||
public enum UpdateMode {
|
||||
CLEAN,
|
||||
REFRESH
|
||||
};
|
||||
}
|
||||
|
||||
public void updateFeeds(UpdateMode updateMode) throws IOException {
|
||||
if (updating) // Prevent concurrent updates
|
||||
@@ -87,6 +88,7 @@ public class FeedFetcherService {
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build();
|
||||
ExecutorService fetchExecutor = Executors.newCachedThreadPool();
|
||||
FeedJournal feedJournal = FeedJournal.create();
|
||||
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
|
||||
) {
|
||||
@@ -131,7 +133,7 @@ public class FeedFetcherService {
|
||||
|
||||
FetchResult feedData;
|
||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client, ifModifiedSinceDate, ifNoneMatchTag);
|
||||
feedData = fetchFeedData(feed, client, fetchExecutor, ifModifiedSinceDate, ifNoneMatchTag);
|
||||
} catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
@@ -211,6 +213,7 @@ public class FeedFetcherService {
|
||||
|
||||
private FetchResult fetchFeedData(FeedDefinition feed,
|
||||
HttpClient client,
|
||||
ExecutorService executorService,
|
||||
@Nullable String ifModifiedSinceDate,
|
||||
@Nullable String ifNoneMatchTag)
|
||||
{
|
||||
@@ -237,7 +240,14 @@ public class FeedFetcherService {
|
||||
HttpRequest getRequest = requestBuilder.build();
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
HttpResponse<byte[]> rs = client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray());
|
||||
|
||||
/* Note we need to use an executor to time-limit the send() method in HttpClient, as
|
||||
* its support for timeouts only applies to the time until response starts to be received,
|
||||
* and does not catch the case when the server starts to send data but then hangs.
|
||||
*/
|
||||
HttpResponse<byte[]> rs = executorService.submit(
|
||||
() -> client.send(getRequest, HttpResponse.BodyHandlers.ofByteArray()))
|
||||
.get(15, TimeUnit.SECONDS);
|
||||
|
||||
if (rs.statusCode() == 429) { // Too Many Requests
|
||||
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
|
||||
|
@@ -501,7 +501,7 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
return new CrawlDataReference(slopPath);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to read previous crawl data for {}", specification.domain());
|
||||
}
|
||||
|
||||
|
@@ -70,6 +70,7 @@ public class HttpFetcherImpl implements HttpFetcher {
|
||||
.sslContext(NoSecuritySSL.buildSslContext())
|
||||
.cookieHandler(cookies)
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(Duration.ofSeconds(8))
|
||||
.executor(executorService)
|
||||
.build();
|
||||
|
@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/** Input buffer for temporary storage of a HTTP response
|
||||
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
* and suppressed from the headers.
|
||||
* If an error occurs, a buffer will be created with no content and an error status.
|
||||
*/
|
||||
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
|
||||
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
|
||||
if (rsp == null)
|
||||
return new ErrorBuffer();
|
||||
|
||||
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
|
||||
// If the content is small and not compressed, we can just read it into memory
|
||||
return new MemoryBuffer(headers, is, contentLength);
|
||||
return new MemoryBuffer(headers, timeLimit, is, contentLength);
|
||||
}
|
||||
else {
|
||||
// Otherwise, we unpack it into a file and read it from there
|
||||
return new FileBuffer(headers, is);
|
||||
return new FileBuffer(headers, timeLimit, is);
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
}
|
||||
|
||||
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
private Future<Integer> readAsync(InputStream is, byte[] out) {
|
||||
return virtualExecutorService.submit(() -> is.read(out));
|
||||
}
|
||||
|
||||
/** Copy an input stream to an output stream, with a maximum size and time limit */
|
||||
protected void copy(InputStream is, OutputStream os) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
|
||||
Instant start = Instant.now();
|
||||
Instant timeout = start.plus(timeLimit);
|
||||
long size = 0;
|
||||
|
||||
byte[] buffer = new byte[8192];
|
||||
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
int n = is.read(buffer);
|
||||
Duration remaining = Duration.between(Instant.now(), timeout);
|
||||
if (remaining.isNegative()) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
break;
|
||||
}
|
||||
|
||||
Future<Integer> readAsync = readAsync(is, buffer);
|
||||
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
|
||||
|
||||
if (n < 0) break;
|
||||
size += n;
|
||||
os.write(buffer, 0, n);
|
||||
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
truncationReason = WarcTruncationReason.LENGTH;
|
||||
break;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
break;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
} catch (IOException|ExecutionException e) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
} catch (TimeoutException e) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
|
||||
/** Buffer for when we have the response in memory */
|
||||
class MemoryBuffer extends WarcInputBuffer {
|
||||
byte[] data;
|
||||
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
|
||||
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
|
||||
super(headers);
|
||||
|
||||
var outputStream = new ByteArrayOutputStream(size);
|
||||
|
||||
copy(responseStream, outputStream);
|
||||
copy(responseStream, outputStream, timeLimit);
|
||||
|
||||
data = outputStream.toByteArray();
|
||||
}
|
||||
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
|
||||
class FileBuffer extends WarcInputBuffer {
|
||||
private final Path tempFile;
|
||||
|
||||
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
|
||||
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
|
||||
super(suppressContentEncoding(headers));
|
||||
|
||||
this.tempFile = Files.createTempFile("rsp", ".html");
|
||||
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
|
||||
|
||||
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
|
||||
try (var out = Files.newOutputStream(tempFile)) {
|
||||
copy(new GZIPInputStream(responseStream), out);
|
||||
copy(new GZIPInputStream(responseStream), out, timeLimit);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
|
||||
}
|
||||
else {
|
||||
try (var out = Files.newOutputStream(tempFile)) {
|
||||
copy(responseStream, out);
|
||||
copy(responseStream, out, timeLimit);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
|
@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
|
||||
}
|
||||
|
||||
|
||||
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
|
||||
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
|
||||
InputStream inputStream = inputBuffer.read())
|
||||
{
|
||||
if (cookies.hasCookies()) {
|
||||
|
@@ -160,7 +160,14 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
|
||||
// Fetch sitemaps
|
||||
for (var sitemap : robotsRules.getSitemaps()) {
|
||||
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
|
||||
|
||||
// Validate the sitemap URL and check if it belongs to the domain as the root URL
|
||||
if (EdgeUrl.parse(sitemap)
|
||||
.map(url -> url.getDomain().equals(rootUrl.domain))
|
||||
.orElse(false)) {
|
||||
|
||||
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
|
||||
}
|
||||
}
|
||||
|
||||
int crawlerAdditions = 0;
|
||||
|
@@ -0,0 +1,152 @@
|
||||
package nu.marginalia.crawl.retreival.fetcher;
|
||||
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import nu.marginalia.crawl.fetcher.Cookies;
|
||||
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.netpreserve.jwarc.WarcReader;
|
||||
import org.netpreserve.jwarc.WarcRequest;
|
||||
import org.netpreserve.jwarc.WarcResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tag("slow")
|
||||
class WarcRecorderFakeServerTest {
|
||||
static HttpServer server;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpAll() throws IOException {
|
||||
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
|
||||
|
||||
// This endpoint will finish sending the response immediately
|
||||
server.createContext("/fast", exchange -> {
|
||||
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
||||
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
|
||||
|
||||
try (var os = exchange.getResponseBody()) {
|
||||
os.write("<html><body>hello</body></html>".getBytes());
|
||||
os.flush();
|
||||
}
|
||||
exchange.close();
|
||||
});
|
||||
|
||||
// This endpoint will take 10 seconds to finish sending the response,
|
||||
// which should trigger a timeout in the client
|
||||
server.createContext("/slow", exchange -> {
|
||||
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
||||
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
|
||||
|
||||
try (var os = exchange.getResponseBody()) {
|
||||
os.write("<html><body>hello</body></html>".getBytes());
|
||||
os.flush();
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
os.write(":D".getBytes());
|
||||
os.flush();
|
||||
}
|
||||
exchange.close();
|
||||
});
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDownAll() {
|
||||
server.stop(0);
|
||||
}
|
||||
|
||||
Path fileNameWarc;
|
||||
Path fileNameParquet;
|
||||
WarcRecorder client;
|
||||
|
||||
HttpClient httpClient;
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
httpClient = HttpClient.newBuilder().build();
|
||||
|
||||
fileNameWarc = Files.createTempFile("test", ".warc");
|
||||
fileNameParquet = Files.createTempFile("test", ".parquet");
|
||||
|
||||
client = new WarcRecorder(fileNameWarc, new Cookies());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
client.close();
|
||||
Files.delete(fileNameWarc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchFast() throws Exception {
|
||||
client.fetch(httpClient,
|
||||
HttpRequest.newBuilder()
|
||||
.uri(new java.net.URI("http://localhost:14510/fast"))
|
||||
.timeout(Duration.ofSeconds(1))
|
||||
.header("User-agent", "test.marginalia.nu")
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.GET().build()
|
||||
);
|
||||
|
||||
Map<String, String> sampleData = new HashMap<>();
|
||||
try (var warcReader = new WarcReader(fileNameWarc)) {
|
||||
warcReader.forEach(record -> {
|
||||
if (record instanceof WarcRequest req) {
|
||||
sampleData.put(record.type(), req.target());
|
||||
}
|
||||
if (record instanceof WarcResponse rsp) {
|
||||
sampleData.put(record.type(), rsp.target());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
System.out.println(sampleData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchSlow() throws Exception {
|
||||
Instant start = Instant.now();
|
||||
client.fetch(httpClient,
|
||||
HttpRequest.newBuilder()
|
||||
.uri(new java.net.URI("http://localhost:14510/slow"))
|
||||
.timeout(Duration.ofSeconds(1))
|
||||
.header("User-agent", "test.marginalia.nu")
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.GET().build()
|
||||
);
|
||||
Instant end = Instant.now();
|
||||
|
||||
Map<String, String> sampleData = new HashMap<>();
|
||||
try (var warcReader = new WarcReader(fileNameWarc)) {
|
||||
warcReader.forEach(record -> {
|
||||
if (record instanceof WarcRequest req) {
|
||||
sampleData.put(record.type(), req.target());
|
||||
}
|
||||
if (record instanceof WarcResponse rsp) {
|
||||
sampleData.put(record.type(), rsp.target());
|
||||
System.out.println(rsp.target());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
System.out.println(sampleData);
|
||||
|
||||
// Timeout is set to 1 second, but the server will take 5 seconds to respond,
|
||||
// so we expect the request to take 1s and change before it times out.
|
||||
|
||||
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000);
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user