mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


4 Commits

9 changed files with 61 additions and 31 deletions

View File

@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;
@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
private final ProcessHeartbeat heartbeat;
private final FileStorageService fileStorageService;
private final SideloadSourceFactory sideloadSourceFactory;
private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
public static void main(String... args) throws Exception {
@@ -199,12 +201,19 @@ public class ConverterMain extends ProcessMainClass {
processedDomains.set(batchingWorkLog.size());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
logger.info("Processing small items");
// First process the small items
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
continue;
}
pool.submit(() -> {
try {
ConverterBatchWritableIf writable = processor.createWritable(domain);
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.fullProcessing(dataStream);
converterWriter.accept(writable);
}
catch (Exception ex) {
@@ -223,6 +232,31 @@ public class ConverterMain extends ProcessMainClass {
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
logger.info("Processing large items");
// Next the big items domain-by-domain
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
int sizeHint = CrawledDomainReader.sizeHint(dataPath);
if (sizeHint < SIDELOAD_THRESHOLD) {
continue;
}
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
converterWriter.accept(writable);
}
catch (Exception ex) {
logger.info("Error in processing", ex);
}
finally {
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
}
logger.info("Processing complete");
}
}
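The converter now makes two passes over the crawl log: domains whose size hint is below `converter.sideloadThreshold` (default 10,000) are submitted to the thread pool for full in-memory processing, while anything at or above the threshold is handled afterwards, one domain at a time, via `simpleProcessing`. A minimal, self-contained sketch of how that threshold resolves and which pass a given size hint lands in; the class name and the sample size hints below are illustrative, not part of the patch:

```java
// Sketch only: the converter reads the property once at class-load time,
// so it has to be set on the JVM command line, e.g.
//   java -Dconverter.sideloadThreshold=50000 ...
public class ThresholdDemo {
    private static final int SIDELOAD_THRESHOLD =
            Integer.getInteger("converter.sideloadThreshold", 10_000);

    public static void main(String[] args) {
        // Hypothetical size hints standing in for CrawledDomainReader.sizeHint(path)
        int[] sizeHints = { 250, 9_999, 10_000, 120_000 };

        for (int hint : sizeHints) {
            String mode = hint >= SIDELOAD_THRESHOLD
                    ? "simpleProcessing (second pass, low memory)"
                    : "fullProcessing (thread pool, in memory)";
            System.out.println(hint + " documents -> " + mode);
        }
    }
}
```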

View File

@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.geoip.sources.AsnTable;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;
@@ -28,13 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Pattern;
public class DomainProcessor {
private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
private final DocumentProcessor documentProcessor;
private final SiteWords siteWords;
private final AnchorTagsSource anchorTagsSource;
@@ -56,21 +53,6 @@ public class DomainProcessor {
geoIpDictionary.waitReady();
}
public ConverterBatchWritableIf createWritable(Path path) throws IOException {
var dataStream = CrawledDomainReader.createDataStream(path);
final int sizeHint = dataStream.sizeHint();
if (sizeHint > SIDELOAD_THRESHOLD) {
// If the file is too big, we run a processing mode that doesn't
// require loading the entire dataset into RAM
return simpleProcessing(dataStream, sizeHint);
}
return fullProcessing(dataStream);
}
public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
try {
return new SimpleProcessing(dataStream, sizeHint, extraKeywords);
@@ -159,6 +141,7 @@ public class DomainProcessor {
private final Set<String> processedUrls = new HashSet<>();
private final DomainLinks externalDomainLinks;
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
);
@@ -195,8 +178,6 @@ public class DomainProcessor {
public Iterator<ProcessedDocument> getDocumentsStream() {
return iteratorFactory.create((taskConsumer) -> {
logger.info("Simple Processing: {}", domain);
while (dataStream.hasNext())
{
if (!(dataStream.next() instanceof CrawledDocument doc))
@@ -221,8 +202,6 @@ public class DomainProcessor {
return processedDoc;
});
}
logger.info("Finished Simple Processing: {}", domain);
});
}
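The new `ProcessingIterator.Factory` in `SimpleProcessing` takes its parallelism from the common ForkJoinPool property, falling back to the machine's processor count. A small sketch of just that fallback chain; the demo class is illustrative, and the resulting value is what the hunk above passes as the second argument to `ProcessingIterator.factory(8, ...)`:

```java
// Sketch only: resolve the worker parallelism the same way the hunk above does.
public class ParallelismDemo {
    public static void main(String[] args) {
        // An explicit -Djava.util.concurrent.ForkJoinPool.common.parallelism=N wins,
        // otherwise the number of available processors is used.
        int parallelism = Integer.getInteger(
                "java.util.concurrent.ForkJoinPool.common.parallelism",
                Runtime.getRuntime().availableProcessors());

        System.out.println("Resolved parallelism: " + parallelism);
    }
}
```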

View File

@@ -26,7 +26,7 @@ public class DocumentBodyToString {
return new String(data, charset);
}
public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException {
public static Document getParsedData(ContentType type, byte[] data, int maxLength, String url) throws IOException {
final Charset charset;
if (type.charset() == null || type.charset().isBlank()) {
@@ -35,7 +35,7 @@ public class DocumentBodyToString {
charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset);
}
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ByteArrayInputStream bais = new ByteArrayInputStream(data, 0, Math.min(data.length, maxLength));
return Jsoup.parse(bais, charset.name(), url);
}
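`getParsedData` now takes a `maxLength` and only hands the first `maxLength` bytes of the body to Jsoup, so one pathologically large document cannot stall the parser; truncation may cut an HTML tag or multi-byte character short, which is acceptable for a cap whose only job is to bound parser work. A self-contained sketch of the same capping idea, assuming UTF-8 and using illustrative names rather than the project's API:

```java
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch only: cap the bytes fed to Jsoup at maxLength.
public class CappedParseDemo {
    static Document parseCapped(byte[] data, int maxLength, String url) throws IOException {
        var bais = new ByteArrayInputStream(data, 0, Math.min(data.length, maxLength));
        return Jsoup.parse(bais, StandardCharsets.UTF_8.name(), url);
    }

    public static void main(String[] args) throws IOException {
        byte[] body = "<html><head><title>t</title></head><body><p>hello</p></body></html>"
                .getBytes(StandardCharsets.UTF_8);
        Document doc = parseCapped(body, 200_000, "https://example.com/");
        System.out.println(doc.title() + " / " + doc.body().text());
    }
}
```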

View File

@@ -38,4 +38,16 @@ public class CrawledDomainReader {
return SerializableCrawlDataStream.empty();
}
public static int sizeHint(Path fullPath) {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
}
else if (fileName.endsWith(".slop.zip")) {
return SlopSerializableCrawlDataStream.sizeHint(fullPath);
}
else {
return 0;
}
}
}
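Note that unrecognized formats get a size hint of 0, which keeps them below any sensible threshold and therefore always on the in-memory full-processing path. A tiny sketch of the suffix dispatch and that consequence; the paths and class name are made up for illustration:

```java
import java.nio.file.Path;

// Sketch only: mirror the file-name suffix dispatch used by sizeHint().
public class SizeHintDispatchDemo {
    static String formatFor(Path p) {
        String name = p.getFileName().toString();
        if (name.endsWith(".parquet"))  return "parquet (real size hint)";
        if (name.endsWith(".slop.zip")) return "slop (real size hint)";
        return "unknown (size hint 0 -> always full processing)";
    }

    public static void main(String[] args) {
        System.out.println(formatFor(Path.of("crawl-data/example.com.parquet")));
        System.out.println(formatFor(Path.of("crawl-data/example.com.slop.zip")));
        System.out.println(formatFor(Path.of("crawl-data/example.com.warc.gz")));
    }
}
```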

View File

@@ -34,6 +34,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
@Nullable
default Path path() { return null; }
void close() throws IOException;
default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
return new Iterator<>() {
T next = null;
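The hunk above is cut off by the diff view; the shape it suggests is the usual look-ahead adapter, where `hasNext()` pulls from the source until the mapper yields a present `Optional` and buffers the result. A self-contained sketch of that pattern over a plain `Iterator`, not the project's actual implementation:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

// Sketch only: map a source iterator through a mapper that may skip elements.
public class MapIteratorSketch {
    static <S, T> Iterator<T> map(Iterator<S> source, Function<S, Optional<T>> mapper) {
        return new Iterator<>() {
            T next = null;

            @Override
            public boolean hasNext() {
                // Pull from the source until the mapper produces a value or the source runs dry.
                while (next == null && source.hasNext()) {
                    next = mapper.apply(source.next()).orElse(null);
                }
                return next != null;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T result = next;
                next = null;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        var it = map(List.of(1, 2, 3, 4).iterator(),
                     n -> n % 2 == 0 ? Optional.of("even:" + n) : Optional.empty());
        it.forEachRemaining(System.out::println);
    }
}
```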

View File

@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
return path;
}
public int sizeHint() {
public static int sizeHint(Path path) {
// Only calculate size hint for large files
// (the reason we calculate them in the first place is to assess whether it is large
// because it has many documents, or because it is a small number of large documents)

View File

@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, Serializa
return path;
}
public int sizeHint() {
public static int sizeHint(Path path) {
// Only calculate size hint for large files
// (the reason we calculate them in the first place is to assess whether it is large
// because it has many documents, or because it is a small number of large documents)

View File

@@ -59,9 +59,12 @@ public final class CrawledDocument implements SerializableCrawlData {
}
public Document parseBody() throws IOException {
// Prevent stalls from parsing excessively large documents
return DocumentBodyToString.getParsedData(
ContentType.parse(contentType),
documentBodyBytes,
200_000,
url);
}

View File

@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
}
@Override
public boolean hasNext() throws IOException {
public boolean hasNext() {
if (dataStack == null) {
query();
}
@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
}
@Override
public void close() throws Exception {
public void close() {
dataStack.clear();
}
}