Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git
Synced 2025-10-06 07:32:38 +02:00

Compare commits: deploy-007 ... deploy-007 (4 commits)

SHA1
db138b2a6f
1673fc284c
503ea57d5b
18ca926c7f
@@ -155,8 +155,15 @@ public class SentenceExtractor {

     public List<DocumentSentence> extractSentencesFromString(String text, EnumSet<HtmlTag> htmlTags) {
         String[] sentences;

+        // Safety net against malformed data DOS attacks,
+        // found 5+ MB <p>-tags in the wild that just break
+        // the sentence extractor causing it to stall forever.
+        if (text.length() > 50_000) {
+            // 50k chars can hold a small novel, let alone single html tags
+            text = text.substring(0, 50_000);
+        }
+
         // Normalize spaces
         text = normalizeSpaces(text);

         // Split into sentences
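
For reference, the guard introduced above is just a length cap applied before any expensive text processing. A minimal standalone sketch of the same idea (the class and constant names are illustrative, not part of the project; the cap value mirrors the diff):

    class LengthCapSketch {
        private static final int MAX_TEXT_LENGTH = 50_000;

        // Truncate pathological inputs (e.g. multi-megabyte <p> tags) so the
        // downstream sentence extractor cannot stall on them
        static String capLength(String text) {
            if (text.length() > MAX_TEXT_LENGTH) {
                return text.substring(0, MAX_TEXT_LENGTH);
            }
            return text;
        }

        public static void main(String[] args) {
            String huge = "word ".repeat(100_000); // ~500k characters
            System.out.println(capLength(huge).length()); // prints 50000
        }
    }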
|
@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
+import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;

@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
     private final ProcessHeartbeat heartbeat;
     private final FileStorageService fileStorageService;
     private final SideloadSourceFactory sideloadSourceFactory;
+    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

     public static void main(String... args) throws Exception {

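Since the threshold is read through Integer.getInteger, it can presumably be overridden at launch with a JVM system property such as -Dconverter.sideloadThreshold=5000 (that value is only an example). A tiny sketch of how such a property-backed constant behaves:

    class ThresholdSketch {
        // Same pattern as the field added above: system property with a default of 10,000
        private static final int SIDELOAD_THRESHOLD =
                Integer.getInteger("converter.sideloadThreshold", 10_000);

        public static void main(String[] args) {
            // Prints 10000 unless the JVM was started with -Dconverter.sideloadThreshold=<value>
            System.out.println(SIDELOAD_THRESHOLD);
        }
    }
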
@@ -199,12 +201,19 @@ public class ConverterMain extends ProcessMainClass {
             processedDomains.set(batchingWorkLog.size());
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

-            for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
+            logger.info("Processing small items");
+
+            // First process the small items
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
+                if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
                 pool.submit(() -> {
-                    try {
-                        ConverterBatchWritableIf writable = processor.createWritable(domain);
+                    try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                        ConverterBatchWritableIf writable = processor.fullProcessing(dataStream);
                         converterWriter.accept(writable);
                     }
                     catch (Exception ex) {

@@ -223,6 +232,31 @@ public class ConverterMain extends ProcessMainClass {
             do {
                 System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
             } while (!pool.awaitTermination(60, TimeUnit.SECONDS));

+            logger.info("Processing large items");
+
+            // Next the big items domain-by-domain
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
+                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
+            {
+                int sizeHint = CrawledDomainReader.sizeHint(dataPath);
+                if (sizeHint < SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
+                try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                    ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
+                    converterWriter.accept(writable);
+                }
+                catch (Exception ex) {
+                    logger.info("Error in processing", ex);
+                }
+                finally {
+                    heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                }
+            }
+
+            logger.info("Processing complete");
         }
     }
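
Taken together, the two hunks above replace the single per-domain loop with a two-pass scheme: crawl sets below the sideload threshold are processed concurrently with the full in-memory path, while larger ones are skipped on the first pass and handled one at a time with the lighter "simple" path once the pool has drained. A simplified, self-contained sketch of that control flow (Job, fullProcess and simpleProcess are stand-ins, not the project's types):

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class TwoPassSketch {
        static final int SIDELOAD_THRESHOLD = 10_000;

        record Job(String name, int sizeHint) {}

        static void fullProcess(Job job)   { System.out.println("full processing: " + job.name()); }
        static void simpleProcess(Job job) { System.out.println("simple processing: " + job.name()); }

        public static void main(String[] args) throws InterruptedException {
            List<Job> jobs = List.of(new Job("small-a", 50), new Job("huge-b", 250_000), new Job("small-c", 900));

            // Pass 1: small items, processed concurrently with the full (in-memory) path
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (Job job : jobs) {
                if (job.sizeHint() >= SIDELOAD_THRESHOLD) continue; // defer the big ones
                pool.submit(() -> fullProcess(job));
            }
            pool.shutdown();
            pool.awaitTermination(60, TimeUnit.SECONDS);

            // Pass 2: large items, one at a time with a streaming path so the
            // whole dataset never has to be held in RAM at once
            for (Job job : jobs) {
                if (job.sizeHint() < SIDELOAD_THRESHOLD) continue;
                simpleProcess(job);
            }
        }
    }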
|
@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.geoip.GeoIpDictionary;
 import nu.marginalia.geoip.sources.AsnTable;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;

@@ -28,13 +27,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;

 public class DomainProcessor {
-    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
     private final DocumentProcessor documentProcessor;
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
||||||
@@ -56,21 +53,6 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }

-    public ConverterBatchWritableIf createWritable(Path path) throws IOException {
-
-        var dataStream = CrawledDomainReader.createDataStream(path);
-
-        final int sizeHint = dataStream.sizeHint();
-
-        if (sizeHint > SIDELOAD_THRESHOLD) {
-            // If the file is too big, we run a processing mode that doesn't
-            // require loading the entire dataset into RAM
-            return simpleProcessing(dataStream, sizeHint);
-        }
-
-        return fullProcessing(dataStream);
-    }
-
     public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
         try {
             return new SimpleProcessing(dataStream, sizeHint, extraKeywords);

@@ -159,6 +141,7 @@ public class DomainProcessor {
         private final Set<String> processedUrls = new HashSet<>();
         private final DomainLinks externalDomainLinks;
         private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
+
         private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
                 Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
         );
||||||
@@ -195,8 +178,6 @@ public class DomainProcessor {
         public Iterator<ProcessedDocument> getDocumentsStream() {
             return iteratorFactory.create((taskConsumer) -> {
-                logger.info("Simple Processing: {}", domain);
-
                 while (dataStream.hasNext())
                 {
                     if (!(dataStream.next() instanceof CrawledDocument doc))
||||||
@@ -221,8 +202,6 @@ public class DomainProcessor {
                     return processedDoc;
                 });
             }
-
-            logger.info("Finished Simple Processing: {}", domain);
             });
         }

|
@@ -38,4 +38,16 @@ public class CrawledDomainReader {
         return SerializableCrawlDataStream.empty();
     }

+    public static int sizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
 }
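
The new static sizeHint helper lets callers estimate a crawl set's size from the file name alone, without opening a data stream first. A minimal sketch of the same extension-based dispatch (the per-format estimates below are placeholders standing in for the real readers):

    import java.nio.file.Path;

    class SizeHintDispatchSketch {
        // Placeholders standing in for the per-format sizeHint implementations
        static int parquetSizeHint(Path p) { return 20_000; }
        static int slopSizeHint(Path p)    { return 5_000; }

        static int sizeHint(Path fullPath) {
            String fileName = fullPath.getFileName().toString();
            if (fileName.endsWith(".parquet"))  return parquetSizeHint(fullPath);
            if (fileName.endsWith(".slop.zip")) return slopSizeHint(fullPath);
            return 0; // unknown format: no hint
        }

        public static void main(String[] args) {
            System.out.println(sizeHint(Path.of("example.parquet")));  // 20000
            System.out.println(sizeHint(Path.of("example.slop.zip"))); // 5000
            System.out.println(sizeHint(Path.of("example.txt")));      // 0
        }
    }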
|
@@ -34,6 +34,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     @Nullable
     default Path path() { return null; }

+    void close() throws IOException;
+
     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
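
Declaring close() throws IOException on the interface narrows AutoCloseable's close() throws Exception, so try-with-resources callers only have to handle IOException rather than Exception. A small sketch of that effect, using a stand-in stream type rather than the project's:

    import java.io.IOException;

    class CloseNarrowingSketch {
        // Narrows AutoCloseable.close() from "throws Exception" to "throws IOException"
        interface DataStream extends AutoCloseable {
            String next();
            @Override
            void close() throws IOException;
        }

        public static void main(String[] args) {
            DataStream stream = new DataStream() {
                public String next() { return "doc"; }
                public void close() { /* nothing to release in this sketch */ }
            };
            try (stream) {
                System.out.println(stream.next());
            }
            catch (IOException e) { // only IOException needs handling here
                e.printStackTrace();
            }
        }
    }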
|
@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)
|
@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, Serializa
         return path;
     }

-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)
|
@@ -59,9 +59,14 @@ public final class CrawledDocument implements SerializableCrawlData {
     }

     public Document parseBody() throws IOException {
+        // Prevent stalls from parsing excessively large documents
+
+        byte[] bytes = documentBodyBytes.length > 200_000
+                ? Arrays.copyOf(documentBodyBytes, 200_000) : documentBodyBytes;
+
         return DocumentBodyToString.getParsedData(
                 ContentType.parse(contentType),
-                documentBodyBytes,
+                bytes,
                 url);
     }

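The parseBody change caps the body at 200,000 bytes before handing it to the HTML parser, for the same stall-prevention reason as the sentence-extractor guard earlier in this set. A standalone sketch of just the truncation step (the class and constant names are illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    class BodyCapSketch {
        private static final int MAX_BODY_BYTES = 200_000;

        // Copy at most MAX_BODY_BYTES so a pathological multi-megabyte body
        // cannot stall the downstream HTML parser
        static byte[] capBody(byte[] documentBodyBytes) {
            return documentBodyBytes.length > MAX_BODY_BYTES
                    ? Arrays.copyOf(documentBodyBytes, MAX_BODY_BYTES)
                    : documentBodyBytes;
        }

        public static void main(String[] args) {
            byte[] huge = "<p>x</p>".repeat(50_000).getBytes(StandardCharsets.UTF_8); // 400k bytes
            System.out.println(capBody(huge).length); // prints 200000
        }
    }

Note that a hard byte cut like this can split a multi-byte UTF-8 sequence at the boundary; for a lenient HTML parser that is normally acceptable, since only the final character is affected.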
|
@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }

         @Override
-        public boolean hasNext() throws IOException {
+        public boolean hasNext() {
             if (dataStack == null) {
                 query();
             }

@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }

         @Override
-        public void close() throws Exception {
+        public void close() {
             dataStack.clear();
         }
     }
|