mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
1 Commits
deploy-011
...
deploy-011
Author | SHA1 | Date | |
---|---|---|---|
|
d1ec909b36 |
@@ -8,7 +8,10 @@ import java.net.http.HttpHeaders;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/** Input buffer for temporary storage of an HTTP response
|
||||
@@ -39,7 +42,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
* and suppressed from the headers.
|
||||
* If an error occurs, a buffer will be created with no content and an error status.
|
||||
*/
|
||||
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp) {
|
||||
static WarcInputBuffer forResponse(HttpResponse<InputStream> rsp, Duration timeLimit) {
|
||||
if (rsp == null)
|
||||
return new ErrorBuffer();
|
||||
|
||||
@@ -51,11 +54,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
|
||||
// If the content is small and not compressed, we can just read it into memory
|
||||
return new MemoryBuffer(headers, is, contentLength);
|
||||
return new MemoryBuffer(headers, timeLimit, is, contentLength);
|
||||
}
|
||||
else {
|
||||
// Otherwise, we unpack it into a file and read it from there
|
||||
return new FileBuffer(headers, is);
|
||||
return new FileBuffer(headers, timeLimit, is);
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
@@ -64,9 +67,16 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
}
|
||||
|
||||
private static final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
private Future<Integer> readAsync(InputStream is, byte[] out) {
|
||||
return virtualExecutorService.submit(() -> is.read(out));
|
||||
}
|
||||
|
||||
/** Copy an input stream to an output stream, with a maximum size and time limit */
|
||||
protected void copy(InputStream is, OutputStream os) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
protected void copy(InputStream is, OutputStream os, Duration timeLimit) {
|
||||
Instant start = Instant.now();
|
||||
Instant timeout = start.plus(timeLimit);
|
||||
long size = 0;
|
||||
|
||||
byte[] buffer = new byte[8192];
|
||||
@@ -76,7 +86,15 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
int n = is.read(buffer);
|
||||
Duration remaining = Duration.between(Instant.now(), timeout);
|
||||
if (remaining.isNegative()) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
break;
|
||||
}
|
||||
|
||||
Future<Integer> readAsync = readAsync(is, buffer);
|
||||
int n = readAsync.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
|
||||
|
||||
if (n < 0) break;
|
||||
size += n;
|
||||
os.write(buffer, 0, n);
|
||||
@@ -85,12 +103,11 @@ public abstract class WarcInputBuffer implements AutoCloseable {
|
||||
truncationReason = WarcTruncationReason.LENGTH;
|
||||
break;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
break;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
} catch (IOException|ExecutionException e) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
} catch (TimeoutException e) {
|
||||
truncationReason = WarcTruncationReason.TIME;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@@ -123,12 +140,12 @@ class ErrorBuffer extends WarcInputBuffer {
|
||||
/** Buffer for when we have the response in memory */
|
||||
class MemoryBuffer extends WarcInputBuffer {
|
||||
byte[] data;
|
||||
public MemoryBuffer(HttpHeaders headers, InputStream responseStream, int size) {
|
||||
public MemoryBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream, int size) {
|
||||
super(headers);
|
||||
|
||||
var outputStream = new ByteArrayOutputStream(size);
|
||||
|
||||
copy(responseStream, outputStream);
|
||||
copy(responseStream, outputStream, timeLimit);
|
||||
|
||||
data = outputStream.toByteArray();
|
||||
}
|
||||
@@ -152,7 +169,7 @@ class MemoryBuffer extends WarcInputBuffer {
|
||||
class FileBuffer extends WarcInputBuffer {
|
||||
private final Path tempFile;
|
||||
|
||||
public FileBuffer(HttpHeaders headers, InputStream responseStream) throws IOException {
|
||||
public FileBuffer(HttpHeaders headers, Duration timeLimit, InputStream responseStream) throws IOException {
|
||||
super(suppressContentEncoding(headers));
|
||||
|
||||
this.tempFile = Files.createTempFile("rsp", ".html");
|
||||
@@ -160,7 +177,7 @@ class FileBuffer extends WarcInputBuffer {
|
||||
|
||||
if ("gzip".equalsIgnoreCase(headers.firstValue("Content-Encoding").orElse(""))) {
|
||||
try (var out = Files.newOutputStream(tempFile)) {
|
||||
copy(new GZIPInputStream(responseStream), out);
|
||||
copy(new GZIPInputStream(responseStream), out, timeLimit);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
@@ -168,7 +185,7 @@ class FileBuffer extends WarcInputBuffer {
|
||||
}
|
||||
else {
|
||||
try (var out = Files.newOutputStream(tempFile)) {
|
||||
copy(responseStream, out);
|
||||
copy(responseStream, out, timeLimit);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
truncationReason = WarcTruncationReason.UNSPECIFIED;
|
||||
|
@@ -102,7 +102,7 @@ public class WarcRecorder implements AutoCloseable {
|
||||
}
|
||||
|
||||
|
||||
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response);
|
||||
try (WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response, request.timeout().orElseGet(() -> Duration.ofMillis(MAX_TIME)));
|
||||
InputStream inputStream = inputBuffer.read())
|
||||
{
|
||||
if (cookies.hasCookies()) {
|
||||
|
@@ -0,0 +1,149 @@
|
||||
package nu.marginalia.crawl.retreival.fetcher;
|
||||
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import nu.marginalia.crawl.fetcher.Cookies;
|
||||
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.netpreserve.jwarc.WarcReader;
|
||||
import org.netpreserve.jwarc.WarcRequest;
|
||||
import org.netpreserve.jwarc.WarcResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tag("slow")
|
||||
class WarcRecorderFakeServerTest {
|
||||
static HttpServer server;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpAll() throws IOException {
|
||||
server = HttpServer.create(new InetSocketAddress("127.0.0.1", 14510), 10);
|
||||
server.createContext("/fast", exchange -> {
|
||||
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
||||
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>".length());
|
||||
|
||||
try (var os = exchange.getResponseBody()) {
|
||||
os.write("<html><body>hello</body></html>".getBytes());
|
||||
os.flush();
|
||||
}
|
||||
exchange.close();
|
||||
});
|
||||
|
||||
server.createContext("/slow", exchange -> {
|
||||
exchange.getResponseHeaders().add("Content-Type", "text/html");
|
||||
exchange.sendResponseHeaders(200, "<html><body>hello</body></html>:D".length());
|
||||
|
||||
try (var os = exchange.getResponseBody()) {
|
||||
os.write("<html><body>hello</body></html>".getBytes());
|
||||
os.flush();
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
os.write(":D".getBytes());
|
||||
os.flush();
|
||||
}
|
||||
exchange.close();
|
||||
});
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDownAll() {
|
||||
server.stop(0);
|
||||
}
|
||||
|
||||
Path fileNameWarc;
|
||||
Path fileNameParquet;
|
||||
WarcRecorder client;
|
||||
|
||||
HttpClient httpClient;
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
httpClient = HttpClient.newBuilder().build();
|
||||
|
||||
fileNameWarc = Files.createTempFile("test", ".warc");
|
||||
fileNameParquet = Files.createTempFile("test", ".parquet");
|
||||
|
||||
client = new WarcRecorder(fileNameWarc, new Cookies());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
client.close();
|
||||
Files.delete(fileNameWarc);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fetchFast() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
|
||||
client.fetch(httpClient,
|
||||
HttpRequest.newBuilder()
|
||||
.uri(new java.net.URI("http://localhost:14510/fast"))
|
||||
.timeout(Duration.ofSeconds(1))
|
||||
.header("User-agent", "test.marginalia.nu")
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.GET().build()
|
||||
);
|
||||
|
||||
Map<String, String> sampleData = new HashMap<>();
|
||||
try (var warcReader = new WarcReader(fileNameWarc)) {
|
||||
warcReader.forEach(record -> {
|
||||
if (record instanceof WarcRequest req) {
|
||||
sampleData.put(record.type(), req.target());
|
||||
}
|
||||
if (record instanceof WarcResponse rsp) {
|
||||
sampleData.put(record.type(), rsp.target());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
System.out.println(sampleData);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fetchSlow() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
|
||||
Instant start = Instant.now();
|
||||
client.fetch(httpClient,
|
||||
HttpRequest.newBuilder()
|
||||
.uri(new java.net.URI("http://localhost:14510/slow"))
|
||||
.timeout(Duration.ofSeconds(1))
|
||||
.header("User-agent", "test.marginalia.nu")
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.GET().build()
|
||||
);
|
||||
Instant end = Instant.now();
|
||||
|
||||
Map<String, String> sampleData = new HashMap<>();
|
||||
try (var warcReader = new WarcReader(fileNameWarc)) {
|
||||
warcReader.forEach(record -> {
|
||||
if (record instanceof WarcRequest req) {
|
||||
sampleData.put(record.type(), req.target());
|
||||
}
|
||||
if (record instanceof WarcResponse rsp) {
|
||||
sampleData.put(record.type(), rsp.target());
|
||||
System.out.println(rsp.target());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
System.out.println(sampleData);
|
||||
|
||||
// Timeout is set to 1 second, but the server will take 5 seconds to respond, so we expect the request to take 1s and change
|
||||
// before it times out.
|
||||
Assertions.assertTrue(Duration.between(start, end).toMillis() < 2000, "Request should take less than 2 seconds");
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user