1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

5 Commits

14 changed files with 93 additions and 93 deletions

View File

@@ -12,7 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;
@@ -207,12 +207,12 @@ public class ConverterMain extends ProcessMainClass {
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
continue;
}
pool.submit(() -> {
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.fullProcessing(dataStream) ;
converterWriter.accept(writable);
}
@@ -239,13 +239,19 @@ public class ConverterMain extends ProcessMainClass {
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
int sizeHint = CrawledDomainReader.sizeHint(dataPath);
int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
if (sizeHint < SIDELOAD_THRESHOLD) {
continue;
}
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
try {
// SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
// closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
// will close it after it's consumed.
var stream = SerializableCrawlDataStream.openDataStream(dataPath);
ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
converterWriter.accept(writable);
}
catch (Exception ex) {

View File

@@ -39,6 +39,9 @@ public class ConverterWriter implements AutoCloseable {
workerThread.start();
}
/** Queue and eventually write the domain into the converter journal
* The domain object will be closed after it's processed.
* */
public void accept(@Nullable ConverterBatchWritableIf domain) {
if (null == domain)
return;
@@ -72,15 +75,15 @@ public class ConverterWriter implements AutoCloseable {
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);
}
else {
currentWriter.write(data);
workLog.logItem(id);
data.close();
continue;
}
currentWriter.write(data);
workLog.logItem(id);
switcher.tick();
data.close();
}
}
catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ public class DocumentBodyToString {
return new String(data, charset);
}
public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException {
public static Document getParsedData(ContentType type, byte[] data, int maxLength, String url) throws IOException {
final Charset charset;
if (type.charset() == null || type.charset().isBlank()) {
@@ -35,7 +35,7 @@ public class DocumentBodyToString {
charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset);
}
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ByteArrayInputStream bais = new ByteArrayInputStream(data, 0, Math.min(data.length, maxLength));
return Jsoup.parse(bais, charset.name(), url);
}

View File

@@ -19,8 +19,8 @@ import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.process.ProcessConfiguration;
@@ -417,13 +417,13 @@ public class CrawlerMain extends ProcessMainClass {
try {
Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
if (Files.exists(slopPath)) {
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
}
Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
if (Files.exists(parquetPath)) {
slopPath = migrateParquetData(parquetPath, domain, outputDir);
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
}
} catch (IOException e) {

View File

@@ -1,53 +0,0 @@
package nu.marginalia.io;
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
public class CrawledDomainReader {
private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);
/** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
{
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
try {
return new ParquetSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
if (fileName.endsWith(".slop.zip")) {
try {
return new SlopSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
logger.error("Unknown file type: {}", fullPath);
return SerializableCrawlDataStream.empty();
}
public static int sizeHint(Path fullPath) {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
}
else if (fileName.endsWith(".slop.zip")) {
return SlopSerializableCrawlDataStream.sizeHint(fullPath);
}
else {
return 0;
}
}
}

View File

@@ -1,5 +1,7 @@
package nu.marginalia.io;
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData;
@@ -18,7 +20,6 @@ import java.util.function.Function;
/** Closable iterator exceptional over serialized crawl data
* The data may appear in any order, and the iterator must be closed.
*
* @see CrawledDomainReader
* */
public interface SerializableCrawlDataStream extends AutoCloseable {
Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
@@ -27,7 +28,7 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
/** Return a size hint for the stream. 0 is returned if the hint is not available,
* or if the file is seemed too small to bother */
default int sizeHint() { return 0; }
default int getSizeHint() { return 0; }
boolean hasNext() throws IOException;
@@ -36,6 +37,49 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
void close() throws IOException;
/** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
{
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
try {
return new ParquetSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
if (fileName.endsWith(".slop.zip")) {
try {
return new SlopSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
logger.error("Unknown file type: {}", fullPath);
return SerializableCrawlDataStream.empty();
}
/** Get an idication of the size of the stream. This is used to determine whether to
* load the stream into memory or not. 0 is returned if the hint is not available,
* or if the file is seemed too small to bother */
static int getSizeHint(Path fullPath) {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
}
else if (fileName.endsWith(".slop.zip")) {
return SlopSerializableCrawlDataStream.sizeHint(fullPath);
}
else {
return 0;
}
}
default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
return new Iterator<>() {
T next = null;

View File

@@ -59,9 +59,12 @@ public final class CrawledDocument implements SerializableCrawlData {
}
public Document parseBody() throws IOException {
// Prevent stalls from parsing excessively large documents
return DocumentBodyToString.getParsedData(
ContentType.parse(contentType),
documentBodyBytes,
200_000,
url);
}

View File

@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.*;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
doCrawlWithReferenceStream(specs,
CrawledDomainReader.createDataStream(tempFileParquet1)
SerializableCrawlDataStream.openDataStream(tempFileParquet1)
);
convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
var doc = stream.next();
data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -448,7 +447,7 @@ class CrawlerRetreiverTest {
throw new RuntimeException(e);
}
var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1);
System.out.println("---");
@@ -488,7 +487,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {

View File

@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
import com.google.inject.Inject;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
exportLinks(tagWriter, stream);
}
catch (Exception ex) {

View File

@@ -1,7 +1,6 @@
package nu.marginalia.extractor;
import com.google.inject.Inject;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.FeedExtractor;
import nu.marginalia.link_parser.LinkParser;
@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
exportFeeds(tagWriter, stream);
}
catch (Exception ex) {
@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());
int size = stream.sizeHint();
int size = stream.getSizeHint();
while (stream.hasNext()) {
if (!(stream.next() instanceof CrawledDocument doc))

View File

@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.language.filter.LanguageFilter;
import nu.marginalia.language.model.DocumentLanguageData;
import nu.marginalia.language.sentence.SentenceExtractor;
@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
{
TLongHashSet words = new TLongHashSet(1000);
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
while (stream.hasNext()) {
if (Thread.interrupted())
return;

View File

@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
}
@Override
public boolean hasNext() throws IOException {
public boolean hasNext() {
if (dataStack == null) {
query();
}
@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
}
@Override
public void close() throws Exception {
public void close() {
dataStack.clear();
}
}

View File

@@ -3,7 +3,7 @@ package nu.marginalia.tools;
import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
Path basePath = Path.of(args[0]);
for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
Path crawlDataPath = basePath.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
experiment.process(stream);
}
catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.searchset.SearchSetAny;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.linkdb.docs.DocumentDbReader;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.LoaderIndexJournalWriter;
@@ -152,7 +152,7 @@ public class IntegrationTest {
/** PROCESS CRAWL DATA */
var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));
System.out.println(processedDomain);