1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Viktor Lofgren
fc1388422a (actor) Add the ability to filter sample data based on content type
This will help in extracting relevant test sets for PDF processing.
2025-05-02 13:09:22 +02:00
Viktor Lofgren
b07080db16 (crawler) Don't retry requests when encountering UnknownHostException 2025-05-01 16:07:34 +02:00
Viktor Lofgren
e9d86dca4a (crawler) Add timeout to wrap-up phase of WarcInputBuffer. 2025-05-01 15:57:47 +02:00
12 changed files with 116 additions and 26 deletions

View File

@@ -48,12 +48,13 @@ public class ExecutorExportClient {
return msgId;
}
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
public void exportSampleData(int node, FileStorageId fid, int size, String ctFilter, String name) {
channelPool.call(ExecutorExportApiBlockingStub::exportSampleData)
.forNode(node)
.run(RpcExportSampleData.newBuilder()
.setFileStorageId(fid.id())
.setSize(size)
.setCtFilter(ctFilter)
.setName(name)
.build());
}

View File

@@ -100,6 +100,7 @@ message RpcExportSampleData {
int64 fileStorageId = 1;
int32 size = 2;
string name = 3;
string ctFilter = 4;
}
message RpcDownloadSampleData {
string sampleSet = 1;

View File

@@ -26,32 +26,32 @@ public class ExportSampleDataActor extends RecordActorPrototype {
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) {
this(crawlId, destId, size, name, -1);
public record Export(FileStorageId crawlId, int size, String ctFilter, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId, int size, String name, String ctFilter) {
this(crawlId, destId, size, name, ctFilter,-1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId, int size, String name) -> {
case Export(FileStorageId crawlId, int size, String ctFilter, String name) -> {
var storage = storageService.allocateStorage(FileStorageType.EXPORT,
"crawl-sample-export",
"Crawl Data Sample " + name + "/" + size + " " + LocalDateTime.now()
);
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id(), size, name);
yield new Run(crawlId, storage.id(), size, ctFilter, name);
}
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> {
case Run(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name));
yield new Run(crawlId, destId, size, name, newMsgId);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, ctFilter, size, name));
yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
}
case Run(_, FileStorageId destId, _, _, long msgId) -> {
case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
@@ -70,7 +70,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
@Override
public String describe() {
return "Export RSS/Atom feeds from crawl data";
return "Export sample crawl data";
}
@Inject

View File

@@ -49,6 +49,7 @@ public class ExecutorExportGrpcService
new ExportSampleDataActor.Export(
FileStorageId.of(request.getFileStorageId()),
request.getSize(),
request.getCtFilter(),
request.getName()
)
);

View File

