mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

5 Commits

17 changed files with 210 additions and 185 deletions

View File

@@ -12,7 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;
@@ -202,17 +202,18 @@ public class ConverterMain extends ProcessMainClass {
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
 
             logger.info("Processing small items");
 
+            int numBigTasks = 0;
             // First process the small items
             for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
-                if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    numBigTasks ++;
                     continue;
                 }
 
                 pool.submit(() -> {
-                    try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                    try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
                         ConverterBatchWritableIf writable = processor.fullProcessing(dataStream) ;
                         converterWriter.accept(writable);
                     }
@@ -235,24 +236,35 @@ public class ConverterMain extends ProcessMainClass {
logger.info("Processing large items"); logger.info("Processing large items");
// Next the big items domain-by-domain try (var hb = heartbeat.createAdHocTaskHeartbeat("Large Domains")) {
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(), int bigTaskIdx = 0;
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog))) // Next the big items domain-by-domain
{ for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
int sizeHint = CrawledDomainReader.sizeHint(dataPath); new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
if (sizeHint < SIDELOAD_THRESHOLD) { {
continue; int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
} if (sizeHint < SIDELOAD_THRESHOLD) {
continue;
}
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) { hb.progress(dataPath.toFile().getName(), bigTaskIdx++, numBigTasks);
ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
converterWriter.accept(writable); try {
} // SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
catch (Exception ex) { // closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
logger.info("Error in processing", ex); // will close it after it's consumed.
}
finally { var stream = SerializableCrawlDataStream.openDataStream(dataPath);
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains); ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
converterWriter.accept(writable);
}
catch (Exception ex) {
logger.info("Error in processing", ex);
}
finally {
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
} }
} }

View File

@@ -39,6 +39,9 @@ public class ConverterWriter implements AutoCloseable {
         workerThread.start();
     }
 
+    /** Queue and eventually write the domain into the converter journal
+     * The domain object will be closed after it's processed.
+     * */
     public void accept(@Nullable ConverterBatchWritableIf domain) {
         if (null == domain)
             return;
@@ -72,15 +75,15 @@ public class ConverterWriter implements AutoCloseable {
                 if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
                     logger.warn("Skipping already logged item {}", id);
+                }
+                else {
+                    currentWriter.write(data);
+                    workLog.logItem(id);
                     data.close();
-                    continue;
                 }
-
-                currentWriter.write(data);
-                workLog.logItem(id);
 
                 switcher.tick();
-                data.close();
             }
         }
         catch (Exception ex) {
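
Read together with the ConverterMain hunk above, the new Javadoc spells out an ownership hand-off: the caller opens the crawl data stream but must not close it, and ConverterWriter closes the writable once it has been written or skipped. A hedged caller-side sketch of that contract (processLargeDomain is a hypothetical helper; the other names come from the hunks above):

    // Hypothetical helper illustrating the hand-off described above.
    void processLargeDomain(Path dataPath, int sizeHint) throws IOException {
        // Deliberately NOT try-with-resources: ownership of the stream passes to
        // converterWriter.accept(), which closes the writable after consuming it.
        var stream = SerializableCrawlDataStream.openDataStream(dataPath);
        ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
        converterWriter.accept(writable);
    }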

View File

@@ -19,7 +19,6 @@ import nu.marginalia.crawl.retreival.DomainProber;
 import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
 import nu.marginalia.db.DomainBlacklist;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.mq.MessageQueueFactory;
@@ -417,13 +416,13 @@ public class CrawlerMain extends ProcessMainClass {
         try {
             Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
             if (Files.exists(slopPath)) {
-                return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                return new CrawlDataReference(slopPath);
             }
 
             Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
             if (Files.exists(parquetPath)) {
                 slopPath = migrateParquetData(parquetPath, domain, outputDir);
-                return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                return new CrawlDataReference(slopPath);
             }
         } catch (IOException e) {

View File

@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.lsh.EasyLSH;
 import nu.marginalia.model.crawldata.CrawledDocument;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Optional;
 
 /** A reference to a domain that has been crawled before. */
-public class CrawlDataReference implements AutoCloseable {
+public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {
 
-    private final SerializableCrawlDataStream data;
+    private boolean closed = false;
+
+    @Nullable
+    private final Path path;
+
+    @Nullable
+    private SerializableCrawlDataStream data = null;
+
     private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
 
-    public CrawlDataReference(SerializableCrawlDataStream data) {
-        this.data = data;
+    public CrawlDataReference(@Nullable Path path) {
+        this.path = path;
     }
 
     public CrawlDataReference() {
-        this(SerializableCrawlDataStream.empty());
+        this(null);
     }
 
     /** Delete the associated data from disk, if it exists */
     public void delete() throws IOException {
-        Path filePath = data.path();
-
-        if (filePath != null) {
-            Files.deleteIfExists(filePath);
+        if (path != null) {
+            Files.deleteIfExists(path);
         }
     }
 
-    /** Get the next document from the crawl data,
-     * returning null when there are no more documents
-     * available
-     */
-    @Nullable
-    public CrawledDocument nextDocument() {
-        try {
-            while (data.hasNext()) {
-                if (data.next() instanceof CrawledDocument doc) {
-                    if (!ContentTypes.isAccepted(doc.contentType))
-                        continue;
-
-                    return doc;
-                }
-            }
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read next document", ex);
-        }
-
-        return null;
+    public @NotNull Iterator<CrawledDocument> iterator() {
+        requireStream();
+        // Guaranteed by requireStream, but helps java
+        Objects.requireNonNull(data);
+
+        return data.map(next -> {
+            if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
+                return Optional.of(doc);
+            }
+            else {
+                return Optional.empty();
+            }
+        });
+    }
+
+    /** After calling this method, data is guaranteed to be non-null */
+    private void requireStream() {
+        if (closed) {
+            throw new IllegalStateException("Use after close()");
+        }
+
+        if (data == null) {
+            try {
+                if (path != null) {
+                    data = SerializableCrawlDataStream.openDataStream(path);
+                    return;
+                }
+            }
+            catch (Exception ex) {
+                logger.error("Failed to open stream", ex);
+            }
+
+            data = SerializableCrawlDataStream.empty();
+        }
     }
public static boolean isContentBodySame(byte[] one, byte[] other) { public static boolean isContentBodySame(byte[] one, byte[] other) {
@@ -98,7 +121,12 @@ public class CrawlDataReference implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
-        data.close();
+    public void close() throws IOException {
+        if (!closed) {
+            if (data != null) {
+                data.close();
+            }
+            closed = true;
+        }
     }
 }
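
CrawlDataReference now opens the underlying stream lazily on first iteration and exposes it as Iterable&lt;CrawledDocument&gt;, which is what lets CrawlerRevisitor further down replace its manual nextDocument() loop with a plain for-each. A minimal usage sketch based on the new API above (slopPath is a hypothetical Path to existing crawl data):

    // Sketch: replay previously crawled documents, then release the stream.
    try (var oldCrawlData = new CrawlDataReference(slopPath)) {
        for (CrawledDocument doc : oldCrawlData) {
            // only documents with accepted content types are yielded
            System.out.println(doc.url);
        }
    } // close() shuts the lazily opened stream; iterating afterwards throws IllegalStateException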

View File

@@ -89,30 +89,45 @@ public class CrawlerRetreiver implements AutoCloseable {
     }
 
     public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
-        try {
+        try (oldCrawlData) {
             // Do an initial domain probe to determine the root URL
-            EdgeUrl rootUrl;
-
             var probeResult = probeRootUrl();
 
-            switch (probeResult) {
+            return switch (probeResult) {
                 case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
-                    rootUrl = probedUrl;
+                    // Good track
+
+                    // Sleep after the initial probe, we don't have access to the robots.txt yet
+                    // so we don't know the crawl delay
+                    TimeUnit.SECONDS.sleep(1);
+
+                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
+                    final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
+                    delayTimer.waitFetchDelay(0); // initial delay after robots.txt
+
+                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+                    domainStateDb.save(summaryRecord);
+
+                    // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
+                    if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                        // If we have reference data, we will always grow the crawl depth a bit
+                        crawlFrontier.increaseDepth(1.5, 2500);
+                    }
+
+                    oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
+
+                    yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
                 }
                 case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
-                    return 1;
+                    yield 1;
                 }
                 case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
-                    return 1;
+                    yield 1;
                 }
-            }
-
-            // Sleep after the initial probe, we don't have access to the robots.txt yet
-            // so we don't know the crawl delay
-            TimeUnit.SECONDS.sleep(1);
-
-            return crawlDomain(oldCrawlData, rootUrl, domainLinks);
+            };
         }
         catch (Exception ex) {
             logger.error("Error crawling domain {}", domain, ex);
@@ -120,28 +135,15 @@ public class CrawlerRetreiver implements AutoCloseable {
         }
     }
 
-    private int crawlDomain(CrawlDataReference oldCrawlData,
-                            EdgeUrl rootUrl,
-                            DomainLinks domainLinks) throws InterruptedException {
-
-        final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
-        final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
-        delayTimer.waitFetchDelay(0); // initial delay after robots.txt
-
-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
-        domainStateDb.save(summaryRecord);
-
-        // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-        if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
-            // If we have reference data, we will always grow the crawl depth a bit
-            crawlFrontier.increaseDepth(1.5, 2500);
-        }
+    private int crawlDomain(EdgeUrl rootUrl,
+                            SimpleRobotRules robotsRules,
+                            CrawlDelayTimer delayTimer,
+                            DomainLinks domainLinks) {
 
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
         // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
             crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));

View File

@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
         int errors = 0;
         int skipped = 0;
 
-        for (;;) {
+        for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
                 // If we've had too many errors, we'll stop trying to recrawl
                 break;
             }
 
-            CrawledDocument doc = oldCrawlData.nextDocument();
-            if (doc == null)
-                break;
-
-            // This Shouldn't Happen (TM)
             var urlMaybe = EdgeUrl.parse(doc.url);
             if (urlMaybe.isEmpty())
                 continue;

View File

@@ -1,53 +0,0 @@
-package nu.marginalia.io;
-
-import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
-import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public class CrawledDomainReader {
-    private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);
-
-    /** An iterator-like access to domain data  This must be closed otherwise it will leak off-heap memory! */
-    public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
-    {
-        String fileName = fullPath.getFileName().toString();
-
-        if (fileName.endsWith(".parquet")) {
-            try {
-                return new ParquetSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        if (fileName.endsWith(".slop.zip")) {
-            try {
-                return new SlopSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        logger.error("Unknown file type: {}", fullPath);
-        return SerializableCrawlDataStream.empty();
-    }
-
-    public static int sizeHint(Path fullPath) {
-        String fileName = fullPath.getFileName().toString();
-        if (fileName.endsWith(".parquet")) {
-            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
-        }
-        else if (fileName.endsWith(".slop.zip")) {
-            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
-        }
-        else {
-            return 0;
-        }
-    }
-}

View File

@@ -1,5 +1,7 @@
 package nu.marginalia.io;
 
+import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
+import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import nu.marginalia.model.crawldata.CrawledDomain;
 import nu.marginalia.model.crawldata.SerializableCrawlData;
@@ -18,7 +20,6 @@ import java.util.function.Function;
 /** Closable iterator exceptional over serialized crawl data
  * The data may appear in any order, and the iterator must be closed.
  *
- * @see CrawledDomainReader
  * */
 public interface SerializableCrawlDataStream extends AutoCloseable {
     Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
@@ -27,7 +28,7 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     /** Return a size hint for the stream. 0 is returned if the hint is not available,
      * or if the file is seemed too small to bother */
-    default int sizeHint() { return 0; }
+    default int getSizeHint() { return 0; }
 
     boolean hasNext() throws IOException;
@@ -36,6 +37,49 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     void close() throws IOException;
 
+    /** An iterator-like access to domain data  This must be closed otherwise it will leak off-heap memory! */
+    static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
+    {
+        String fileName = fullPath.getFileName().toString();
+
+        if (fileName.endsWith(".parquet")) {
+            try {
+                return new ParquetSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+        if (fileName.endsWith(".slop.zip")) {
+            try {
+                return new SlopSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+
+        logger.error("Unknown file type: {}", fullPath);
+        return SerializableCrawlDataStream.empty();
+    }
+
+    /** Get an idication of the size of the stream.  This is used to determine whether to
+     *  load the stream into memory or not.  0 is returned if the hint is not available,
+     *  or if the file is seemed too small to bother */
+    static int getSizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
+
     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
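
These static helpers fold the old CrawledDomainReader into the interface itself: the reader implementation is chosen by file extension, and the stream must still be closed to avoid leaking off-heap memory. A small usage sketch, assuming crawlDataPath points at a .parquet or .slop.zip file and SIDELOAD_THRESHOLD is the constant used in ConverterMain above:

    // Sketch: size-check a crawl data file, then stream its documents.
    if (SerializableCrawlDataStream.getSizeHint(crawlDataPath) < SIDELOAD_THRESHOLD) {
        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    // process doc
                }
            }
        }
    }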

View File

@@ -108,15 +108,17 @@ public record SlopCrawlDataRecord(String domain,
     public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
         Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
 
-        try (var writer = new Writer(tempDir)) {
-            CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
-                    parquetRecord -> {
-                        try {
-                            writer.write(new SlopCrawlDataRecord(parquetRecord));
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        try (var writer = new Writer(tempDir);
+             var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
+        {
+            stream.forEach(
+                    parquetRecord -> {
+                        try {
+                            writer.write(new SlopCrawlDataRecord(parquetRecord));
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         catch (IOException ex) {
             FileUtils.deleteDirectory(tempDir.toFile());

View File

@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawl.retreival.*;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
         doCrawlWithReferenceStream(specs,
-                CrawledDomainReader.createDataStream(tempFileParquet1)
+                new CrawlDataReference(tempFileParquet1)
         );
         convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 var doc = stream.next();
                 data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -448,11 +447,9 @@ class CrawlerRetreiverTest {
             throw new RuntimeException(e);
         }
 
-        var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
-
         System.out.println("---");
 
-        doCrawlWithReferenceStream(specs, stream);
+        doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));
 
         var revisitCrawlFrontier = new DomainCrawlFrontier(
                 new EdgeDomain("www.marginalia.nu"),
@@ -488,7 +485,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -509,12 +506,11 @@ class CrawlerRetreiverTest {
         }
     }
 
-    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
+    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
         try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
              var db = new DomainStateDb(tempFileDb)
         ) {
-            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
-                    new CrawlDataReference(stream));
+            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
         }
         catch (IOException | SQLException ex) {
             Assertions.fail(ex);

View File

@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
 import com.google.inject.Inject;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.hash.MurmurHash3_128;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportLinks(tagWriter, stream);
             }
             catch (Exception ex) {

View File

@@ -1,7 +1,6 @@
 package nu.marginalia.extractor;
 
 import com.google.inject.Inject;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.FeedExtractor;
 import nu.marginalia.link_parser.LinkParser;
@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportFeeds(tagWriter, stream);
             }
             catch (Exception ex) {
@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
     private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
         FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());
 
-        int size = stream.sizeHint();
+        int size = stream.getSizeHint();
 
         while (stream.hasNext()) {
             if (!(stream.next() instanceof CrawledDocument doc))

View File

@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.language.filter.LanguageFilter;
 import nu.marginalia.language.model.DocumentLanguageData;
 import nu.marginalia.language.sentence.SentenceExtractor;
@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
     {
         TLongHashSet words = new TLongHashSet(1000);
 
-        try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
             while (stream.hasNext()) {
                 if (Thread.interrupted())
                     return;

View File

@@ -9,7 +9,7 @@
         <span>
             Access logs containing IP-addresses are retained for up to 24 hours,
             anonymized logs with source addresses removed are sometimes kept longer
-            for to help diagnosing bugs.
+            to help diagnosing bugs.
         </span>
     </div>
     <div class="flex space-y-4 flex-col">

View File

@@ -3,7 +3,7 @@ package nu.marginalia.tools;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import nu.marginalia.converting.ConverterModule;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.service.module.DatabaseModule;
@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
         Path basePath = Path.of(args[0]);
         for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
             Path crawlDataPath = basePath.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 experiment.process(stream);
             }
             catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
 import nu.marginalia.index.journal.IndexJournal;
 import nu.marginalia.index.model.SearchParameters;
 import nu.marginalia.index.searchset.SearchSetAny;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.linkdb.docs.DocumentDbReader;
 import nu.marginalia.linkdb.docs.DocumentDbWriter;
 import nu.marginalia.loading.LoaderIndexJournalWriter;
@@ -152,7 +152,7 @@ public class IntegrationTest {
         /** PROCESS CRAWL DATA */
 
-        var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
+        var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));
 
         System.out.println(processedDomain);

View File

@@ -234,7 +234,7 @@ dependencyResolutionManagement {
         library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
         library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
 
-        library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
+        library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
 
         library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
         library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
         library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)