mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
(actor) Add the ability to filter sample data based on content type
This will help in extracting relevant test sets for PDF processing.
This commit is contained in:
@@ -48,12 +48,13 @@ public class ExecutorExportClient {
|
||||
return msgId;
|
||||
}
|
||||
|
||||
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
|
||||
public void exportSampleData(int node, FileStorageId fid, int size, String ctFilter, String name) {
|
||||
channelPool.call(ExecutorExportApiBlockingStub::exportSampleData)
|
||||
.forNode(node)
|
||||
.run(RpcExportSampleData.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.setSize(size)
|
||||
.setCtFilter(ctFilter)
|
||||
.setName(name)
|
||||
.build());
|
||||
}
|
||||
|
@@ -100,6 +100,7 @@ message RpcExportSampleData {
|
||||
int64 fileStorageId = 1;
|
||||
int32 size = 2;
|
||||
string name = 3;
|
||||
string ctFilter = 4;
|
||||
}
|
||||
message RpcDownloadSampleData {
|
||||
string sampleSet = 1;
|
||||
|
@@ -26,32 +26,32 @@ public class ExportSampleDataActor extends RecordActorPrototype {
|
||||
private final MqOutbox exportTasksOutbox;
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {}
|
||||
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep {
|
||||
public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) {
|
||||
this(crawlId, destId, size, name, -1);
|
||||
public record Export(FileStorageId crawlId, int size, String ctFilter, String name) implements ActorStep {}
|
||||
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name, long msgId) implements ActorStep {
|
||||
/** Creates a Run step for an export request that has not been sent yet.
 * <p>
 * The message id is set to -1; the transition logic sends the request
 * for any Run step whose msgId is negative.
 * <p>
 * Parameters are declared in the same order as the canonical record
 * constructor (ctFilter before name), which is also the order callers
 * pass them in.
 */
public Run(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name) {
    this(crawlId, destId, size, ctFilter, name, -1);
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorStep transition(ActorStep self) throws Exception {
|
||||
return switch(self) {
|
||||
case Export(FileStorageId crawlId, int size, String name) -> {
|
||||
case Export(FileStorageId crawlId, int size, String ctFilter, String name) -> {
|
||||
var storage = storageService.allocateStorage(FileStorageType.EXPORT,
|
||||
"crawl-sample-export",
|
||||
"Crawl Data Sample " + name + "/" + size + " " + LocalDateTime.now()
|
||||
);
|
||||
|
||||
if (storage == null) yield new Error("Bad storage id");
|
||||
yield new Run(crawlId, storage.id(), size, name);
|
||||
yield new Run(crawlId, storage.id(), size, ctFilter, name);
|
||||
}
|
||||
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> {
|
||||
case Run(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name, long msgId) when msgId < 0 -> {
|
||||
storageService.setFileStorageState(destId, FileStorageState.NEW);
|
||||
|
||||
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name));
|
||||
yield new Run(crawlId, destId, size, name, newMsgId);
|
||||
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, ctFilter, size, name));
|
||||
yield new Run(crawlId, destId, size, ctFilter, name, newMsgId);
|
||||
}
|
||||
case Run(_, FileStorageId destId, _, _, long msgId) -> {
|
||||
case Run(_, FileStorageId destId, _, _, _, long msgId) -> {
|
||||
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
|
||||
|
||||
if (rsp.state() != MqMessageState.OK) {
|
||||
@@ -70,7 +70,7 @@ public class ExportSampleDataActor extends RecordActorPrototype {
|
||||
|
||||
@Override
|
||||
public String describe() {
|
||||
return "Export RSS/Atom feeds from crawl data";
|
||||
return "Export sample crawl data";
|
||||
}
|
||||
|
||||
@Inject
|
||||
|
@@ -49,6 +49,7 @@ public class ExecutorExportGrpcService
|
||||
new ExportSampleDataActor.Export(
|
||||
FileStorageId.of(request.getFileStorageId()),
|
||||
request.getSize(),
|
||||
request.getCtFilter(),
|
||||
request.getName()
|
||||
)
|
||||
);
|
||||
|
@@ -53,6 +53,8 @@ dependencies {
|
||||
implementation libs.commons.compress
|
||||
implementation libs.commons.codec
|
||||
implementation libs.jsoup
|
||||
implementation libs.slop
|
||||
implementation libs.jwarc
|
||||
|
||||
|
||||
|
||||
|
@@ -3,11 +3,15 @@ package nu.marginalia.extractor;
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.process.log.WorkLog;
|
||||
import nu.marginalia.process.log.WorkLogEntry;
|
||||
import nu.marginalia.slop.SlopCrawlDataRecord;
|
||||
import nu.marginalia.slop.SlopTablePacker;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||
import org.apache.commons.compress.utils.IOUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -27,7 +31,7 @@ public class SampleDataExporter {
|
||||
public SampleDataExporter(FileStorageService storageService) {
|
||||
this.storageService = storageService;
|
||||
}
|
||||
public void export(FileStorageId crawlId, FileStorageId destId, int size, String name) throws SQLException, IOException {
|
||||
public void export(FileStorageId crawlId, FileStorageId destId, int size, String ctFilter, String name) throws SQLException, IOException {
|
||||
FileStorage destStorage = storageService.getStorage(destId);
|
||||
Path inputDir = storageService.getStorage(crawlId).asPath();
|
||||
|
||||
@@ -54,6 +58,7 @@ public class SampleDataExporter {
|
||||
|
||||
Path newCrawlerLogFile = Files.createTempFile(destStorage.asPath(), "crawler", ".log",
|
||||
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
|
||||
|
||||
try (var bw = Files.newBufferedWriter(newCrawlerLogFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
|
||||
for (var item : entriesAll) {
|
||||
bw.write(item.id() + " " + item.ts() + " " + item.relPath() + " " + item.cnt() + "\n");
|
||||
@@ -72,7 +77,22 @@ public class SampleDataExporter {
|
||||
Path crawlDataPath = inputDir.resolve(item.relPath());
|
||||
if (!Files.exists(crawlDataPath)) continue;
|
||||
|
||||
addFileToTar(stream, crawlDataPath, item.relPath());
|
||||
if (StringUtils.isBlank(ctFilter)) {
|
||||
addFileToTar(stream, crawlDataPath, item.relPath());
|
||||
}
|
||||
else /* filter != null */ {
|
||||
boolean didFilterData = false;
|
||||
try {
|
||||
crawlDataPath = filterEntries(crawlDataPath, ctFilter);
|
||||
didFilterData = true;
|
||||
addFileToTar(stream, crawlDataPath, item.relPath());
|
||||
}
|
||||
finally {
|
||||
if (didFilterData) {
|
||||
Files.deleteIfExists(crawlDataPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
addFileToTar(stream, newCrawlerLogFile, "crawler.log");
|
||||
@@ -86,6 +106,46 @@ public class SampleDataExporter {
|
||||
Files.move(tmpTarFile, destStorage.asPath().resolve("crawl-data.tar"), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
/** Filters the entries in the crawl data file based on the content type.
|
||||
* @param crawlDataPath The path to the crawl data file.
|
||||
* @param contentTypeFilter The content type to filter by.
|
||||
* @return The path to the filtered crawl data file, or null if an error occurred.
|
||||
*/
|
||||
private Path filterEntries(Path crawlDataPath, String contentTypeFilter) throws IOException {
|
||||
Path tempDir = crawlDataPath.resolveSibling(crawlDataPath.getFileName() + ".filtered");
|
||||
Path tempFile = crawlDataPath.resolveSibling(crawlDataPath.getFileName() + ".filtered.slop.zip");
|
||||
|
||||
Files.createDirectory(tempDir);
|
||||
|
||||
try (var writer = new SlopCrawlDataRecord.Writer(tempDir);
|
||||
var reader = new SlopCrawlDataRecord.FilteringReader(crawlDataPath) {
|
||||
@Override
|
||||
public boolean filter(String url, int status, String contentType) {
|
||||
if (contentTypeFilter.equals(contentType))
|
||||
return true;
|
||||
else if (contentType.startsWith("x-marginalia/"))
|
||||
// This is a metadata entry, typically domain or redirect information
|
||||
// let's keep those to not confuse the consumer of the data, which might
|
||||
// expect at least the domain summary
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
) {
|
||||
while (reader.hasRemaining()) {
|
||||
writer.write(reader.get());
|
||||
}
|
||||
|
||||
SlopTablePacker.packToSlopZip(tempDir, tempFile);
|
||||
}
|
||||
finally {
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
}
|
||||
|
||||
|
||||
return tempFile;
|
||||
}
|
||||
|
||||
private void addFileToTar(TarArchiveOutputStream outputStream, Path file, String fileName) throws IOException {
|
||||
var entry = outputStream.createArchiveEntry(file.toFile(), fileName);
|
||||
entry.setSize(Files.size(file));
|
||||
|
@@ -92,7 +92,7 @@ public class ExportTasksMain extends ProcessMainClass {
|
||||
termFrequencyExporter.export(request.crawlId, request.destId);
|
||||
break;
|
||||
case SAMPLE_DATA:
|
||||
sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name);
|
||||
sampleDataExporter.export(request.crawlId, request.destId, request.size, request.ctFilter, request.name);
|
||||
break;
|
||||
case ADJACENCIES:
|
||||
websiteAdjacenciesCalculator.export();
|
||||
|
@@ -16,6 +16,7 @@ public class ExportTaskRequest {
|
||||
public FileStorageId destId;
|
||||
public int size;
|
||||
public String name;
|
||||
public String ctFilter;
|
||||
|
||||
public ExportTaskRequest(Task task) {
|
||||
this.task = task;
|
||||
@@ -42,12 +43,13 @@ public class ExportTaskRequest {
|
||||
return request;
|
||||
}
|
||||
|
||||
public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, int size, String name) {
|
||||
public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, String ctFilter, int size, String name) {
|
||||
ExportTaskRequest request = new ExportTaskRequest(Task.SAMPLE_DATA);
|
||||
request.crawlId = crawlId;
|
||||
request.destId = destId;
|
||||
request.size = size;
|
||||
request.name = name;
|
||||
request.ctFilter = ctFilter;
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@@ -321,9 +321,10 @@ public class ControlNodeActionsService {
|
||||
private Object exportSampleData(Request req, Response rsp) {
|
||||
FileStorageId source = parseSourceFileStorageId(req.queryParams("source"));
|
||||
int size = Integer.parseInt(req.queryParams("size"));
|
||||
String ctFilter = req.queryParams("ctFilter");
|
||||
String name = req.queryParams("name");
|
||||
|
||||
exportClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name);
|
||||
exportClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, ctFilter, name);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
@@ -35,6 +35,11 @@
|
||||
<div><input type="text" name="size" id="size" pattern="\d+" /></div>
|
||||
<small class="text-muted">How many domains to include in the sample set</small>
|
||||
</div>
|
||||
<div class="mb-3">
|
||||
<label for="ctFilter">Content Type Filter</label>
|
||||
<div><input type="text" name="ctFilter" id="ctFilter" /></div>
|
||||
<small class="text-muted">If set, includes only documents with the specified content type value</small>
|
||||
</div>
|
||||
<div class="mb-3">
|
||||
<label for="name">Name</label>
|
||||
<div><input type="text" name="name" id="name" /></div>
|
||||
|
Reference in New Issue
Block a user