mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


8 Commits

23 changed files with 239 additions and 183 deletions

View File

@@ -155,8 +155,15 @@ public class SentenceExtractor {
     public List<DocumentSentence> extractSentencesFromString(String text, EnumSet<HtmlTag> htmlTags) {
         String[] sentences;
 
-        // Normalize spaces
+        // Safety net against malformed data DOS attacks,
+        // found 5+ MB <p>-tags in the wild that just break
+        // the sentence extractor causing it to stall forever.
+        if (text.length() > 50_000) {
+            // 50k chars can hold a small novel, let alone single html tags
+            text = text.substring(0, 50_000);
+        }
+
+        // Normalize spaces
         text = normalizeSpaces(text);
 
         // Split into sentences

View File

@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;
@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
     private final ProcessHeartbeat heartbeat;
     private final FileStorageService fileStorageService;
     private final SideloadSourceFactory sideloadSourceFactory;
+    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
 
     public static void main(String... args) throws Exception {
@@ -199,12 +201,19 @@ public class ConverterMain extends ProcessMainClass {
             processedDomains.set(batchingWorkLog.size());
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
 
-            for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
+            logger.info("Processing small items");
+
+            // First process the small items
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
+                if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
                 pool.submit(() -> {
-                    try {
-                        ConverterBatchWritableIf writable = processor.createWritable(domain);
+                    try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
+                        ConverterBatchWritableIf writable = processor.fullProcessing(dataStream);
                         converterWriter.accept(writable);
                     }
                     catch (Exception ex) {
@@ -223,6 +232,37 @@ public class ConverterMain extends ProcessMainClass {
             do {
                 System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
             } while (!pool.awaitTermination(60, TimeUnit.SECONDS));
 
+            logger.info("Processing large items");
+
+            // Next the big items domain-by-domain
+            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
+                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
+            {
+                int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
+                if (sizeHint < SIDELOAD_THRESHOLD) {
+                    continue;
+                }
+
+                try {
+                    // SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
+                    // closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
+                    // will close it after it's consumed.
+                    var stream = SerializableCrawlDataStream.openDataStream(dataPath);
+                    ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
+
+                    converterWriter.accept(writable);
+                }
+                catch (Exception ex) {
+                    logger.info("Error in processing", ex);
+                }
+                finally {
+                    heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                }
+            }
+
+            logger.info("Processing complete");
         }
     }

View File

@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.geoip.GeoIpDictionary;
 import nu.marginalia.geoip.sources.AsnTable;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;
@@ -28,13 +27,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;
 
 public class DomainProcessor {
-    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
 
     private final DocumentProcessor documentProcessor;
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
@@ -56,21 +53,6 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }
 
-    public ConverterBatchWritableIf createWritable(Path path) throws IOException {
-        var dataStream = CrawledDomainReader.createDataStream(path);
-
-        final int sizeHint = dataStream.sizeHint();
-        if (sizeHint > SIDELOAD_THRESHOLD) {
-            // If the file is too big, we run a processing mode that doesn't
-            // require loading the entire dataset into RAM
-            return simpleProcessing(dataStream, sizeHint);
-        }
-
-        return fullProcessing(dataStream);
-    }
-
     public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
         try {
             return new SimpleProcessing(dataStream, sizeHint, extraKeywords);
@@ -159,6 +141,7 @@ public class DomainProcessor {
         private final Set<String> processedUrls = new HashSet<>();
         private final DomainLinks externalDomainLinks;
         private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
+
         private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
                 Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
         );
@@ -195,8 +178,6 @@ public class DomainProcessor {
         public Iterator<ProcessedDocument> getDocumentsStream() {
             return iteratorFactory.create((taskConsumer) -> {
-                logger.info("Simple Processing: {}", domain);
-
                 while (dataStream.hasNext())
                 {
                     if (!(dataStream.next() instanceof CrawledDocument doc))
@@ -221,8 +202,6 @@ public class DomainProcessor {
                     return processedDoc;
                 });
             }
-
-            logger.info("Finished Simple Processing: {}", domain);
         });
     }

View File

@@ -39,6 +39,9 @@ public class ConverterWriter implements AutoCloseable {
         workerThread.start();
     }
 
+    /** Queue and eventually write the domain into the converter journal
+     * The domain object will be closed after it's processed.
+     * */
     public void accept(@Nullable ConverterBatchWritableIf domain) {
         if (null == domain)
             return;
@@ -72,15 +75,15 @@ public class ConverterWriter implements AutoCloseable {
                 if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
                     logger.warn("Skipping already logged item {}", id);
-                    data.close();
-                    continue;
                 }
+                else {
+                    currentWriter.write(data);
+                    workLog.logItem(id);
 
-                currentWriter.write(data);
-                workLog.logItem(id);
+                    data.close();
+                }
 
                 switcher.tick();
-                data.close();
             }
         }
catch (Exception ex) { catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ public class DocumentBodyToString {
         return new String(data, charset);
     }
 
-    public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException {
+    public static Document getParsedData(ContentType type, byte[] data, int maxLength, String url) throws IOException {
         final Charset charset;
 
         if (type.charset() == null || type.charset().isBlank()) {
@@ -35,7 +35,7 @@ public class DocumentBodyToString {
             charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset);
         }
 
-        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        ByteArrayInputStream bais = new ByteArrayInputStream(data, 0, Math.min(data.length, maxLength));
 
         return Jsoup.parse(bais, charset.name(), url);
     }
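
For illustration, a call to the new signature might look like the sketch below: the parser only ever sees the first maxLength bytes of the body. The sample body, content type string, cap, and URL are all assumed for the example; CrawledDocument.parseBody() further down in this change set passes 200_000 in the same position.

    // Sketch (inside a method declared throws IOException); values are illustrative
    byte[] body = Files.readAllBytes(Path.of("page.html"));
    Document doc = DocumentBodyToString.getParsedData(
            ContentType.parse("text/html; charset=UTF-8"),  // parsed content type
            body,
            200_000,                                         // cap: at most 200k bytes reach Jsoup
            "https://www.example.com/");                     // base URL for resolving links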

View File

@@ -19,7 +19,6 @@ import nu.marginalia.crawl.retreival.DomainProber;
 import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
 import nu.marginalia.db.DomainBlacklist;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.mq.MessageQueueFactory;
@@ -417,13 +416,13 @@ public class CrawlerMain extends ProcessMainClass {
             try {
                 Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
                 if (Files.exists(slopPath)) {
-                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                    return new CrawlDataReference(slopPath);
                 }
 
                 Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
                 if (Files.exists(parquetPath)) {
                     slopPath = migrateParquetData(parquetPath, domain, outputDir);
-                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                    return new CrawlDataReference(slopPath);
                 }
             } catch (IOException e) {

View File

@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.lsh.EasyLSH;
 import nu.marginalia.model.crawldata.CrawledDocument;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Optional;
 
 /** A reference to a domain that has been crawled before. */
-public class CrawlDataReference implements AutoCloseable {
+public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {
+
+    private boolean closed = false;
+
+    @Nullable
+    private final Path path;
+
+    @Nullable
+    private SerializableCrawlDataStream data = null;
 
-    private final SerializableCrawlDataStream data;
     private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
 
-    public CrawlDataReference(SerializableCrawlDataStream data) {
-        this.data = data;
+    public CrawlDataReference(@Nullable Path path) {
+        this.path = path;
     }
 
     public CrawlDataReference() {
-        this(SerializableCrawlDataStream.empty());
+        this(null);
     }
 
     /** Delete the associated data from disk, if it exists */
     public void delete() throws IOException {
-        Path filePath = data.path();
-
-        if (filePath != null) {
-            Files.deleteIfExists(filePath);
+        if (path != null) {
+            Files.deleteIfExists(path);
         }
     }
 
-    /** Get the next document from the crawl data,
-     * returning null when there are no more documents
-     * available
-     */
-    @Nullable
-    public CrawledDocument nextDocument() {
-        try {
-            while (data.hasNext()) {
-                if (data.next() instanceof CrawledDocument doc) {
-                    if (!ContentTypes.isAccepted(doc.contentType))
-                        continue;
-
-                    return doc;
-                }
-            }
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read next document", ex);
-        }
-
-        return null;
+    public @NotNull Iterator<CrawledDocument> iterator() {
+
+        requireStream();
+        // Guaranteed by requireStream, but helps java
+        Objects.requireNonNull(data);
+
+        return data.map(next -> {
+            if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
+                return Optional.of(doc);
+            }
+            else {
+                return Optional.empty();
+            }
+        });
+    }
+
+    /** After calling this method, data is guaranteed to be non-null */
+    private void requireStream() {
+        if (closed) {
+            throw new IllegalStateException("Use after close()");
+        }
+
+        if (data == null) {
+            try {
+                if (path != null) {
+                    data = SerializableCrawlDataStream.openDataStream(path);
+                    return;
+                }
+            }
+            catch (Exception ex) {
+                logger.error("Failed to open stream", ex);
+            }
+
+            data = SerializableCrawlDataStream.empty();
+        }
     }
public static boolean isContentBodySame(byte[] one, byte[] other) { public static boolean isContentBodySame(byte[] one, byte[] other) {
@@ -98,7 +121,12 @@ public class CrawlDataReference implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
-        data.close();
+    public void close() throws IOException {
+        if (!closed) {
+            if (data != null) {
+                data.close();
+            }
+            closed = true;
+        }
     }
 }
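
To make the new shape of the class concrete, here is a minimal usage sketch (the path is illustrative): the reference is built from a Path, the stream is opened lazily on first iteration and filtered to accepted content types, and close() releases the underlying resources. This mirrors how CrawlerRetreiver and CrawlerRevisitor use it elsewhere in this change set.

    // Minimal sketch (inside a method declared throws IOException); the path is illustrative
    Path oldData = Path.of("crawl-data/example.slop.zip");

    try (CrawlDataReference reference = new CrawlDataReference(oldData)) {
        for (CrawledDocument doc : reference) {   // opens the stream on first use
            System.out.println(doc.url);
        }
    }   // close() releases the off-heap resources held by the stream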

View File

@@ -89,30 +89,45 @@ public class CrawlerRetreiver implements AutoCloseable {
     }
 
     public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
-        try {
+        try (oldCrawlData) {
             // Do an initial domain probe to determine the root URL
-            EdgeUrl rootUrl;
-
             var probeResult = probeRootUrl();
-            switch (probeResult) {
+
+            return switch (probeResult) {
                 case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
-                    rootUrl = probedUrl;
+                    // Good track
+
+                    // Sleep after the initial probe, we don't have access to the robots.txt yet
+                    // so we don't know the crawl delay
+                    TimeUnit.SECONDS.sleep(1);
+
+                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
+                    final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
+                    delayTimer.waitFetchDelay(0); // initial delay after robots.txt
+
+                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+                    domainStateDb.save(summaryRecord);
+
+                    // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
+                    if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                        // If we have reference data, we will always grow the crawl depth a bit
+                        crawlFrontier.increaseDepth(1.5, 2500);
+                    }
+
+                    oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
+
+                    yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
                 }
                 case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
-                    return 1;
+                    yield 1;
                 }
                 case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
-                    return 1;
+                    yield 1;
                 }
-            }
-
-            // Sleep after the initial probe, we don't have access to the robots.txt yet
-            // so we don't know the crawl delay
-            TimeUnit.SECONDS.sleep(1);
-
-            return crawlDomain(oldCrawlData, rootUrl, domainLinks);
+            };
         }
         catch (Exception ex) {
             logger.error("Error crawling domain {}", domain, ex);
@@ -120,28 +135,15 @@ public class CrawlerRetreiver implements AutoCloseable {
         }
     }
 
-    private int crawlDomain(CrawlDataReference oldCrawlData,
-                            EdgeUrl rootUrl,
-                            DomainLinks domainLinks) throws InterruptedException {
-
-        final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
-        final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
-        delayTimer.waitFetchDelay(0); // initial delay after robots.txt
-
-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
-        domainStateDb.save(summaryRecord);
-
-        // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-        if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
-            // If we have reference data, we will always grow the crawl depth a bit
-            crawlFrontier.increaseDepth(1.5, 2500);
-        }
-
+    private int crawlDomain(EdgeUrl rootUrl,
+                            SimpleRobotRules robotsRules,
+                            CrawlDelayTimer delayTimer,
+                            DomainLinks domainLinks) {
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
         // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
             crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));

View File

@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
         int errors = 0;
         int skipped = 0;
 
-        for (;;) {
+        for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
                 // If we've had too many errors, we'll stop trying to recrawl
                 break;
             }
 
-            CrawledDocument doc = oldCrawlData.nextDocument();
-            if (doc == null)
-                break;
-
-            // This Shouldn't Happen (TM)
             var urlMaybe = EdgeUrl.parse(doc.url);
             if (urlMaybe.isEmpty())
                 continue;

View File

@@ -1,41 +0,0 @@
-package nu.marginalia.io;
-
-import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
-import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public class CrawledDomainReader {
-    private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);
-
-    /** An iterator-like access to domain data  This must be closed otherwise it will leak off-heap memory! */
-    public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
-    {
-        String fileName = fullPath.getFileName().toString();
-
-        if (fileName.endsWith(".parquet")) {
-            try {
-                return new ParquetSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        if (fileName.endsWith(".slop.zip")) {
-            try {
-                return new SlopSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        logger.error("Unknown file type: {}", fullPath);
-        return SerializableCrawlDataStream.empty();
-    }
-}

View File

@@ -1,5 +1,7 @@
 package nu.marginalia.io;
 
+import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
+import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import nu.marginalia.model.crawldata.CrawledDomain;
 import nu.marginalia.model.crawldata.SerializableCrawlData;
@@ -18,7 +20,6 @@ import java.util.function.Function;
 /** Closable iterator exceptional over serialized crawl data
  * The data may appear in any order, and the iterator must be closed.
  *
- * @see CrawledDomainReader
  * */
 public interface SerializableCrawlDataStream extends AutoCloseable {
     Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
@@ -27,13 +28,58 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     /** Return a size hint for the stream. 0 is returned if the hint is not available,
      * or if the file is seemed too small to bother */
-    default int sizeHint() { return 0; }
+    default int getSizeHint() { return 0; }
 
     boolean hasNext() throws IOException;
 
     @Nullable
     default Path path() { return null; }
 
+    void close() throws IOException;
+
+    /** An iterator-like access to domain data  This must be closed otherwise it will leak off-heap memory! */
+    static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
+    {
+        String fileName = fullPath.getFileName().toString();
+
+        if (fileName.endsWith(".parquet")) {
+            try {
+                return new ParquetSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+
+        if (fileName.endsWith(".slop.zip")) {
+            try {
+                return new SlopSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+
+        logger.error("Unknown file type: {}", fullPath);
+        return SerializableCrawlDataStream.empty();
+    }
+
+    /** Get an idication of the size of the stream.  This is used to determine whether to
+     * load the stream into memory or not.  0 is returned if the hint is not available,
+     * or if the file is seemed too small to bother */
+    static int getSizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
+
     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
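
Taken together with the ConverterMain change above, the intended call pattern is roughly the sketch below (the path and threshold are illustrative): probe the size cheaply with the static getSizeHint(Path), and only open the stream, under try-with-resources, when the data is actually consumed.

    // Sketch (inside a method declared throws IOException); path and threshold are illustrative
    Path dataPath = Path.of("crawl-data/example.slop.zip");

    if (SerializableCrawlDataStream.getSizeHint(dataPath) < 10_000) {
        try (var stream = SerializableCrawlDataStream.openDataStream(dataPath)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    System.out.println(doc.url);   // small domain: safe to process fully in memory
                }
            }
        }
    }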

View File

@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
         return path;
     }
 
-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)

View File

@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, Serializa
         return path;
     }
 
-    public int sizeHint() {
+    public static int sizeHint(Path path) {
         // Only calculate size hint for large files
         // (the reason we calculate them in the first place is to assess whether it is large
         // because it has many documents, or because it is a small number of large documents)

View File

@@ -59,9 +59,12 @@ public final class CrawledDocument implements SerializableCrawlData {
     }
 
     public Document parseBody() throws IOException {
+        // Prevent stalls from parsing excessively large documents
         return DocumentBodyToString.getParsedData(
                 ContentType.parse(contentType),
                 documentBodyBytes,
+                200_000,
                 url);
     }

View File

@@ -108,15 +108,17 @@ public record SlopCrawlDataRecord(String domain,
     public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
         Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
 
-        try (var writer = new Writer(tempDir)) {
-            CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
-                    parquetRecord -> {
-                        try {
-                            writer.write(new SlopCrawlDataRecord(parquetRecord));
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        try (var writer = new Writer(tempDir);
+             var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
+        {
+            stream.forEach(
+                    parquetRecord -> {
+                        try {
+                            writer.write(new SlopCrawlDataRecord(parquetRecord));
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         catch (IOException ex) {
             FileUtils.deleteDirectory(tempDir.toFile());

View File

@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawl.retreival.*;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
         doCrawlWithReferenceStream(specs,
-                CrawledDomainReader.createDataStream(tempFileParquet1)
+                new CrawlDataReference(tempFileParquet1)
         );
 
         convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 var doc = stream.next();
                 data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -448,11 +447,9 @@ class CrawlerRetreiverTest {
                 throw new RuntimeException(e);
             }
 
-            var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
-
             System.out.println("---");
 
-            doCrawlWithReferenceStream(specs, stream);
+            doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));
 
             var revisitCrawlFrontier = new DomainCrawlFrontier(
                     new EdgeDomain("www.marginalia.nu"),
@@ -488,7 +485,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -509,12 +506,11 @@ class CrawlerRetreiverTest {
         }
     }
 
-    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
+    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
         try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
              var db = new DomainStateDb(tempFileDb)
         ) {
-            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
-                    new CrawlDataReference(stream));
+            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
         }
         catch (IOException | SQLException ex) {
             Assertions.fail(ex);

View File

@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
 import com.google.inject.Inject;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.hash.MurmurHash3_128;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportLinks(tagWriter, stream);
             }
             catch (Exception ex) {

View File

@@ -1,7 +1,6 @@
 package nu.marginalia.extractor;
 
 import com.google.inject.Inject;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.FeedExtractor;
 import nu.marginalia.link_parser.LinkParser;
@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportFeeds(tagWriter, stream);
             }
             catch (Exception ex) {
@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
     private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
         FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());
 
-        int size = stream.sizeHint();
+        int size = stream.getSizeHint();
 
         while (stream.hasNext()) {
             if (!(stream.next() instanceof CrawledDocument doc))

View File

@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.language.filter.LanguageFilter;
 import nu.marginalia.language.model.DocumentLanguageData;
 import nu.marginalia.language.sentence.SentenceExtractor;
@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
     {
         TLongHashSet words = new TLongHashSet(1000);
 
-        try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
             while (stream.hasNext()) {
                 if (Thread.interrupted())
                     return;

View File

@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }
 
         @Override
-        public boolean hasNext() throws IOException {
+        public boolean hasNext() {
             if (dataStack == null) {
                 query();
             }
@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
             dataStack.clear();
         }
     }

View File

@@ -3,7 +3,7 @@ package nu.marginalia.tools;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import nu.marginalia.converting.ConverterModule;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.service.module.DatabaseModule;
@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
         Path basePath = Path.of(args[0]);
         for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
             Path crawlDataPath = basePath.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 experiment.process(stream);
             }
             catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
 import nu.marginalia.index.journal.IndexJournal;
 import nu.marginalia.index.model.SearchParameters;
 import nu.marginalia.index.searchset.SearchSetAny;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.linkdb.docs.DocumentDbReader;
 import nu.marginalia.linkdb.docs.DocumentDbWriter;
 import nu.marginalia.loading.LoaderIndexJournalWriter;
@@ -152,7 +152,7 @@ public class IntegrationTest {
         /** PROCESS CRAWL DATA */
 
-        var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
+        var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));
 
         System.out.println(processedDomain);

View File

@@ -234,7 +234,7 @@ dependencyResolutionManagement {
         library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
         library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
 
-        library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
+        library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
 
         library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
         library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
         library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)