1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Viktor Lofgren
d1ec909b36 (crawler) Improve handling of timeouts to prevent crawler from getting stuck 2025-04-02 12:57:21 +02:00
3 changed files with 184 additions and 18 deletions

View File

@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
/** Input buffer for temporary storage of a HTTP response
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
* and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status.
*/
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
if (rsp == null)
return new ErrorBuffer();
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(headers, is, contentLength);
return new MemoryBuffer(headers, timeLimit, is, contentLength);
}
else {
// Otherwise, we unpack it into a file and read it from there
return new FileBuffer(headers, is);
return new FileBuffer(headers, timeLimit, is);
}
}
catch (Exception ex) {
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
}
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
private Future<Integer> readAsync(InputStream is, byte[] out) {
return virtualExecutorService.submit(() -> is.read(out));
}
/** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os) {
long startTime = System.currentTimeMillis();
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
Instant start = Instant.now();
Instant timeout = start.plus(timeLimit);
long size = 0;
byte[] buffer = new byte[8192];
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
while (true) {
try {
int n = is.read(buffer);
Duration remaining = Duration.between(Instant.now(), timeout);
if (remaining.isNegative()) {
truncationReason = WarcTruncationReason.TIME;
break;
}
Future<Integer> readAsync = readAsync(is, buffer);
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
if (n < 0) break;
size += n;
os.write(buffer, 0, n);
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
}
} catch (IOException e) {
} catch (IOException|ExecutionException e) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
} catch (TimeoutException e) {
truncationReason = WarcTruncationReason.TIME;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
/** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer {
byte[] data;
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
super(headers);
var outputStream = new ByteArrayOutputStream(size);
copy(responseStream, outputStream);
copy(responseStream, outputStream, timeLimit);
data = outputStream.toByteArray();
}
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
class FileBuffer extends WarcInputBuffer {
private final Path tempFile;
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
super(suppressContentEncoding(headers));
this.tempFile = Files.createTempFile("rsp", ".html");
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
try (var out = Files.newOutputStream(tempFile)) {
copy(new GZIPInputStream(responseStream), out);
copy(new GZIPInputStream(responseStream), out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
}
else {
try (var out = Files.newOutputStream(tempFile)) {
copy(responseStream, out);
copy(responseStream, out, timeLimit);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;

View File

@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
}
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
InputStream inputStream = inputBuffer.read())
{
if (cookies.hasCookies()) {

View File

@@ -0,0 +1,149 @@
package nu.marginalia.crawl.retreival.fetcher;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.Cookies;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tag("slow")
class WarcRecorderFakeServerTest {
static HttpServer server;
@BeforeAll
public static void setUpAll() throws IOException {
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
server.createContext("/fast", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
}
exchange.close();
});
server.createContext("/slow", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
try (var os = exchange.getResponseBody()) {
os.write("<html><body>hello</body></html>".getBytes());
os.flush();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
os.write(":D".getBytes());
os.flush();
}
exchange.close();
});
server.start();
}
@AfterAll
public static void tearDownAll() {
server.stop(0);
}
Path fileNameWarc;
Path fileNameParquet;
WarcRecorder client;
HttpClient httpClient;
@BeforeEach
public void setUp() throws Exception {
httpClient = HttpClient.newBuilder().build();
fileNameWarc = Files.createTempFile("test", ".warc");
fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc, new Cookies());
}
@AfterEach
public void tearDown() throws Exception {
client.close();
Files.delete(fileNameWarc);
}
@Test
void fetchFast() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/fast"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
}
});
}
System.out.println(sampleData);
}
@Test
void fetchSlow() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
Instant start = Instant.now();
client.fetch(httpClient,
HttpRequest.newBuilder()
.uri(new java.net.URI("http://localhost:14510/slow"))
.timeout(Duration.ofSeconds(1))
.header("User-agent", "test.marginalia.nu")
.header("Accept-Encoding", "gzip")
.GET().build()
);
Instant end = Instant.now();
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target());
}
if (record instanceof WarcResponse rsp) {
sampleData.put(record.type(), rsp.target());
System.out.println(rsp.target());
}
});
}
System.out.println(sampleData);
// Timeout is set to 1 second, but the server will take 5 seconds to respond, so we expect the request to take 1s and change
// before it times out.
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000, "Request should take less than 2 seconds");
}
}