@@ -51,6 +51,7 @@ import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
@@ -635,14 +636,12 @@ public class HttpFetcherImpl implements HttpFetcher, HttpRequestRetryStrategy {
@Override
public boolean retryRequest(HttpRequest request, IOException exception, int executionCount, HttpContext context) {
if (exception instanceof SocketTimeoutException) { // Timeouts are not recoverable
return false;
}
if (exception instanceof SSLException) { // SSL exceptions are unlikely to be recoverable
return false;
}
return executionCount <= 3;
return switch (exception) {
case SocketTimeoutException ste -> false;
case SSLException ssle -> false;
case UnknownHostException uhe -> false;
default -> executionCount <= 3;
};
}
@Override

View File

@@ -57,6 +57,7 @@ public abstract class WarcInputBuffer implements AutoCloseable {
return new ErrorBuffer();
}
Instant start = Instant.now();
InputStream is = null;
try {
is = entity.getContent();
@@ -71,8 +72,25 @@ public abstract class WarcInputBuffer implements AutoCloseable {
}
}
finally {
// We're required to consume the stream to avoid leaking connections,
// but we also don't want to get stuck on slow or malicious connections
// forever, so we set a time limit on this phase and call abort() if it's exceeded.
try {
is.skip(Long.MAX_VALUE);
while (is != null) {
// Consume some data
if (is.skip(65536) == 0) {
// Note that skip may return 0 if the stream is empty
// or for other unspecified reasons, so we need to check
// with read() as well to determine if the stream is done
if (is.read() == -1)
is = null;
}
// Check if the time limit has been exceeded
else if (Duration.between(start, Instant.now()).compareTo(timeLimit) > 0) {
request.abort();
is = null;
}
}
}
catch (IOException e) {
// Ignore the exception

View File

@@ -53,6 +53,8 @@ dependencies {
implementation libs.commons.compress
implementation libs.commons.codec
implementation libs.jsoup
implementation libs.slop
implementation libs.jwarc

View File

@@ -3,11 +3,15 @@ package nu.marginalia.extractor;
import com.google.inject.Inject;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.slop.SlopCrawlDataRecord;
import nu.marginalia.slop.SlopTablePacker;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageId;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.nio.file.Files;
@@ -27,7 +31,7 @@ public class SampleDataExporter {
public SampleDataExporter(FileStorageService storageService) {
this.storageService = storageService;
}
public void export(FileStorageId crawlId, FileStorageId destId, int size, String name) throws SQLException, IOException {
public void export(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name) throws SQLException, IOException {
FileStorage destStorage = storageService.getStorage(destId);
Path inputDir = storageService.getStorage(crawlId).asPath();
@@ -54,6 +58,7 @@ public class SampleDataExporter {
Path newCrawlerLogFile = Files.createTempFile(destStorage.asPath(), "crawler", ".log",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = Files.newBufferedWriter(newCrawlerLogFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
for (var item : entriesAll) {
bw.write(item.id() + " " + item.ts() + " " + item.relPath() + " " + item.cnt() + "\n");
@@ -72,7 +77,22 @@ public class SampleDataExporter {
Path crawlDataPath = inputDir.resolve(item.relPath());
if (!Files.exists(crawlDataPath)) continue;
addFileToTar(stream, crawlDataPath, item.relPath());
if (StringUtils.isBlank(ctFilter)) {
addFileToTar(stream, crawlDataPath, item.relPath());
}
else /* filter != null */ {
boolean didFilterData = false;
try {
crawlDataPath = filterEntries(crawlDataPath, ctFilter);
didFilterData = true;
addFileToTar(stream, crawlDataPath, item.relPath());
}
finally {
if (didFilterData) {
Files.deleteIfExists(crawlDataPath);
}
}
}
}
addFileToTar(stream, newCrawlerLogFile, "crawler.log");
@@ -86,6 +106,46 @@ public class SampleDataExporter {
Files.move(tmpTarFile, destStorage.asPath().resolve("crawl-data.tar"), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
/** Filters the entries in the crawl data file based on the content type.
* @param crawlDataPath The path to the crawl data file.
* @param contentTypeFilter The content type to filter by.
* @return The path to the filtered crawl data file, or null if an error occurred.
*/
private Path filterEntries(Path crawlDataPath, String contentTypeFilter) throws IOException {
Path tempDir = crawlDataPath.resolveSibling(crawlDataPath.getFileName() + ".filtered");
Path tempFile = crawlDataPath.resolveSibling(crawlDataPath.getFileName() + ".filtered.slop.zip");
Files.createDirectory(tempDir);
try (var writer = new SlopCrawlDataRecord.Writer(tempDir);
var reader = new SlopCrawlDataRecord.FilteringReader(crawlDataPath) {
@Override
public boolean filter(String url, int status, String contentType) {
if (contentTypeFilter.equals(contentType))
return true;
else if (contentType.startsWith("x-marginalia/"))
// This is a metadata entry, typically domain or redirect information
// let's keep those to not confuse the consumer of the data, which might
// expect at least the domain summary
return true;
return false;
}
}
) {
while (reader.hasRemaining()) {
writer.write(reader.get());
}
SlopTablePacker.packToSlopZip(tempDir, tempFile);
}
finally {
FileUtils.deleteDirectory(tempDir.toFile());
}
return tempFile;
}
private void addFileToTar(TarArchiveOutputStream outputStream, Path file, String fileName) throws IOException {
var entry = outputStream.createArchiveEntry(file.toFile(), fileName);
entry.setSize(Files.size(file));

View File

@@ -92,7 +92,7 @@ public class ExportTasksMain extends ProcessMainClass {
termFrequencyExporter.export(request.crawlId, request.destId);
break;
case SAMPLE_DATA:
sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name);
sampleDataExporter.export(request.crawlId, request.destId, request.size, request.ctFilter, request.name);
break;
case ADJACENCIES:
websiteAdjacenciesCalculator.export();

View File

@@ -16,6 +16,7 @@ public class ExportTaskRequest {
public FileStorageId destId;
public int size;
public String name;
public String ctFilter;
public ExportTaskRequest(Task task) {
this.task = task;
@@ -42,12 +43,13 @@ public class ExportTaskRequest {
return request;
}
public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, int size, String name) {
public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, String ctFilter, int size, String name) {
ExportTaskRequest request = new ExportTaskRequest(Task.SAMPLE_DATA);
request.crawlId = crawlId;
request.destId = destId;
request.size = size;
request.name = name;
request.ctFilter = ctFilter;
return request;
}

View File

@@ -321,9 +321,10 @@ public class ControlNodeActionsService {
private Object exportSampleData(Request req, Response rsp) {
FileStorageId source = parseSourceFileStorageId(req.queryParams("source"));
int size = Integer.parseInt(req.queryParams("size"));
String ctFilter = req.queryParams("ctFilter");
String name = req.queryParams("name");
exportClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name);
exportClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, ctFilter, name);
return "";
}

View File

@@ -35,6 +35,11 @@
<div><input type="text" name="size" id="size" pattern="\d+" /></div>
<small class="text-muted">How many domains to include in the sample set</small>
</div>
<div class="mb-3">
<label for="ctFilter">Content Type Filter</label>
<div><input type="text" name="ctFilter" id="ctFilter" /></div>
<small class="text-muted">If set, includes only documents with the specified content type value</small>
</div>
<div class="mb-3">
<label for="name">Name</label>
<div><input type="text" name="name" id="name" /></div>