Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git (synced 2025-10-06 07:32:38 +02:00)

Compare commits: deploy-007...deploy-008 (12 commits)
Author | SHA1 | Date
---|---|---
 | 1e50e392c6 |
 | fb673de370 |
 | eee73ab16c |
 | 5354e034bf |
 | a2b076f9be |
 | c8b0a32c0f |
 | f0d74aa3bb |
 | 74a1f100f4 |
 | eb049658e4 |
 | db138b2a6f |
 | 1673fc284c |
 | 503ea57d5b |
@@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.*;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

/** WorkLog is a journal of work done by a process,

@@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
        return new WorkLoadIterable<>(logFile, mapper);
    }

    public static int countEntries(Path crawlerLog) throws IOException{
        try (var linesStream = Files.lines(crawlerLog)) {
            return (int) linesStream.filter(WorkLogEntry::isJobId).count();
        }
    }

    // Use synchro over concurrent set to avoid competing writes
    // - correct is better than fast here, it's sketchy enough to use
    // a PrintWriter
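The new countEntries helper streams the log file and counts lines that register as job entries. A minimal self-contained sketch of the same approach, where the isJobEntry predicate is a stand-in for WorkLogEntry::isJobId (whose matching rules are not shown in this diff):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class WorkLogCountSketch {
    // Assumption: job lines are non-empty and not comments; the real predicate differs.
    static boolean isJobEntry(String line) {
        return !line.isBlank() && !line.startsWith("#");
    }

    static int countEntries(Path logFile) throws IOException {
        // Files.lines must be closed, or the underlying file handle leaks
        try (var lines = Files.lines(logFile)) {
            return (int) lines.filter(WorkLogCountSketch::isJobEntry).count();
        }
    }
}
```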
@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.slop.SlopCrawlDataRecord;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;

@@ -26,14 +27,15 @@ import java.util.function.Function;
public class MigrateCrawlDataActor extends RecordActorPrototype {

    private final FileStorageService fileStorageService;

    private final ServiceHeartbeat serviceHeartbeat;
    private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);

    @Inject
    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
        super(gson);

        this.fileStorageService = fileStorageService;
        this.serviceHeartbeat = serviceHeartbeat;
    }

    public record Run(long fileStorageId) implements ActorStep {}

@@ -49,16 +51,22 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
        Path crawlerLog = root.resolve("crawler.log");
        Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");

        try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
        int totalEntries = WorkLog.countEntries(crawlerLog);

        try (WorkLog workLog = new WorkLog(newCrawlerLog);
             var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
        ) {
            int entryIdx = 0;

            for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {

                var entry = item.getKey();
                var path = item.getValue();

                logger.info("Converting {}", entry.id());
                heartbeat.progress("Migrating" + path.toFile().getName(), entryIdx++, totalEntries);


                if (path.toFile().getName().endsWith(".parquet")) {
                if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
                    try {
                        String domain = entry.id();
                        String id = Integer.toHexString(domain.hashCode());

@@ -68,9 +76,14 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {

                        workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
                    }
                    catch (Exception ex) {
                        logger.error("Failed to convert " + path, ex);
                    }
                }
                else {
                    workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
                }

            }
        }
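The change wires a ServiceHeartbeat into the migration actor so each converted entry reports progress against the total counted up front. A generic sketch of that reporting pattern; the TaskHeartbeat interface below is a hypothetical stand-in for the ad-hoc task heartbeat handle used in the diff:

```java
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-in for the ad-hoc task heartbeat: an AutoCloseable handle
// that accepts (stepName, current, total) updates.
interface TaskHeartbeat extends AutoCloseable {
    void progress(String step, int current, int total);
    @Override void close();
}

class MigrationProgressSketch {
    static void migrate(List<Path> items, TaskHeartbeat heartbeat) {
        int total = items.size();
        int idx = 0;
        try (heartbeat) {
            for (Path item : items) {
                // Report the file currently being converted and overall progress,
                // mirroring heartbeat.progress(name, entryIdx++, totalEntries) above.
                heartbeat.progress("Migrating " + item.getFileName(), idx++, total);
                // ... conversion of the item would happen here ...
            }
        }
    }
}
```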
@@ -12,6 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;

@@ -49,6 +50,7 @@ public class ConverterMain extends ProcessMainClass {
    private final ProcessHeartbeat heartbeat;
    private final FileStorageService fileStorageService;
    private final SideloadSourceFactory sideloadSourceFactory;
    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);

    public static void main(String... args) throws Exception {

@@ -199,12 +201,20 @@ public class ConverterMain extends ProcessMainClass {
            processedDomains.set(batchingWorkLog.size());
            heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

            for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
            logger.info("Processing small items");
            int numBigTasks = 0;
            // First process the small items
            for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                    new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
            {
                if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
                    numBigTasks ++;
                    continue;
                }

                pool.submit(() -> {
                    try {
                        ConverterBatchWritableIf writable = processor.createWritable(domain);
                        try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
                            ConverterBatchWritableIf writable = processor.fullProcessing(dataStream) ;
                            converterWriter.accept(writable);
                        }
                    catch (Exception ex) {

@@ -223,6 +233,42 @@ public class ConverterMain extends ProcessMainClass {
            do {
                System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
            } while (!pool.awaitTermination(60, TimeUnit.SECONDS));

            logger.info("Processing large items");

            try (var hb = heartbeat.createAdHocTaskHeartbeat("Large Domains")) {
                int bigTaskIdx = 0;
                // Next the big items domain-by-domain
                for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                        new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
                {
                    int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
                    if (sizeHint < SIDELOAD_THRESHOLD) {
                        continue;
                    }

                    hb.progress(dataPath.toFile().getName(), bigTaskIdx++, numBigTasks);

                    try {
                        // SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
                        // closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
                        // will close it after it's consumed.

                        var stream = SerializableCrawlDataStream.openDataStream(dataPath);
                        ConverterBatchWritableIf writable = processor.simpleProcessing(stream, sizeHint);

                        converterWriter.accept(writable);
                    }
                    catch (Exception ex) {
                        logger.info("Error in processing", ex);
                    }
                    finally {
                        heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
                    }
                }
            }

            logger.info("Processing complete");
        }
    }
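The refactor splits conversion into two passes: items below the sideload threshold are fanned out to the thread pool, while oversized items are skipped in the first pass and then processed one at a time. A generic, self-contained sketch of that scheduling pattern, with placeholder work items rather than the converter's real types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

class TwoPassScheduler<T> {
    void process(List<T> items, ToIntFunction<T> sizeHint, int threshold,
                 Consumer<T> work) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<T> bigItems = new ArrayList<>();

        // Pass 1: queue the small items for parallel processing
        for (T item : items) {
            if (sizeHint.applyAsInt(item) >= threshold) {
                bigItems.add(item);
                continue;
            }
            pool.submit(() -> work.accept(item));
        }

        pool.shutdown();
        while (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            // still draining the small-item queue
        }

        // Pass 2: the large items, sequentially, so only one big dataset is in flight
        for (T item : bigItems) {
            work.accept(item);
        }
    }
}
```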
@@ -14,7 +14,6 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.geoip.sources.AsnTable;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;

@@ -28,13 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Pattern;

public class DomainProcessor {
    private static final int SIDELOAD_THRESHOLD = Integer.getInteger("converter.sideloadThreshold", 10_000);
    private final DocumentProcessor documentProcessor;
    private final SiteWords siteWords;
    private final AnchorTagsSource anchorTagsSource;

@@ -56,21 +53,6 @@ public class DomainProcessor {
        geoIpDictionary.waitReady();
    }

    public ConverterBatchWritableIf createWritable(Path path) throws IOException {

        var dataStream = CrawledDomainReader.createDataStream(path);

        final int sizeHint = dataStream.sizeHint();

        if (sizeHint > SIDELOAD_THRESHOLD) {
            // If the file is too big, we run a processing mode that doesn't
            // require loading the entire dataset into RAM
            return simpleProcessing(dataStream, sizeHint);
        }

        return fullProcessing(dataStream);
    }

    public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
        try {
            return new SimpleProcessing(dataStream, sizeHint, extraKeywords);

@@ -159,6 +141,7 @@ public class DomainProcessor {
        private final Set<String> processedUrls = new HashSet<>();
        private final DomainLinks externalDomainLinks;
        private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();

        private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
                Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
        );

@@ -195,8 +178,6 @@ public class DomainProcessor {
        public Iterator<ProcessedDocument> getDocumentsStream() {
            return iteratorFactory.create((taskConsumer) -> {

                logger.info("Simple Processing: {}", domain);

                while (dataStream.hasNext())
                {
                    if (!(dataStream.next() instanceof CrawledDocument doc))

@@ -221,8 +202,6 @@ public class DomainProcessor {
                        return processedDoc;
                    });
                }

                logger.info("Finished Simple Processing: {}", domain);
            });
        }
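Both the removed createWritable method and the retained SIDELOAD_THRESHOLD field read their limit with Integer.getInteger, a JDK helper that parses a system property and falls back to a default when the property is absent. A minimal demonstration of that configuration idiom:

```java
class ThresholdConfigSketch {
    // Passing -Dconverter.sideloadThreshold=50000 at startup overrides the default of 10_000
    static final int SIDELOAD_THRESHOLD =
            Integer.getInteger("converter.sideloadThreshold", 10_000);

    public static void main(String[] args) {
        System.out.println("sideload threshold = " + SIDELOAD_THRESHOLD);
    }
}
```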
@@ -39,6 +39,9 @@ public class ConverterWriter implements AutoCloseable {
        workerThread.start();
    }

    /** Queue and eventually write the domain into the converter journal
     * The domain object will be closed after it's processed.
     * */
    public void accept(@Nullable ConverterBatchWritableIf domain) {
        if (null == domain)
            return;

@@ -72,15 +75,15 @@ public class ConverterWriter implements AutoCloseable {

                if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
                    logger.warn("Skipping already logged item {}", id);
                }
                else {
                    currentWriter.write(data);
                    workLog.logItem(id);
                    data.close();
                    continue;
                }

                currentWriter.write(data);

                workLog.logItem(id);

                switcher.tick();
                data.close();
            }
        }
        catch (Exception ex) {
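The hunk hinges on an idempotency check: before a batch item is written, its id is tested against what is already committed or queued in the current batch, and duplicates are only warned about rather than written twice. A small sketch of that check, using in-memory sets as stand-ins for the real WorkLog:

```java
import java.util.HashSet;
import java.util.Set;

class DedupWriteSketch {
    private final Set<String> committed = new HashSet<>();
    private final Set<String> currentBatch = new HashSet<>();

    void write(String id, Runnable writeAction) {
        if (committed.contains(id) || currentBatch.contains(id)) {
            // already logged: skip, as the diff does with a warning
            return;
        }
        writeAction.run();       // stands in for currentWriter.write(data)
        currentBatch.add(id);    // stands in for workLog.logItem(id)
    }
}
```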
@@ -26,7 +26,7 @@ public class DocumentBodyToString {
        return new String(data, charset);
    }

    public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException {
    public static Document getParsedData(ContentType type, byte[] data, int maxLength, String url) throws IOException {
        final Charset charset;

        if (type.charset() == null || type.charset().isBlank()) {

@@ -35,7 +35,7 @@ public class DocumentBodyToString {
            charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset);
        }

        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        ByteArrayInputStream bais = new ByteArrayInputStream(data, 0, Math.min(data.length, maxLength));

        return Jsoup.parse(bais, charset.name(), url);
    }
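The new maxLength parameter caps how many bytes Jsoup ever sees, without copying the array: the ByteArrayInputStream is simply bounded to the first maxLength bytes. A self-contained sketch of that trick; it hard-codes UTF-8 where the real method computes the charset from the content type:

```java
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class BoundedParseSketch {
    static Document parseCapped(byte[] body, int maxLength, String baseUrl) throws IOException {
        // Bound the stream so oversized documents cannot stall the parser
        var in = new ByteArrayInputStream(body, 0, Math.min(body.length, maxLength));
        return Jsoup.parse(in, StandardCharsets.UTF_8.name(), baseUrl);
    }

    public static void main(String[] args) throws IOException {
        byte[] html = "<html><head><title>hi</title></head><body><p>hello</p></body></html>"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(parseCapped(html, 200_000, "https://example.com/").title());
    }
}
```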
@@ -19,7 +19,6 @@ import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;

@@ -417,13 +416,13 @@ public class CrawlerMain extends ProcessMainClass {
            try {
                Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
                if (Files.exists(slopPath)) {
                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
                    return new CrawlDataReference(slopPath);
                }

                Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
                if (Files.exists(parquetPath)) {
                    slopPath = migrateParquetData(parquetPath, domain, outputDir);
                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
                    return new CrawlDataReference(slopPath);
                }

            } catch (IOException e) {
@@ -45,6 +45,7 @@ public class HttpFetcherImpl implements HttpFetcher {
    private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();

    private final Duration requestTimeout = Duration.ofSeconds(10);
    private final Duration probeTimeout = Duration.ofSeconds(30);

    @Override
    public void setAllowAllContentTypes(boolean allowAllContentTypes) {

@@ -107,12 +108,13 @@ public class HttpFetcherImpl implements HttpFetcher {
                    .HEAD()
                    .uri(url.asURI())
                    .header("User-agent", userAgentString)
                    .timeout(requestTimeout)
                    .timeout(probeTimeout)
                    .build();
        } catch (URISyntaxException e) {
            return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "Invalid URL");
        }

        for (int tries = 0;; tries++) {
            try {
                var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
                EdgeUrl rspUri = new EdgeUrl(rsp.uri());

@@ -121,10 +123,13 @@ public class HttpFetcherImpl implements HttpFetcher {
                    return new DomainProbeResult.Redirect(rspUri.domain);
                }
                return new DomainProbeResult.Ok(rspUri);
            }
            catch (Exception ex) {
            } catch (Exception ex) {
                if (tries > 3) {
                    return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
                }
                // else try again ...
            }
        }
    }

    /** Perform a HEAD request to fetch the content type of a URL.

@@ -143,7 +148,7 @@ public class HttpFetcherImpl implements HttpFetcher {
        var headBuilder = HttpRequest.newBuilder()
                .HEAD()
                .uri(url.asURI())
                .header("User-agent", userAgentString)
                .header("User-Agent", userAgentString)
                .header("Accept-Encoding", "gzip")
                .timeout(requestTimeout)
                ;

@@ -215,7 +220,7 @@ public class HttpFetcherImpl implements HttpFetcher {
        var getBuilder = HttpRequest.newBuilder()
                .GET()
                .uri(url.asURI())
                .header("User-agent", userAgentString)
                .header("User-Agent", userAgentString)
                .header("Accept-Encoding", "gzip")
                .header("Accept-Language", "en,*;q=0.5")
                .header("Accept", "text/html, application/xhtml+xml, text/*;q=0.8")

@@ -307,7 +312,7 @@ public class HttpFetcherImpl implements HttpFetcher {
                .uri(sitemapUrl.asURI())
                .header("Accept-Encoding", "gzip")
                .header("Accept", "text/*, */*;q=0.9")
                .header("User-agent", userAgentString)
                .header("User-Agent", userAgentString)
                .timeout(requestTimeout)
                .build();

@@ -386,7 +391,7 @@ public class HttpFetcherImpl implements HttpFetcher {
                .uri(url.asURI())
                .header("Accept-Encoding", "gzip")
                .header("Accept", "text/*, */*;q=0.9")
                .header("User-agent", userAgentString)
                .header("User-Agent", userAgentString)
                .timeout(requestTimeout);

        HttpFetchResult result = recorder.fetch(client, getRequest.build());
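The probe now uses a longer dedicated timeout and a bounded retry loop: the HEAD request is attempted repeatedly and only converted to an error after a fixed number of failures. A generic, self-contained sketch of that loop with java.net.http, returning a plain string instead of the crawler's DomainProbeResult:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class ProbeRetrySketch {
    static String probe(HttpClient client, URI uri) {
        HttpRequest head = HttpRequest.newBuilder()
                .HEAD()
                .uri(uri)
                .timeout(Duration.ofSeconds(30))   // a longer probe timeout, as in the diff
                .build();

        for (int tries = 0; ; tries++) {
            try {
                var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
                return "OK: " + rsp.uri();
            } catch (Exception ex) {
                if (tries > 3) {
                    return "ERROR: " + ex.getMessage();
                }
                // else fall through and try again
            }
        }
    }
}
```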
@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.lsh.EasyLSH;
import nu.marginalia.model.crawldata.CrawledDocument;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;

/** A reference to a domain that has been crawled before. */
public class CrawlDataReference implements AutoCloseable {
public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {

    private boolean closed = false;

    @Nullable
    private final Path path;

    @Nullable
    private SerializableCrawlDataStream data = null;

    private final SerializableCrawlDataStream data;
    private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);

    public CrawlDataReference(SerializableCrawlDataStream data) {
        this.data = data;
    public CrawlDataReference(@Nullable Path path) {
        this.path = path;
    }

    public CrawlDataReference() {
        this(SerializableCrawlDataStream.empty());
        this(null);
    }

    /** Delete the associated data from disk, if it exists */
    public void delete() throws IOException {
        Path filePath = data.path();

        if (filePath != null) {
            Files.deleteIfExists(filePath);
        if (path != null) {
            Files.deleteIfExists(path);
        }
    }

    /** Get the next document from the crawl data,
     * returning null when there are no more documents
     * available
     */
    @Nullable
    public CrawledDocument nextDocument() {
    public @NotNull Iterator<CrawledDocument> iterator() {

        requireStream();
        // Guaranteed by requireStream, but helps java
        Objects.requireNonNull(data);

        return data.map(next -> {
            if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
                return Optional.of(doc);
            }
            else {
                return Optional.empty();
            }
        });
    }

    /** After calling this method, data is guaranteed to be non-null */
    private void requireStream() {
        if (closed) {
            throw new IllegalStateException("Use after close()");
        }

        if (data == null) {
            try {
                while (data.hasNext()) {
                    if (data.next() instanceof CrawledDocument doc) {
                        if (!ContentTypes.isAccepted(doc.contentType))
                            continue;

                        return doc;
                if (path != null) {
                    data = SerializableCrawlDataStream.openDataStream(path);
                    return;
                }
            }
            }
            }
            catch (IOException ex) {
                logger.error("Failed to read next document", ex);
            catch (Exception ex) {
                logger.error("Failed to open stream", ex);
            }

            return null;
            data = SerializableCrawlDataStream.empty();
        }
    }

    public static boolean isContentBodySame(byte[] one, byte[] other) {

@@ -98,7 +121,12 @@ public class CrawlDataReference implements AutoCloseable {
    }

    @Override
    public void close() throws Exception {
    public void close() throws IOException {
        if (!closed) {
            if (data != null) {
                data.close();
            }
            closed = true;
        }
    }
}
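The refactor turns CrawlDataReference into an Iterable that holds only a Path and opens the underlying stream lazily on the first call to iterator(), falling back to an empty source when opening fails. A generic sketch of that lazy-open pattern, using text lines as stand-ins for crawled documents:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;

class LazyFileIterable implements Iterable<String>, AutoCloseable {
    private final Path path;           // may be null for an "empty" reference
    private BufferedReader reader = null;

    LazyFileIterable(Path path) {
        this.path = path;
    }

    @Override
    public Iterator<String> iterator() {
        if (reader == null) {
            try {
                if (path != null) {
                    // opened on first use, not in the constructor
                    reader = Files.newBufferedReader(path);
                }
            } catch (IOException ex) {
                // fall through to the empty iterator
            }
        }
        if (reader == null) {
            return Collections.emptyIterator();
        }
        return reader.lines().iterator();
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
```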
@@ -89,47 +89,23 @@ public class CrawlerRetreiver implements AutoCloseable {
    }

    public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
        try {
        try (oldCrawlData) {
            // Do an initial domain probe to determine the root URL
            EdgeUrl rootUrl;

            var probeResult = probeRootUrl();
            switch (probeResult) {

            return switch (probeResult) {
                case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
                    rootUrl = probedUrl; // Good track
                }
                case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                    domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
                    return 1;
                }
                case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                    domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
                    return 1;
                }
            }

            // Sleep after the initial probe, we don't have access to the robots.txt yet
            // so we don't know the crawl delay
            TimeUnit.SECONDS.sleep(1);

            return crawlDomain(oldCrawlData, rootUrl, domainLinks);
        }
        catch (Exception ex) {
            logger.error("Error crawling domain {}", domain, ex);
            return 0;
        }
    }

    private int crawlDomain(CrawlDataReference oldCrawlData,
                            EdgeUrl rootUrl,
                            DomainLinks domainLinks) throws InterruptedException {

                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
                    final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());

                    delayTimer.waitFetchDelay(0); // initial delay after robots.txt

                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
                    domainStateDb.save(summaryRecord);

                    // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified

@@ -138,10 +114,36 @@ public class CrawlerRetreiver implements AutoCloseable {
                        crawlFrontier.increaseDepth(1.5, 2500);
                    }

                    oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources

                    yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
                }
                case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                    domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
                    yield 1;
                }
                case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                    domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
                    yield 1;
                }
            };

        }
        catch (Exception ex) {
            logger.error("Error crawling domain {}", domain, ex);
            return 0;
        }
    }

    private int crawlDomain(EdgeUrl rootUrl,
                            SimpleRobotRules robotsRules,
                            CrawlDelayTimer delayTimer,
                            DomainLinks domainLinks) {


        // Add external links to the crawl frontier
        crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));


        // Fetch sitemaps
        for (var sitemap : robotsRules.getSitemaps()) {
            crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
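The control-flow change replaces a statement switch plus a mutable rootUrl local with a single exhaustive switch expression: each probe outcome is deconstructed with a record pattern and produces the method's return value via yield. A self-contained illustration of that shape; the sealed ProbeResult hierarchy below is a hypothetical stand-in for the crawler's DomainProbeResult:

```java
sealed interface ProbeResult permits ProbeResult.Ok, ProbeResult.Redirect, ProbeResult.Error {
    record Ok(String probedUrl) implements ProbeResult {}
    record Redirect(String newDomain) implements ProbeResult {}
    record Error(String status, String description) implements ProbeResult {}
}

class SwitchResultSketch {
    int handle(ProbeResult probeResult) {
        // Exhaustive over the sealed hierarchy, so no default branch is needed
        return switch (probeResult) {
            case ProbeResult.Ok(String probedUrl) -> {
                // the happy path continues crawling from the probed root URL
                yield crawl(probedUrl);
            }
            case ProbeResult.Redirect(String newDomain) -> {
                // record the redirect and stop early
                yield 1;
            }
            case ProbeResult.Error(String status, String description) -> {
                // record the failure and stop early
                yield 1;
            }
        };
    }

    private int crawl(String rootUrl) {
        return 0; // placeholder for the real crawl loop
    }
}
```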
@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
        int errors = 0;
        int skipped = 0;

        for (;;) {
        for (CrawledDocument doc : oldCrawlData) {
            if (errors > 20) {
                // If we've had too many errors, we'll stop trying to recrawl
                break;
            }

            CrawledDocument doc = oldCrawlData.nextDocument();

            if (doc == null)
                break;

            // This Shouldn't Happen (TM)
            var urlMaybe = EdgeUrl.parse(doc.url);
            if (urlMaybe.isEmpty())
                continue;
@@ -1,41 +0,0 @@
package nu.marginalia.io;

import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;

public class CrawledDomainReader {
    private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);

    /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
    public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
    {

        String fileName = fullPath.getFileName().toString();
        if (fileName.endsWith(".parquet")) {
            try {
                return new ParquetSerializableCrawlDataStream(fullPath);
            } catch (Exception ex) {
                logger.error("Error reading domain data from " + fullPath, ex);
                return SerializableCrawlDataStream.empty();
            }
        }

        if (fileName.endsWith(".slop.zip")) {
            try {
                return new SlopSerializableCrawlDataStream(fullPath);
            } catch (Exception ex) {
                logger.error("Error reading domain data from " + fullPath, ex);
                return SerializableCrawlDataStream.empty();
            }
        }

        logger.error("Unknown file type: {}", fullPath);
        return SerializableCrawlDataStream.empty();
    }

}
@@ -1,5 +1,7 @@
package nu.marginalia.io;

import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData;

@@ -18,7 +20,6 @@ import java.util.function.Function;
/** Closable iterator exceptional over serialized crawl data
 * The data may appear in any order, and the iterator must be closed.
 *
 * @see CrawledDomainReader
 * */
public interface SerializableCrawlDataStream extends AutoCloseable {
    Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);

@@ -27,13 +28,58 @@ public interface SerializableCrawlDataStream extends AutoCloseable {

    /** Return a size hint for the stream. 0 is returned if the hint is not available,
     * or if the file is seemed too small to bother */
    default int sizeHint() { return 0; }
    default int getSizeHint() { return 0; }

    boolean hasNext() throws IOException;

    @Nullable
    default Path path() { return null; }

    void close() throws IOException;

    /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
    static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
    {

        String fileName = fullPath.getFileName().toString();
        if (fileName.endsWith(".parquet")) {
            try {
                return new ParquetSerializableCrawlDataStream(fullPath);
            } catch (Exception ex) {
                logger.error("Error reading domain data from " + fullPath, ex);
                return SerializableCrawlDataStream.empty();
            }
        }

        if (fileName.endsWith(".slop.zip")) {
            try {
                return new SlopSerializableCrawlDataStream(fullPath);
            } catch (Exception ex) {
                logger.error("Error reading domain data from " + fullPath, ex);
                return SerializableCrawlDataStream.empty();
            }
        }

        logger.error("Unknown file type: {}", fullPath);
        return SerializableCrawlDataStream.empty();
    }

    /** Get an idication of the size of the stream. This is used to determine whether to
     * load the stream into memory or not. 0 is returned if the hint is not available,
     * or if the file is seemed too small to bother */
    static int getSizeHint(Path fullPath) {
        String fileName = fullPath.getFileName().toString();
        if (fileName.endsWith(".parquet")) {
            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
        }
        else if (fileName.endsWith(".slop.zip")) {
            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
        }
        else {
            return 0;
        }
    }

    default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
        return new Iterator<>() {
            T next = null;
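The old CrawledDomainReader factory moves onto the interface itself as the static openDataStream method, alongside a static getSizeHint that peeks at the file by extension. A within-repo usage sketch of the two helpers, with an illustrative threshold value; both calls and the iteration idiom mirror how the callers elsewhere in this compare use them:

```java
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.crawldata.CrawledDocument;

import java.io.IOException;
import java.nio.file.Path;

class OpenDataStreamSketch {
    static void process(Path crawlDataPath) throws IOException {
        // Peek at the file to choose a processing mode before opening it
        int sizeHint = SerializableCrawlDataStream.getSizeHint(crawlDataPath);
        boolean tooBigForRam = sizeHint >= 10_000;  // illustrative threshold

        // The stream must be closed, or off-heap memory leaks (per the javadoc above)
        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    // handle one document; tooBigForRam would select full vs simple processing
                }
            }
        }
    }
}
```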
@@ -40,7 +40,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
        return path;
    }

    public int sizeHint() {
    public static int sizeHint(Path path) {
        // Only calculate size hint for large files
        // (the reason we calculate them in the first place is to assess whether it is large
        // because it has many documents, or because it is a small number of large documents)

@@ -52,7 +52,7 @@ public class SlopSerializableCrawlDataStream implements AutoCloseable, Serializa
        return path;
    }

    public int sizeHint() {
    public static int sizeHint(Path path) {
        // Only calculate size hint for large files
        // (the reason we calculate them in the first place is to assess whether it is large
        // because it has many documents, or because it is a small number of large documents)
@@ -59,9 +59,12 @@ public final class CrawledDocument implements SerializableCrawlData {
    }

    public Document parseBody() throws IOException {
        // Prevent stalls from parsing excessively large documents

        return DocumentBodyToString.getParsedData(
                ContentType.parse(contentType),
                documentBodyBytes,
                200_000,
                url);
    }
@@ -108,8 +108,10 @@ public record SlopCrawlDataRecord(String domain,
    public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
        Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");

        try (var writer = new Writer(tempDir)) {
            CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
        try (var writer = new Writer(tempDir);
             var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
        {
            stream.forEach(
                    parquetRecord -> {
                        try {
                            writer.write(new SlopCrawlDataRecord(parquetRecord));
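The change adds the parquet record Stream to the try-with-resources list: java.util.stream.Stream is AutoCloseable, and a stream that wraps a file keeps a handle open until it is closed. A minimal sketch of the same discipline using the standard-library Files.lines, which has the identical pitfall:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class StreamCloseSketch {
    static long countNonBlankLines(Path file) throws IOException {
        // The Stream is closed automatically, releasing the file handle
        try (Stream<String> lines = Files.lines(file)) {
            return lines.filter(line -> !line.isBlank()).count();
        }
    }
}
```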
@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.*;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;

@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {

        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    data.add(doc);

@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {

        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    data.add(doc);

@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
        doCrawl(tempFileWarc1, specs);
        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    data.add(doc);

@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
        doCrawl(tempFileWarc1, specs);
        convertToParquet(tempFileWarc1, tempFileParquet1);
        doCrawlWithReferenceStream(specs,
                CrawledDomainReader.createDataStream(tempFileParquet1)
                new CrawlDataReference(tempFileParquet1)
        );
        convertToParquet(tempFileWarc2, tempFileParquet2);

@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
            });
        }

        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
            while (ds.hasNext()) {
                var doc = ds.next();
                if (doc instanceof CrawledDomain dr) {

@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {

        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
            while (stream.hasNext()) {
                var doc = stream.next();
                data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);

@@ -448,11 +447,9 @@ class CrawlerRetreiverTest {
            throw new RuntimeException(e);
        }

        var stream = CrawledDomainReader.createDataStream(tempFileParquet1);

        System.out.println("---");

        doCrawlWithReferenceStream(specs, stream);
        doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));

        var revisitCrawlFrontier = new DomainCrawlFrontier(
                new EdgeDomain("www.marginalia.nu"),

@@ -488,7 +485,7 @@ class CrawlerRetreiverTest {
            });
        }

        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
            while (ds.hasNext()) {
                var doc = ds.next();
                if (doc instanceof CrawledDomain dr) {

@@ -509,12 +506,11 @@ class CrawlerRetreiverTest {
        }
    }

    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
        try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
             var db = new DomainStateDb(tempFileDb)
        ) {
            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
                    new CrawlDataReference(stream));
            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
        }
        catch (IOException | SQLException ex) {
            Assertions.fail(ex);
@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
import com.google.inject.Inject;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;

@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
            }

            Path crawlDataPath = inputDir.resolve(item.relPath());
            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                exportLinks(tagWriter, stream);
            }
            catch (Exception ex) {
@@ -1,7 +1,6 @@
package nu.marginalia.extractor;

import com.google.inject.Inject;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.FeedExtractor;
import nu.marginalia.link_parser.LinkParser;

@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
            }

            Path crawlDataPath = inputDir.resolve(item.relPath());
            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                exportFeeds(tagWriter, stream);
            }
            catch (Exception ex) {

@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
    private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
        FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());

        int size = stream.sizeHint();
        int size = stream.getSizeHint();

        while (stream.hasNext()) {
            if (!(stream.next() instanceof CrawledDocument doc))
@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.language.filter.LanguageFilter;
import nu.marginalia.language.model.DocumentLanguageData;
import nu.marginalia.language.sentence.SentenceExtractor;

@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
    {
        TLongHashSet words = new TLongHashSet(1000);

        try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
            while (stream.hasNext()) {
                if (Thread.interrupted())
                    return;
@@ -228,7 +228,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
        }

        @Override
        public boolean hasNext() throws IOException {
        public boolean hasNext() {
            if (dataStack == null) {
                query();
            }

@@ -236,7 +236,7 @@ public class LiveCrawlDataSet implements AutoCloseable {
        }

        @Override
        public void close() throws Exception {
        public void close() {
            dataStack.clear();
        }
    }
@@ -9,7 +9,7 @@
<span>
    Access logs containing IP-addresses are retained for up to 24 hours,
    anonymized logs with source addresses removed are sometimes kept longer
    for to help diagnosing bugs.
    to help diagnosing bugs.
</span>
</div>
<div class="flex space-y-4 flex-col">
@@ -3,7 +3,7 @@ package nu.marginalia.tools;
import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;

@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
        Path basePath = Path.of(args[0]);
        for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
            Path crawlDataPath = basePath.resolve(item.relPath());
            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                experiment.process(stream);
            }
            catch (Exception ex) {
@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.searchset.SearchSetAny;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.linkdb.docs.DocumentDbReader;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.LoaderIndexJournalWriter;

@@ -152,7 +152,7 @@ public class IntegrationTest {

        /** PROCESS CRAWL DATA */

        var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
        var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));

        System.out.println(processedDomain);
@@ -234,7 +234,7 @@ dependencyResolutionManagement {
        library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
        library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')

        library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
        library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
        library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
        library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
        library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)