mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

6 Commits

12 changed files with 188 additions and 134 deletions

View File

@@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 
 /** WorkLog is a journal of work done by a process,
@@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
         return new WorkLoadIterable<>(logFile, mapper);
     }
 
+    public static int countEntries(Path crawlerLog) throws IOException{
+        try (var linesStream = Files.lines(crawlerLog)) {
+            return (int) linesStream.filter(WorkLogEntry::isJobId).count();
+        }
+    }
+
     // Use synchro over concurrent set to avoid competing writes
     // - correct is better than fast here, it's sketchy enough to use
     // a PrintWriter
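
The new countEntries helper pairs naturally with iterableMap when a caller wants to know the total up front, e.g. to size a progress bar before replaying the log, which is what the migration actor below does. A minimal sketch using only the WorkLog API shown in this diff; the class name and storage-root path are illustrative:

import nu.marginalia.process.log.WorkLog;

import java.io.IOException;
import java.nio.file.Path;

class WorkLogCountExample {
    // Count the finished jobs recorded in a crawler.log so that a progress
    // total is known before the log is replayed entry by entry.
    static int finishedJobs(Path storageRoot) throws IOException {
        Path crawlerLog = storageRoot.resolve("crawler.log"); // illustrative location
        return WorkLog.countEntries(crawlerLog);              // counts job-id lines only
    }
}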

View File

@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.process.log.WorkLogEntry;
+import nu.marginalia.service.control.ServiceHeartbeat;
 import nu.marginalia.slop.SlopCrawlDataRecord;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
@@ -26,14 +27,15 @@ import java.util.function.Function;
 public class MigrateCrawlDataActor extends RecordActorPrototype {
 
     private final FileStorageService fileStorageService;
+    private final ServiceHeartbeat serviceHeartbeat;
 
     private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
 
     @Inject
-    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
+    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
         super(gson);
         this.fileStorageService = fileStorageService;
+        this.serviceHeartbeat = serviceHeartbeat;
     }
 
     public record Run(long fileStorageId) implements ActorStep {}
@@ -49,28 +51,39 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
                 Path crawlerLog = root.resolve("crawler.log");
                 Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
 
-                try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
+                int totalEntries = WorkLog.countEntries(crawlerLog);
 
+                try (WorkLog workLog = new WorkLog(newCrawlerLog);
+                     var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
+                ) {
+                    int entryIdx = 0;
+
                     for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
                         var entry = item.getKey();
                         var path = item.getValue();
 
-                        logger.info("Converting {}", entry.id());
+                        heartbeat.progress("Migrating" + path.toFile().getName(), entryIdx++, totalEntries);
 
-                        if (path.toFile().getName().endsWith(".parquet")) {
-                            String domain = entry.id();
-                            String id = Integer.toHexString(domain.hashCode());
-
-                            Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
-
-                            SlopCrawlDataRecord.convertFromParquet(path, outputFile);
-
-                            workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                        if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
+                            try {
+                                String domain = entry.id();
+                                String id = Integer.toHexString(domain.hashCode());
+
+                                Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+                                SlopCrawlDataRecord.convertFromParquet(path, outputFile);
+                                workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                            }
+                            catch (Exception ex) {
+                                logger.error("Failed to convert " + path, ex);
+                            }
                         }
                         else {
                             workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
                         }
                     }
                 }
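
The shape of the progress reporting added above is worth noting: the ad-hoc task heartbeat is itself a resource (it sits in the try-with-resources header), and progress(label, index, total) is reported once per item. A minimal sketch of that pattern under the same assumptions; the item list and the per-item work are illustrative, not from the repository:

import nu.marginalia.service.control.ServiceHeartbeat;

import java.util.List;

class AdHocTaskExample {
    private final ServiceHeartbeat serviceHeartbeat;

    AdHocTaskExample(ServiceHeartbeat serviceHeartbeat) {
        this.serviceHeartbeat = serviceHeartbeat;
    }

    // Report per-item progress for a named ad-hoc task, mirroring the migration loop above.
    void runTask(List<String> items) throws Exception {
        try (var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")) {
            int total = items.size();
            int idx = 0;
            for (String item : items) {
                heartbeat.progress(item, idx++, total);
                // ... convert or copy the item here ...
            }
        }
    }
}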

View File

@@ -202,12 +202,13 @@ public class ConverterMain extends ProcessMainClass {
             heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
 
             logger.info("Processing small items");
+            int numBigTasks = 0;
 
             // First process the small items
             for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                     new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
             {
                 if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+                    numBigTasks ++;
                     continue;
                 }
@@ -235,30 +236,35 @@
             logger.info("Processing large items");
 
-            // Next the big items domain-by-domain
-            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
-                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
-            {
-                int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
-                if (sizeHint < SIDELOAD_THRESHOLD) {
-                    continue;
-                }
+            try (var hb = heartbeat.createAdHocTaskHeartbeat("Large Domains")) {
+                int bigTaskIdx = 0;
+                // Next the big items domain-by-domain
+                for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
+                        new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
+                {
+                    int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
+                    if (sizeHint < SIDELOAD_THRESHOLD) {
+                        continue;
+                    }
 
-                try {
-                    // SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
-                    // closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
-                    // will close it after it's consumed.
-                    var stream = SerializableCrawlDataStream.openDataStream(dataPath);
-                    ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
+                    hb.progress(dataPath.toFile().getName(), bigTaskIdx++, numBigTasks);
 
-                    converterWriter.accept(writable);
-                }
-                catch (Exception ex) {
-                    logger.info("Error in processing", ex);
-                }
-                finally {
-                    heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                    try {
+                        // SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
+                        // closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
+                        // will close it after it's consumed.
+                        var stream = SerializableCrawlDataStream.openDataStream(dataPath);
+                        ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);
+
+                        converterWriter.accept(writable);
+                    }
+                    catch (Exception ex) {
+                        logger.info("Error in processing", ex);
+                    }
+                    finally {
+                        heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+                    }
                 }
             }

View File

@@ -20,7 +20,6 @@ import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.CrawlerOutputFile;
-import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.process.ProcessConfiguration;
@@ -417,13 +416,13 @@
         try {
             Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
             if (Files.exists(slopPath)) {
-                return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
+                return new CrawlDataReference(slopPath);
             }
 
             Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
             if (Files.exists(parquetPath)) {
                 slopPath = migrateParquetData(parquetPath, domain, outputDir);
-                return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
+                return new CrawlDataReference(slopPath);
             }
         } catch (IOException e) {

View File

@@ -45,6 +45,7 @@ public class HttpFetcherImpl implements HttpFetcher {
     private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
 
     private final Duration requestTimeout = Duration.ofSeconds(10);
+    private final Duration probeTimeout = Duration.ofSeconds(30);
 
     @Override
     public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@@ -107,23 +108,27 @@
                     .HEAD()
                     .uri(url.asURI())
                     .header("User-agent", userAgentString)
-                    .timeout(requestTimeout)
+                    .timeout(probeTimeout)
                     .build();
         } catch (URISyntaxException e) {
             return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "Invalid URL");
         }
 
-        try {
-            var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
-            EdgeUrl rspUri = new EdgeUrl(rsp.uri());
+        for (int tries = 0;; tries++) {
+            try {
+                var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
+                EdgeUrl rspUri = new EdgeUrl(rsp.uri());
 
-            if (!Objects.equals(rspUri.domain, url.domain)) {
-                return new DomainProbeResult.Redirect(rspUri.domain);
+                if (!Objects.equals(rspUri.domain, url.domain)) {
+                    return new DomainProbeResult.Redirect(rspUri.domain);
+                }
+                return new DomainProbeResult.Ok(rspUri);
+            } catch (Exception ex) {
+                if (tries > 3) {
+                    return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
+                }
+                // else try again ...
             }
-            return new DomainProbeResult.Ok(rspUri);
-        }
-        catch (Exception ex) {
-            return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
         }
     }
@@ -143,7 +148,7 @@
         var headBuilder = HttpRequest.newBuilder()
                 .HEAD()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .timeout(requestTimeout)
                 ;
@@ -215,7 +220,7 @@
         var getBuilder = HttpRequest.newBuilder()
                 .GET()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .header("Accept-Language", "en,*;q=0.5")
                 .header("Accept", "text/html, application/xhtml+xml, text/*;q=0.8")
@@ -307,7 +312,7 @@
                 .uri(sitemapUrl.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout)
                 .build();
@@ -386,7 +391,7 @@
                 .uri(url.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout);
 
         HttpFetchResult result = recorder.fetch(client, getRequest.build());
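
The domain probe above now retries the HEAD request a few times before giving up. The same bounded-retry shape, pulled out into a stand-alone sketch with a generic operation in place of the real HTTP call; the helper name and failure limit are illustrative:

import java.util.concurrent.Callable;

class BoundedRetryExample {
    // Keep retrying the operation until it succeeds; after maxFailures failed
    // attempts, rethrow the last error instead of looping forever.
    static <T> T withRetries(Callable<T> operation, int maxFailures) throws Exception {
        for (int tries = 0; ; tries++) {
            try {
                return operation.call();
            }
            catch (Exception ex) {
                if (tries >= maxFailures) {
                    throw ex;
                }
                // else try again ...
            }
        }
    }
}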

View File

@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.lsh.EasyLSH;
 import nu.marginalia.model.crawldata.CrawledDocument;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Optional;
 
 /** A reference to a domain that has been crawled before. */
-public class CrawlDataReference implements AutoCloseable {
+public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {
+
+    private boolean closed = false;
+
+    @Nullable
+    private final Path path;
+
+    @Nullable
+    private SerializableCrawlDataStream data = null;
 
-    private final SerializableCrawlDataStream data;
     private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
 
-    public CrawlDataReference(SerializableCrawlDataStream data) {
-        this.data = data;
+    public CrawlDataReference(@Nullable Path path) {
+        this.path = path;
     }
 
     public CrawlDataReference() {
-        this(SerializableCrawlDataStream.empty());
+        this(null);
    }
 
     /** Delete the associated data from disk, if it exists */
     public void delete() throws IOException {
-        Path filePath = data.path();
-
-        if (filePath != null) {
-            Files.deleteIfExists(filePath);
+        if (path != null) {
+            Files.deleteIfExists(path);
         }
     }
 
-    /** Get the next document from the crawl data,
-     * returning null when there are no more documents
-     * available
-     */
-    @Nullable
-    public CrawledDocument nextDocument() {
-        try {
-            while (data.hasNext()) {
-                if (data.next() instanceof CrawledDocument doc) {
-                    if (!ContentTypes.isAccepted(doc.contentType))
-                        continue;
-
-                    return doc;
+    public @NotNull Iterator<CrawledDocument> iterator() {
+
+        requireStream();
+        // Guaranteed by requireStream, but helps java
+        Objects.requireNonNull(data);
+
+        return data.map(next -> {
+            if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
+                return Optional.of(doc);
+            }
+            else {
+                return Optional.empty();
+            }
+        });
+    }
+
+    /** After calling this method, data is guaranteed to be non-null */
+    private void requireStream() {
+        if (closed) {
+            throw new IllegalStateException("Use after close()");
+        }
+
+        if (data == null) {
+            try {
+                if (path != null) {
+                    data = SerializableCrawlDataStream.openDataStream(path);
+                    return;
                 }
             }
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read next document", ex);
-        }
-
-        return null;
+            catch (Exception ex) {
+                logger.error("Failed to open stream", ex);
+            }
+
+            data = SerializableCrawlDataStream.empty();
+        }
     }
 
     public static boolean isContentBodySame(byte[] one, byte[] other) {
@@ -98,7 +121,12 @@
     }
 
     @Override
-    public void close() throws Exception {
-        data.close();
+    public void close() throws IOException {
+        if (!closed) {
+            if (data != null) {
+                data.close();
+            }
+            closed = true;
+        }
     }
 }
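
With CrawlDataReference now Iterable and lazily opening its stream, a call site shrinks to a try-with-resources plus a for-each, which is exactly what CrawlerRevisitor does further down. A minimal sketch of that call-site shape, assuming it lives in the same package as CrawlDataReference (whose package is not shown in this diff); the file path is illustrative:

import nu.marginalia.model.crawldata.CrawledDocument;

import java.io.IOException;
import java.nio.file.Path;

class CrawlDataReferenceUsageExample {
    // Iterate old crawl data: the underlying stream is opened on first use
    // and released when the reference is closed.
    static void printUrls(Path oldCrawlDataFile) throws IOException {
        try (var oldCrawlData = new CrawlDataReference(oldCrawlDataFile)) {
            for (CrawledDocument doc : oldCrawlData) {
                System.out.println(doc.url); // only accepted content types are yielded
            }
        }
    }
}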

View File

@@ -89,30 +89,45 @@ public class CrawlerRetreiver implements AutoCloseable {
     }
 
     public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
-        try {
+        try (oldCrawlData) {
             // Do an initial domain probe to determine the root URL
-            EdgeUrl rootUrl;
-
             var probeResult = probeRootUrl();
-            switch (probeResult) {
+
+            return switch (probeResult) {
                 case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
-                    rootUrl = probedUrl;
+                    // Good track
+
+                    // Sleep after the initial probe, we don't have access to the robots.txt yet
+                    // so we don't know the crawl delay
+                    TimeUnit.SECONDS.sleep(1);
+
+                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
+                    final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
+
+                    delayTimer.waitFetchDelay(0); // initial delay after robots.txt
+
+                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+                    domainStateDb.save(summaryRecord);
+
+                    // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
+                    if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                        // If we have reference data, we will always grow the crawl depth a bit
+                        crawlFrontier.increaseDepth(1.5, 2500);
+                    }
+
+                    oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
+
+                    yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
                 }
                 case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
-                    return 1;
+                    yield 1;
                 }
                 case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
-                    return 1;
+                    yield 1;
                 }
-            }
-
-            // Sleep after the initial probe, we don't have access to the robots.txt yet
-            // so we don't know the crawl delay
-            TimeUnit.SECONDS.sleep(1);
-
-            return crawlDomain(oldCrawlData, rootUrl, domainLinks);
+            };
         }
         catch (Exception ex) {
             logger.error("Error crawling domain {}", domain, ex);
@@ -120,28 +135,15 @@
         }
     }
 
-    private int crawlDomain(CrawlDataReference oldCrawlData,
-                            EdgeUrl rootUrl,
-                            DomainLinks domainLinks) throws InterruptedException {
-
-        final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
-        final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
-
-        delayTimer.waitFetchDelay(0); // initial delay after robots.txt
-
-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
-        domainStateDb.save(summaryRecord);
-
-        // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-        if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
-            // If we have reference data, we will always grow the crawl depth a bit
-            crawlFrontier.increaseDepth(1.5, 2500);
-        }
+    private int crawlDomain(EdgeUrl rootUrl,
+                            SimpleRobotRules robotsRules,
+                            CrawlDelayTimer delayTimer,
+                            DomainLinks domainLinks) {
 
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
         // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
             crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
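
The crawlDomain rewrite above leans on a Java 21 switch expression over the sealed DomainProbeResult hierarchy, with record deconstruction patterns and yield in each arm instead of returning mid-switch. A stripped-down, self-contained analogue of that construct; all type names here are illustrative, not the project's:

sealed interface ProbeResult permits Ok, Redirect, Error {}
record Ok(String url) implements ProbeResult {}
record Redirect(String domain) implements ProbeResult {}
record Error(String message) implements ProbeResult {}

class SwitchExpressionExample {
    // Each arm yields a value; the compiler checks that every permitted
    // subtype of the sealed interface is covered.
    static int handle(ProbeResult result) {
        return switch (result) {
            case Ok(String url) -> {
                // happy path: do the real work, then yield its result
                yield url.length();
            }
            case Redirect(String domain) -> 1;
            case Error(String message) -> 1;
        };
    }
}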

View File

@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
         int errors = 0;
         int skipped = 0;
 
-        for (;;) {
+        for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
                 // If we've had too many errors, we'll stop trying to recrawl
                 break;
             }
 
-            CrawledDocument doc = oldCrawlData.nextDocument();
-            if (doc == null)
-                break;
-
-            // This Shouldn't Happen (TM)
             var urlMaybe = EdgeUrl.parse(doc.url);
             if (urlMaybe.isEmpty())
                 continue;

View File

@@ -108,15 +108,17 @@ public record SlopCrawlDataRecord(String domain,
     public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
         Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
 
-        try (var writer = new Writer(tempDir)) {
-            CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
-                    parquetRecord -> {
-                        try {
-                            writer.write(new SlopCrawlDataRecord(parquetRecord));
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        try (var writer = new Writer(tempDir);
+             var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
+        {
+            stream.forEach(
+                    parquetRecord -> {
+                        try {
+                            writer.write(new SlopCrawlDataRecord(parquetRecord));
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         catch (IOException ex) {
             FileUtils.deleteDirectory(tempDir.toFile());
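
The change above closes the parquet reader's Stream together with the writer, presumably because a file-backed Stream holds an open handle that would otherwise leak. The same rule applies to any Stream over a file; a small sketch using Files.lines:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class StreamCloseExample {
    // A Stream backed by a file must be closed, or the file descriptor leaks;
    // try-with-resources closes it even when the terminal operation throws.
    static long countLines(Path file) throws IOException {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        }
    }
}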

View File

@@ -375,7 +375,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
         doCrawlWithReferenceStream(specs,
-                SerializableCrawlDataStream.openDataStream(tempFileParquet1)
+                new CrawlDataReference(tempFileParquet1)
         );
         convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -447,11 +447,9 @@
             throw new RuntimeException(e);
         }
 
-        var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1);
-
         System.out.println("---");
 
-        doCrawlWithReferenceStream(specs, stream);
+        doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));
 
         var revisitCrawlFrontier = new DomainCrawlFrontier(
                 new EdgeDomain("www.marginalia.nu"),
@@ -508,12 +506,11 @@
         }
     }
 
-    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
+    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
        try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
             var db = new DomainStateDb(tempFileDb)
        ) {
-            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
-                    new CrawlDataReference(stream));
+            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
        }
        catch (IOException | SQLException ex) {
            Assertions.fail(ex);

View File

@@ -9,7 +9,7 @@
                 <span>
                     Access logs containing IP-addresses are retained for up to 24 hours,
                     anonymized logs with source addresses removed are sometimes kept longer
-                    for to help diagnosing bugs.
+                    to help diagnosing bugs.
                 </span>
             </div>
             <div class="flex space-y-4 flex-col">

View File

@@ -234,7 +234,7 @@ dependencyResolutionManagement {
             library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
             library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
-            library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
+            library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
 
             library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
             library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
             library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)