Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git, synced 2025-10-06 17:32:39 +02:00
Compare commits: deploy-007...deploy-007 (2 commits)
Commits:
  503ea57d5b
  18ca926c7f
@@ -155,8 +155,15 @@ public class SentenceExtractor {
     public List<DocumentSentence> extractSentencesFromString(String text, EnumSet<HtmlTag> htmlTags) {
         String[] sentences;

+        // Safety net against malformed data DOS attacks,
+        // found 5+ MB <p>-tags in the wild that just break
+        // the sentence extractor causing it to stall forever.
+        if (text.length() > 50_000) {
+            // 50k chars can hold a small novel, let alone single html tags
+            text = text.substring(0, 50_000);
+        }
+
         // Normalize spaces
         text = normalizeSpaces(text);

         // Split into sentences
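Taken on its own, the new guard is just a hard length cap applied before any sentence splitting begins. A minimal standalone sketch of the same pattern (the class, method, and naive splitting regex are illustrative, not the project's API):

import java.util.List;

class LengthCappedSplitter {
    // Cap mirroring the patch: 50k characters is plenty for any single tag's text
    private static final int MAX_TEXT_LENGTH = 50_000;

    List<String> split(String text) {
        // Truncate pathological inputs (e.g. multi-megabyte <p> tags) up front,
        // so a single malformed document cannot stall the extraction pipeline
        if (text.length() > MAX_TEXT_LENGTH) {
            text = text.substring(0, MAX_TEXT_LENGTH);
        }
        return List.of(text.split("(?<=[.!?])\\s+"));
    }
}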
@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
+import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;
@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
     private final ProcessHeartbeat heartbeat;
     private final FileStorageService fileStorageService;
     private final SideloadSourceFactory sideloadSourceFactory;
+    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

     public static void main(String... args) throws Exception {
@@ -199,12 +201,19 @@ public class ConverterMain extends ProcessMainClass {
             processedDomains.set(batchingWorkLog.size());
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

-            for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
+            logger.info("Processing small items");
+
+            // First process the small items
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
+                if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
                 pool.submit(() -> {
                     try {
-                        ConverterBatchWritableIf writable = processor.createWritable(domain);
-                        converterWriter.accept(writable);
+                        try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                            ConverterBatchWritableIf writable = processor.fullProcessing(dataStream);
+                            converterWriter.accept(writable);
+                        }
                     }
                     catch (Exception ex) {
@@ -223,6 +232,31 @@ public class ConverterMain extends ProcessMainClass {
             do {
                 System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
             } while (!pool.awaitTermination(60, TimeUnit.SECONDS));

+            logger.info("Processing large items");
+
+            // Next the big items domain-by-domain
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
+                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
+            {
+                int sizeHint = CrawledDomainReader.sizeHint(dataPath);
+                if (sizeHint < SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
+                try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                    ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
+                    converterWriter.accept(writable);
+                }
+                catch (Exception ex) {
+                    logger.info("Error in processing", ex);
+                }
+                finally {
+                    heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                }
+            }
+
+            logger.info("Processing complete");
         }
     }
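The net effect of the two ConverterMain hunks above is a two-pass schedule keyed off the size hint: items below the threshold are fanned out to the thread pool for full processing, and oversized items are then handled one at a time to bound memory use. A self-contained sketch of that scheduling pattern (the types, names, and threshold here are illustrative, not the project's):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPassScheduler {
    private static final int SIDELOAD_THRESHOLD = 10_000;

    record WorkItem(String name, int sizeHint) {}

    public static void main(String[] args) throws InterruptedException {
        List<WorkItem> items = List.of(
                new WorkItem("small-domain", 120),
                new WorkItem("huge-domain", 250_000));

        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Pass 1: small items, processed concurrently
        for (WorkItem item : items) {
            if (item.sizeHint() >= SIDELOAD_THRESHOLD) continue;
            pool.submit(() -> System.out.println("full processing: " + item.name()));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        // Pass 2: large items, one at a time to keep memory use bounded
        for (WorkItem item : items) {
            if (item.sizeHint() < SIDELOAD_THRESHOLD) continue;
            System.out.println("simple (streaming) processing: " + item.name());
        }
    }
}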
@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.geoip.GeoIpDictionary;
 import nu.marginalia.geoip.sources.AsnTable;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;
@@ -28,13 +27,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;

 public class DomainProcessor {
     private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
     private final DocumentProcessor documentProcessor;
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
@@ -56,21 +53,6 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }

-    public ConverterBatchWritableIf createWritable(Path path) throws IOException {
-
-        var dataStream = CrawledDomainReader.createDataStream(path);
-
-        final int sizeHint = dataStream.sizeHint();
-
-        if (sizeHint > SIDELOAD_THRESHOLD) {
-            // If the file is too big, we run a processing mode that doesn't
-            // require loading the entire dataset into RAM
-            return simpleProcessing(dataStream, sizeHint);
-        }
-
-        return fullProcessing(dataStream);
-    }
-
     public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
         try {
             return new SimpleProcessing(dataStream, sizeHint, extraKeywords);
@@ -159,6 +141,7 @@ public class DomainProcessor {
         private final Set<String> processedUrls = new HashSet<>();
         private final DomainLinks externalDomainLinks;
         private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();

         private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
                 Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
         );
@@ -195,8 +178,6 @@ public class DomainProcessor {
         public Iterator<ProcessedDocument> getDocumentsStream() {
             return iteratorFactory.create((taskConsumer) -> {

-                logger.info("Simple Processing: {}", domain);
-
                 while (dataStream.hasNext())
                 {
                     if (!(dataStream.next() instanceof CrawledDocument doc))
@@ -221,8 +202,6 @@ public class DomainProcessor {
                         return processedDoc;
                     });
                 }

-                logger.info("Finished Simple Processing: {}", domain);
-
             });
         }
@@ -38,4 +38,16 @@ public class CrawledDomainReader {
         return SerializableCrawlDataStream.empty();
     }

+    public static int sizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
 }
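With the static helper above, a caller can choose a processing mode from the file alone, before any data stream is constructed. A hypothetical caller, assuming the CrawledDomainReader methods shown in this diff (the threshold value is illustrative):

import java.nio.file.Path;

import nu.marginalia.io.CrawledDomainReader;

class ProcessingModePicker {
    private static final int SIDELOAD_THRESHOLD = 10_000;

    static boolean shouldSideload(Path dataPath) {
        // Cheap check: the hint is derived from the file itself, no stream is opened
        return CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD;
    }
}

Note that unrecognized file types fall through to a hint of 0, so they are always treated as small items and take the regular full-processing path.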
@@ -34,6 +34,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     @Nullable
     default Path path() { return null; }

     void close() throws IOException;

     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
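The interface's default map(...) builds an iterator that skips elements for which the mapper returns Optional.empty(), buffering one looked-ahead value (the T next = null field visible above). A standalone sketch of that lazy filtering-iterator pattern against plain java.util types (illustrative, not the project's implementation):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

final class MappingIterators {
    static <S, T> Iterator<T> map(Iterator<S> source, Function<S, Optional<T>> mapper) {
        return new Iterator<>() {
            T next = null;

            @Override
            public boolean hasNext() {
                // Pull from the source until the mapper yields a value or the source runs dry
                while (next == null && source.hasNext()) {
                    next = mapper.apply(source.next()).orElse(null);
                }
                return next != null;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T value = next;
                next = null;
                return value;
            }
        };
    }

    public static void main(String[] args) {
        var it = map(List.of(1, 2, 3, 4).iterator(),
                i -> i % 2 == 0 ? Optional.of("even:" + i) : Optional.<String>empty());
        it.forEachRemaining(System.out::println); // prints even:2, even:4
    }
}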
@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)
@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)
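The comment repeated in both implementations explains why the hint exists at all: to distinguish a file that is large because it holds many documents from one that holds a few enormous documents. The actual computation is not part of this diff; the sketch below only illustrates the idea of producing a rough document count, and only bothering to do so for files that are large in bytes (the threshold and the line-counting stand-in are made up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class RoughSizeHint {
    private static final long LARGE_FILE_BYTES = 10L * 1024 * 1024;

    static int sizeHint(Path path) {
        try {
            if (Files.size(path) < LARGE_FILE_BYTES) {
                return 0; // small file: no hint needed, it will be treated as a small work item
            }
            // Large file: produce a crude document count (here, newline-delimited records)
            try (var lines = Files.lines(path)) {
                return (int) Math.min(Integer.MAX_VALUE, lines.count());
            }
        }
        catch (IOException e) {
            return 0;
        }
    }
}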