mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


3 Commits

8 changed files with 62 additions and 30 deletions

View File: ConverterMain.java

@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
+import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;
@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
     private final ProcessHeartbeat heartbeat;
     private final FileStorageService fileStorageService;
     private final SideloadSourceFactory sideloadSourceFactory;
+    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

     public static void main(String... args) throws Exception {
@@ -199,12 +201,19 @@ public class ConverterMain extends ProcessMainClass {
             processedDomains.set(batchingWorkLog.size());
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

-            for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
+            logger.info("Processing small items");
+
+            // First process the small items
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
+                if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
                 pool.submit(() -> {
-                    try {
-                        ConverterBatchWritableIf writable = processor.createWritable(domain);
+                    try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                        ConverterBatchWritableIf writable = processor.fullProcessing(dataStream);
                         converterWriter.accept(writable);
                     }
                     catch (Exception ex) {
@@ -223,6 +232,31 @@ public class ConverterMain extends ProcessMainClass {
             do {
                 System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
             } while (!pool.awaitTermination(60, TimeUnit.SECONDS));
+
+            logger.info("Processing large items");
+
+            // Next the big items domain-by-domain
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
+                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
+            {
+                int sizeHint = CrawledDomainReader.sizeHint(dataPath);
+                if (sizeHint < SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
+                try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                    ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
+                    converterWriter.accept(writable);
+                }
+                catch (Exception ex) {
+                    logger.info("Error in processing", ex);
+                }
+                finally {
+                    heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                }
+            }
+
+            logger.info("Processing complete");
         }
     }
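Taken together, these ConverterMain hunks turn one uniform dispatch loop into a two-pass schedule: small crawl sets fan out across the worker pool for full in-RAM processing, while oversized sets are deferred until the pool drains and are then streamed one at a time. A minimal, self-contained sketch of the pattern follows; the names (TwoPassScheduler, estimateSize, processFully, processStreaming) are hypothetical stand-ins, not the actual Marginalia classes.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TwoPassScheduler {
    static final int THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

    void run(List<String> items) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);

        // Pass 1: small items fan out across the pool (full in-RAM processing)
        for (String item : items) {
            if (estimateSize(item) >= THRESHOLD) continue; // defer large items
            pool.submit(() -> processFully(item));
        }

        pool.shutdown();
        while (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            System.out.println("Waiting for pool to terminate...");
        }

        // Pass 2: large items one at a time, in a streaming mode that avoids
        // holding a whole domain's crawl data in memory at once
        for (String item : items) {
            int size = estimateSize(item);
            if (size < THRESHOLD) continue;
            processStreaming(item, size);
        }
    }

    int estimateSize(String item) { return item.length(); } // stand-in size probe
    void processFully(String item) { /* parallel path */ }
    void processStreaming(String item, int sizeHint) { /* sequential path */ }
}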

View File: DomainProcessor.java

@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.geoip.GeoIpDictionary;
 import nu.marginalia.geoip.sources.AsnTable;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;
@@ -28,13 +27,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;

 public class DomainProcessor {
-    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
     private final DocumentProcessor documentProcessor;
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
@@ -56,21 +53,6 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }

-    public ConverterBatchWritableIf createWritable(Path path) throws IOException {
-        var dataStream = CrawledDomainReader.createDataStream(path);
-
-        final int sizeHint = dataStream.sizeHint();
-        if (sizeHint > SIDELOAD_THRESHOLD) {
-            // If the file is too big, we run a processing mode that doesn't
-            // require loading the entire dataset into RAM
-            return simpleProcessing(dataStream, sizeHint);
-        }
-
-        return fullProcessing(dataStream);
-    }
-
     public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
         try {
             return new SimpleProcessing(dataStream, sizeHint, extraKeywords);
@@ -159,6 +141,7 @@ public class DomainProcessor {
         private final Set<String> processedUrls = new HashSet<>();
         private final DomainLinks externalDomainLinks;
         private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
+
         private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
                 Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
         );
@@ -195,8 +178,6 @@ public class DomainProcessor {
         public Iterator<ProcessedDocument> getDocumentsStream() {
             return iteratorFactory.create((taskConsumer) -> {
-                logger.info("Simple Processing: {}", domain);
-
                 while (dataStream.hasNext())
                 {
                     if (!(dataStream.next() instanceof CrawledDocument doc))
@@ -221,8 +202,6 @@ public class DomainProcessor {
                         return processedDoc;
                     });
                 }
-
-                logger.info("Finished Simple Processing: {}", domain);
             });
         }
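Both the relocated SIDELOAD_THRESHOLD and the iterator parallelism above are read with Integer.getInteger, which looks up a JVM system property (not an environment variable) and falls back to the supplied default when the property is unset or unparseable. A small standalone sketch of overriding the threshold at launch (ThresholdDemo is an illustrative name):

public class ThresholdDemo {
    // Same property name and default as the commit
    static final int SIDELOAD_THRESHOLD =
            Integer.getInteger("converter.sideloadThreshold", 10_000);

    public static void main(String[] args) {
        // Prints 10000 unless launched with e.g.
        //   java -Dconverter.sideloadThreshold=50000 ThresholdDemo
        System.out.println(SIDELOAD_THRESHOLD);
    }
}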

View File: CrawledDomainReader.java

@@ -38,4 +38,16 @@ public class CrawledDomainReader {
         return SerializableCrawlDataStream.empty();
     }

+    public static int sizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
 }
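The new static sizeHint dispatches on the file extension, so a caller can estimate a crawl set's size without constructing (and later closing) a data stream. A hypothetical caller, gating on the same threshold property as ConverterMain (SizeHintDemo and needsSideloading are illustrative names):

import java.nio.file.Path;

import nu.marginalia.io.CrawledDomainReader; // the class changed in this diff

class SizeHintDemo {
    static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

    static boolean needsSideloading(Path crawlData) {
        // Unknown formats return 0 and are therefore always treated as small
        return CrawledDomainReader.sizeHint(crawlData) >= SIDELOAD_THRESHOLD;
    }
}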

View File: SerializableCrawlDataStream.java

@@ -34,6 +34,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     @Nullable
     default Path path() { return null; }

+    void close() throws IOException;
+
     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
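Redeclaring close() on the interface narrows AutoCloseable's "throws Exception" to "throws IOException", which Java allows: an override may declare fewer or narrower checked exceptions, never broader ones. The practical win is in try-with-resources, where callers no longer have to catch bare Exception. A self-contained sketch of the rule (NarrowedStream and Demo are illustrative names):

import java.io.IOException;

interface NarrowedStream extends AutoCloseable {
    @Override
    void close() throws IOException; // narrower than AutoCloseable's "throws Exception"
}

class Demo {
    static void use(NarrowedStream s) {
        try (s) {               // Java 9+ try-with-resources on an existing variable
            // ... consume the stream ...
        }
        catch (IOException e) { // no longer forced to catch bare Exception
            e.printStackTrace();
        }
    }
}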

View File: ParquetSerializableCrawlDataStream.java

@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)

View File: SlopSerializableCrawlDataStream.java

@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)

View File: CrawledDocument.java

@@ -59,9 +59,14 @@ public final class CrawledDocument implements SerializableCrawlData {
     }

     public Document parseBody() throws IOException {
+        // Prevent stalls from parsing excessively large documents
+        byte[] bytes = documentBodyBytes.length > 200_000
+                ? Arrays.copyOf(documentBodyBytes, 200_000)
+                : documentBodyBytes;
+
         return DocumentBodyToString.getParsedData(
                 ContentType.parse(contentType),
-                documentBodyBytes,
+                bytes,
                 url);
     }
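The cap relies on Arrays.copyOf, which allocates at most the requested length, so an oversized body costs one bounded copy while typical documents are passed through by reference. A standalone demonstration of that behavior (TruncateDemo, MAX_BODY_BYTES, and cap are illustrative names, not from the commit):

import java.util.Arrays;

class TruncateDemo {
    private static final int MAX_BODY_BYTES = 200_000; // same cap as the commit

    static byte[] cap(byte[] body) {
        return body.length > MAX_BODY_BYTES
                ? Arrays.copyOf(body, MAX_BODY_BYTES)
                : body; // small bodies are passed through without copying
    }

    public static void main(String[] args) {
        System.out.println(cap(new byte[500_000]).length); // 200000
        System.out.println(cap(new byte[1_000]).length);   // 1000
    }
}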

View File: LiveCrawlDataSet.java

@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }

         @Override
-        public boolean hasNext() throws IOException {
+        public boolean hasNext() {
             if (dataStack == null) {
                 query();
             }
@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }

         @Override
-        public void close() throws Exception {
+        public void close() {
             dataStack.clear();
         }
     }
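Both signature changes drop checked exceptions the method bodies can never throw; as with the close() change above, Java permits an implementation to declare fewer checked exceptions than the interface it overrides. A self-contained illustration (ThrowingIterator and InMemoryIterator are illustrative names):

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

interface ThrowingIterator<T> {
    boolean hasNext() throws IOException;
    T next() throws IOException;
}

class InMemoryIterator implements ThrowingIterator<String> {
    private final Iterator<String> it = List.of("a", "b").iterator();

    @Override
    public boolean hasNext() { return it.hasNext(); } // dropped "throws IOException"

    @Override
    public String next() { return it.next(); }        // likewise
}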