mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits


17 Commits

Author SHA1 Message Date
Viktor Lofgren
e9af838231 (actor) Fix migration actor final steps 2025-01-30 11:48:21 +01:00
Viktor Lofgren
ae0cad47c4 (actor) Utility method for getting a json prototype for actor states
If we can hook this into the control gui somehow, it'll make for a nice QOL upgrade when manually interacting with the actors.
2025-01-29 15:20:25 +01:00
Viktor Lofgren
5fbc8ef998 (misc) Tidying 2025-01-29 15:17:04 +01:00
Viktor Lofgren
32c6dd9e6a (actor) Delete old data in the migration actor 2025-01-29 14:51:46 +01:00
Viktor Lofgren
6ece6a6cfb (actor) Improve resilience for the migration actor 2025-01-29 14:43:09 +01:00
Viktor Lofgren
39cd1c18f8 Automatically run npm install tailwindcss@3 via setup.sh, as the new default version of the package is incompatible with the project 2025-01-29 12:21:08 +01:00
Viktor
eb65daaa88 Merge pull request #151 from Lionstiger/master
fix small grammar error in footerLegal.jte
2025-01-28 21:49:50 +01:00
Viktor
0bebdb6e33 Merge branch 'master' into master 2025-01-28 21:49:36 +01:00
Viktor Lofgren
1e50e392c6 (actor) Improve logging and error handling for data migration actor 2025-01-28 15:34:36 +01:00
Viktor Lofgren
fb673de370 (crawler) Change the header 'User-agent' to 'User-Agent' 2025-01-28 15:34:16 +01:00
Viktor Lofgren
eee73ab16c (crawler) Be more lenient when performing a domain probe 2025-01-28 15:24:30 +01:00
Viktor Lofgren
5354e034bf (search) Minor grammar fix 2025-01-27 18:36:31 +01:00
Magnus Wulf
72384ad6ca fix small grammar error 2025-01-27 15:04:57 +01:00
Viktor Lofgren
a2b076f9be (converter) Add progress tracking for big domains in converter 2025-01-26 18:03:59 +01:00
Viktor Lofgren
c8b0a32c0f (crawler) Reduce long retention of CrawlDataReference objects and their associated SerializableCrawlDataStreams 2025-01-26 15:40:17 +01:00
Viktor Lofgren
f0d74aa3bb (converter) Fix close() ordering to prevent converter crash 2025-01-26 14:47:36 +01:00
Viktor Lofgren
74a1f100f4 (converter) Refactor to remove CrawledDomainReader and move its functionality into SerializableCrawlDataStream 2025-01-26 14:46:50 +01:00
28 changed files with 327 additions and 282 deletions

View File

@@ -24,58 +24,4 @@ public class LanguageModels {
this.fasttextLanguageModel = fasttextLanguageModel;
this.segments = segments;
}
public static LanguageModelsBuilder builder() {
return new LanguageModelsBuilder();
}
public static class LanguageModelsBuilder {
private Path termFrequencies;
private Path openNLPSentenceDetectionData;
private Path posRules;
private Path posDict;
private Path fasttextLanguageModel;
private Path segments;
LanguageModelsBuilder() {
}
public LanguageModelsBuilder termFrequencies(Path termFrequencies) {
this.termFrequencies = termFrequencies;
return this;
}
public LanguageModelsBuilder openNLPSentenceDetectionData(Path openNLPSentenceDetectionData) {
this.openNLPSentenceDetectionData = openNLPSentenceDetectionData;
return this;
}
public LanguageModelsBuilder posRules(Path posRules) {
this.posRules = posRules;
return this;
}
public LanguageModelsBuilder posDict(Path posDict) {
this.posDict = posDict;
return this;
}
public LanguageModelsBuilder fasttextLanguageModel(Path fasttextLanguageModel) {
this.fasttextLanguageModel = fasttextLanguageModel;
return this;
}
public LanguageModelsBuilder segments(Path segments) {
this.segments = segments;
return this;
}
public LanguageModels build() {
return new LanguageModels(this.termFrequencies, this.openNLPSentenceDetectionData, this.posRules, this.posDict, this.fasttextLanguageModel, this.segments);
}
public String toString() {
return "LanguageModels.LanguageModelsBuilder(termFrequencies=" + this.termFrequencies + ", openNLPSentenceDetectionData=" + this.openNLPSentenceDetectionData + ", posRules=" + this.posRules + ", posDict=" + this.posDict + ", fasttextLanguageModel=" + this.fasttextLanguageModel + ", segments=" + this.segments + ")";
}
}
}

View File

@@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.*;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
/** WorkLog is a journal of work done by a process,
@@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
return new WorkLoadIterable<>(logFile, mapper);
}
public static int countEntries(Path crawlerLog) throws IOException{
try (var linesStream = Files.lines(crawlerLog)) {
return (int) linesStream.filter(WorkLogEntry::isJobId).count();
}
}
// Use synchro over concurrent set to avoid competing writes
// - correct is better than fast here, it's sketchy enough to use
// a PrintWriter
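
The new WorkLog.countEntries helper above exists so the migration actor can size its progress reporting before doing any work. A minimal usage sketch, assuming nu.marginalia.process.log.WorkLog is on the classpath; the class name and log path below are illustrative, not project code:

```java
import java.io.IOException;
import java.nio.file.Path;

import nu.marginalia.process.log.WorkLog;

// Hedged usage sketch: count finished-job entries in a crawler.log up front,
// e.g. to drive an "n of total" progress indicator during migration.
public class CountEntriesSketch {
    public static void main(String[] args) throws IOException {
        Path crawlerLog = Path.of("/storage/crawl-data/crawler.log"); // hypothetical location
        int totalEntries = WorkLog.countEntries(crawlerLog);
        System.out.println("crawler.log contains " + totalEntries + " job entries");
    }
}
```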

View File

@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.slop.SlopCrawlDataRecord;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
@@ -18,6 +19,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@@ -26,14 +28,15 @@ import java.util.function.Function;
public class MigrateCrawlDataActor extends RecordActorPrototype {
private final FileStorageService fileStorageService;
private final ServiceHeartbeat serviceHeartbeat;
private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
@Inject
public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
super(gson);
this.fileStorageService = fileStorageService;
this.serviceHeartbeat = serviceHeartbeat;
}
public record Run(long fileStorageId) implements ActorStep {}
@@ -49,33 +52,50 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
Path crawlerLog = root.resolve("crawler.log");
Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
int totalEntries = WorkLog.countEntries(crawlerLog);
try (WorkLog workLog = new WorkLog(newCrawlerLog);
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
) {
int entryIdx = 0;
for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
var entry = item.getKey();
var path = item.getValue();
final WorkLogEntry entry = item.getKey();
final Path inputPath = item.getValue();
logger.info("Converting {}", entry.id());
Path outputPath = inputPath;
heartbeat.progress("Migrating" + inputPath.getFileName(), entryIdx++, totalEntries);
if (path.toFile().getName().endsWith(".parquet")) {
if (inputPath.toString().endsWith(".parquet")) {
String domain = entry.id();
String id = Integer.toHexString(domain.hashCode());
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
outputPath = CrawlerOutputFile.createSlopPath(root, id, domain);
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
if (Files.exists(inputPath)) {
try {
SlopCrawlDataRecord.convertFromParquet(inputPath, outputPath);
Files.deleteIfExists(inputPath);
} catch (Exception ex) {
outputPath = inputPath; // don't update the work log on error
logger.error("Failed to convert " + inputPath, ex);
}
}
else if (!Files.exists(inputPath) && !Files.exists(outputPath)) {
// if the input file is missing, and the output file is missing, we just write the log
// record identical to the old one
outputPath = inputPath;
}
}
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
}
else {
workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
}
// Write a log entry for the (possibly) converted file
workLog.setJobToFinished(entry.id(), outputPath.toString(), entry.cnt());
}
}
Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
Files.move(crawlerLog, oldCrawlerLog);
Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
Files.move(newCrawlerLog, crawlerLog);
yield new End();
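
Taken together, the hunk above follows a write-then-swap discipline: migrated entries go into a temporary work log, the original crawler.log is parked under a .old name, and the replacement is moved into place only at the end. A condensed sketch of just that rotation step; the paths and method name are illustrative, not the actor's real structure:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative sketch of the write-then-swap pattern: never rewrite crawler.log in
// place; build a replacement beside it and swap once the migration has finished.
public class LogRotationSketch {
    static void rotate(Path root) throws IOException {
        Path crawlerLog = root.resolve("crawler.log");
        Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");

        // ... write the migrated entries into newCrawlerLog here ...

        Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
        Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
        Files.move(newCrawlerLog, crawlerLog);
    }
}
```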

View File

@@ -5,9 +5,7 @@ import nu.marginalia.actor.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
public abstract class RecordActorPrototype implements ActorPrototype {
@@ -118,7 +116,7 @@ public abstract class RecordActorPrototype implements ActorPrototype {
}
private String functionName(Class<? extends ActorStep> functionClass) {
return functionClass.getSimpleName().toUpperCase();
return ActorStep.functionName(functionClass);
}
private ActorStep constructState(String message) throws ReflectiveOperationException {
@@ -145,4 +143,43 @@ public abstract class RecordActorPrototype implements ActorPrototype {
}
}
/** Get a list of JSON prototypes for each actor step declared by this actor */
@SuppressWarnings("unchecked")
public Map<String, String> getMessagePrototypes() {
Map<String, String> messagePrototypes = new HashMap<>();
for (var clazz : getClass().getDeclaredClasses()) {
if (!clazz.isRecord() || !ActorStep.class.isAssignableFrom(clazz))
continue;
StringJoiner sj = new StringJoiner(",\n\t", "{\n\t", "\n}");
renderToJsonPrototype(sj, (Class<? extends Record>) clazz);
messagePrototypes.put(ActorStep.functionName((Class<? extends ActorStep>) clazz), sj.toString());
}
return messagePrototypes;
}
@SuppressWarnings("unchecked")
private void renderToJsonPrototype(StringJoiner sj, Class<? extends Record> recordType) {
for (var field : recordType.getDeclaredFields()) {
String typeName = field.getType().getSimpleName();
if ("List".equals(typeName)) {
sj.add(String.format("\"%s\": [ ]", field.getName()));
}
else if (field.getType().isRecord()) {
var innerSj = new StringJoiner(",", "{", "}");
renderToJsonPrototype(innerSj, (Class<? extends Record>) field.getType());
sj.add(String.format("\"%s\": %s", field.getName(), sj));
}
else {
sj.add(String.format("\"%s\": \"%s\"", field.getName(), typeName));
}
}
}
}
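
The getMessagePrototypes/renderToJsonPrototype pair above reflects over the actor's ActorStep records and emits a JSON skeleton per step, which is what the commit message hopes to surface in the control GUI. A standalone sketch of the same idea, stripped of the actor machinery; Run and prototypeOf are hypothetical names, not project code:

```java
import java.util.StringJoiner;

// Self-contained sketch: derive a JSON prototype from a record's components so an
// operator UI can show what a message for that step should look like.
public class PrototypeSketch {
    public record Run(long fileStorageId, String note) {}

    static String prototypeOf(Class<? extends Record> recordType) {
        StringJoiner sj = new StringJoiner(",\n\t", "{\n\t", "\n}");
        for (var field : recordType.getDeclaredFields()) {
            sj.add(String.format("\"%s\": \"%s\"", field.getName(), field.getType().getSimpleName()));
        }
        return sj.toString();
    }

    public static void main(String[] args) {
        // Prints a skeleton like {"fileStorageId": "long", "note": "String"}
        System.out.println(prototypeOf(Run.class));
    }
}
```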

View File

@@ -1,3 +1,7 @@
package nu.marginalia.actor.state;
public interface ActorStep {}
public interface ActorStep {
static String functionName(Class<? extends ActorStep> type) {
return type.getSimpleName().toUpperCase();
}
}

View File

@@ -12,7 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -203,16 +204,23 @@ public class ConverterMain extends ProcessMainClass {
logger.info("Processing small items");
// We separate the large and small domains to reduce the number of critical sections,
// as the large domains have a separate processing track that doesn't store everything
// in memory
final List<Path> bigTasks = new ArrayList<>();
// First process the small items
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
bigTasks.add(dataPath);
continue;
}
pool.submit(() -> {
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.fullProcessing(dataStream) ;
converterWriter.accept(writable);
}
@@ -235,24 +243,28 @@ public class ConverterMain extends ProcessMainClass {
logger.info("Processing large items");
// Next the big items domain-by-domain
for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
int sizeHint = CrawledDomainReader.sizeHint(dataPath);
if (sizeHint < SIDELOAD_THRESHOLD) {
continue;
}
try (var hb = heartbeat.createAdHocTaskHeartbeat("Large Domains")) {
int bigTaskIdx = 0;
// Next the big items domain-by-domain
for (var dataPath : bigTasks) {
hb.progress(dataPath.toFile().getName(), bigTaskIdx++, bigTasks.size());
try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
ConverterBatchWritableIf writable = processor.simpleProcessing(dataStream, sizeHint);
converterWriter.accept(writable);
}
catch (Exception ex) {
logger.info("Error in processing", ex);
}
finally {
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
try {
// SerializableCrawlDataStream is autocloseable, we can't try-with-resources because then it will be
// closed before it's consumed by the converterWriter. Instead, the converterWriter guarantees it
// will close it after it's consumed.
var stream = SerializableCrawlDataStream.openDataStream(dataPath);
ConverterBatchWritableIf writable = processor.simpleProcessing(stream, SerializableCrawlDataStream.getSizeHint(dataPath));
converterWriter.accept(writable);
}
catch (Exception ex) {
logger.info("Error in processing", ex);
}
finally {
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
}
}
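
The comment in the hunk above marks the key ownership change: the stream is opened without try-with-resources because the ConverterWriter, not the submitting loop, is responsible for closing it once the domain has been consumed (see the data.close() calls in the ConverterWriter diff further down). A standalone sketch of that hand-off pattern; every name here is illustrative rather than the converter's real API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of the ownership hand-off: the producer opens an AutoCloseable
// resource but deliberately avoids try-with-resources, because the consumer takes
// ownership and closes it only after the work has been done.
public class OwnershipHandoffSketch {
    interface DataStream extends AutoCloseable {
        String read();
    }

    static void consume(DataStream stream, ExecutorService pool) {
        pool.submit(() -> {
            try (stream) {                       // the consumer closes it, not the producer
                System.out.println(stream.read());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        DataStream stream = new DataStream() {   // stands in for openDataStream(dataPath)
            public String read() { return "example record"; }
            public void close()  { System.out.println("closed by consumer"); }
        };
        consume(stream, pool);                   // note: no try-with-resources here, on purpose
        pool.shutdown();
    }
}
```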

View File

@@ -116,7 +116,7 @@ public class AdblockSimulator {
// Refrain from cleaning up this code, it's very hot code and needs to be fast.
// This version is about 100x faster than the a "clean" first stab implementation.
// This version is about 100x faster than a "clean" first stab implementation.
class RuleVisitor implements NodeFilter {
public boolean sawAds;

View File

@@ -23,7 +23,7 @@ public class DocumentGeneratorExtractor {
var tags = doc.select("meta[name=generator]");
if (tags.size() == 0) {
if (tags.isEmpty()) {
// Some sites have a comment in the head instead of a meta tag
return fingerprintServerTech(doc, responseHeaders);
}

View File

@@ -127,7 +127,7 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
}
fullHtml.append("</div></body></html>");
var doc = sideloaderProcessing
return sideloaderProcessing
.processDocument(fullUrl,
fullHtml.toString(),
List.of("encyclopedia", "wiki"),
@@ -137,8 +137,6 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
anchorTextKeywords.getAnchorTextKeywords(domainLinks, new EdgeUrl(fullUrl)),
LocalDate.now().getYear(),
10_000_000);
return doc;
}
private String normalizeUtf8(String url) {

View File

@@ -39,6 +39,9 @@ public class ConverterWriter implements AutoCloseable {
workerThread.start();
}
/** Queue and eventually write the domain into the converter journal
* The domain object will be closed after it's processed.
* */
public void accept(@Nullable ConverterBatchWritableIf domain) {
if (null == domain)
return;
@@ -72,15 +75,15 @@ public class ConverterWriter implements AutoCloseable {
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);
}
else {
currentWriter.write(data);
workLog.logItem(id);
data.close();
continue;
}
currentWriter.write(data);
workLog.logItem(id);
switcher.tick();
data.close();
}
}
catch (Exception ex) {

View File

@@ -19,7 +19,6 @@ import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
@@ -417,13 +416,13 @@ public class CrawlerMain extends ProcessMainClass {
try {
Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
if (Files.exists(slopPath)) {
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
return new CrawlDataReference(slopPath);
}
Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
if (Files.exists(parquetPath)) {
slopPath = migrateParquetData(parquetPath, domain, outputDir);
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
return new CrawlDataReference(slopPath);
}
} catch (IOException e) {

View File

@@ -45,6 +45,7 @@ public class HttpFetcherImpl implements HttpFetcher {
private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private final Duration requestTimeout = Duration.ofSeconds(10);
private final Duration probeTimeout = Duration.ofSeconds(30);
@Override
public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@@ -107,23 +108,27 @@ public class HttpFetcherImpl implements HttpFetcher {
.HEAD()
.uri(url.asURI())
.header("User-agent", userAgentString)
.timeout(requestTimeout)
.timeout(probeTimeout)
.build();
} catch (URISyntaxException e) {
return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "Invalid URL");
}
try {
var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
EdgeUrl rspUri = new EdgeUrl(rsp.uri());
for (int tries = 0;; tries++) {
try {
var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
EdgeUrl rspUri = new EdgeUrl(rsp.uri());
if (!Objects.equals(rspUri.domain, url.domain)) {
return new DomainProbeResult.Redirect(rspUri.domain);
if (!Objects.equals(rspUri.domain, url.domain)) {
return new DomainProbeResult.Redirect(rspUri.domain);
}
return new DomainProbeResult.Ok(rspUri);
} catch (Exception ex) {
if (tries > 3) {
return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
}
// else try again ...
}
return new DomainProbeResult.Ok(rspUri);
}
catch (Exception ex) {
return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
}
}
@@ -143,7 +148,7 @@ public class HttpFetcherImpl implements HttpFetcher {
var headBuilder = HttpRequest.newBuilder()
.HEAD()
.uri(url.asURI())
.header("User-agent", userAgentString)
.header("User-Agent", userAgentString)
.header("Accept-Encoding", "gzip")
.timeout(requestTimeout)
;
@@ -215,7 +220,7 @@ public class HttpFetcherImpl implements HttpFetcher {
var getBuilder = HttpRequest.newBuilder()
.GET()
.uri(url.asURI())
.header("User-agent", userAgentString)
.header("User-Agent", userAgentString)
.header("Accept-Encoding", "gzip")
.header("Accept-Language", "en,*;q=0.5")
.header("Accept", "text/html, application/xhtml+xml, text/*;q=0.8")
@@ -307,7 +312,7 @@ public class HttpFetcherImpl implements HttpFetcher {
.uri(sitemapUrl.asURI())
.header("Accept-Encoding", "gzip")
.header("Accept", "text/*, */*;q=0.9")
.header("User-agent", userAgentString)
.header("User-Agent", userAgentString)
.timeout(requestTimeout)
.build();
@@ -386,7 +391,7 @@ public class HttpFetcherImpl implements HttpFetcher {
.uri(url.asURI())
.header("Accept-Encoding", "gzip")
.header("Accept", "text/*, */*;q=0.9")
.header("User-agent", userAgentString)
.header("User-Agent", userAgentString)
.timeout(requestTimeout);
HttpFetchResult result = recorder.fetch(client, getRequest.build());
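
Two of the changes above are easy to miss: header names are normalized to the conventional "User-Agent" casing (HTTP header names are case-insensitive on the wire, but the canonical form is friendlier to picky servers and log readers), and the domain probe now retries a few times before reporting an error. A minimal, self-contained sketch of that bounded-retry probe with java.net.http; the target URL, user agent string, and limits are illustrative:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hedged sketch: HEAD-probe a URL with the canonical User-Agent header, retrying
// a few times on transient failures before giving up.
public class ProbeRetrySketch {
    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest head = HttpRequest.newBuilder()
                .HEAD()
                .uri(URI.create("https://www.example.com/"))   // illustrative target
                .header("User-Agent", "example-crawler/0.1")
                .timeout(Duration.ofSeconds(30))
                .build();

        for (int tries = 0; ; tries++) {
            try {
                HttpResponse<Void> rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
                System.out.println("Probe OK, final URI: " + rsp.uri());
                return;
            } catch (Exception ex) {
                if (tries > 3) {                    // give up after a handful of attempts
                    System.err.println("Probe failed: " + ex.getMessage());
                    return;
                }
                // else fall through and try again
            }
        }
    }
}
```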

View File

@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.lsh.EasyLSH;
import nu.marginalia.model.crawldata.CrawledDocument;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
/** A reference to a domain that has been crawled before. */
public class CrawlDataReference implements AutoCloseable {
public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {
private boolean closed = false;
@Nullable
private final Path path;
@Nullable
private SerializableCrawlDataStream data = null;
private final SerializableCrawlDataStream data;
private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
public CrawlDataReference(SerializableCrawlDataStream data) {
this.data = data;
public CrawlDataReference(@Nullable Path path) {
this.path = path;
}
public CrawlDataReference() {
this(SerializableCrawlDataStream.empty());
this(null);
}
/** Delete the associated data from disk, if it exists */
public void delete() throws IOException {
Path filePath = data.path();
if (filePath != null) {
Files.deleteIfExists(filePath);
if (path != null) {
Files.deleteIfExists(path);
}
}
/** Get the next document from the crawl data,
* returning null when there are no more documents
* available
*/
@Nullable
public CrawledDocument nextDocument() {
try {
while (data.hasNext()) {
if (data.next() instanceof CrawledDocument doc) {
if (!ContentTypes.isAccepted(doc.contentType))
continue;
public @NotNull Iterator<CrawledDocument> iterator() {
return doc;
requireStream();
// Guaranteed by requireStream, but helps java
Objects.requireNonNull(data);
return data.map(next -> {
if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
return Optional.of(doc);
}
else {
return Optional.empty();
}
});
}
/** After calling this method, data is guaranteed to be non-null */
private void requireStream() {
if (closed) {
throw new IllegalStateException("Use after close()");
}
if (data == null) {
try {
if (path != null) {
data = SerializableCrawlDataStream.openDataStream(path);
return;
}
}
}
catch (IOException ex) {
logger.error("Failed to read next document", ex);
}
catch (Exception ex) {
logger.error("Failed to open stream", ex);
}
return null;
data = SerializableCrawlDataStream.empty();
}
}
public static boolean isContentBodySame(byte[] one, byte[] other) {
@@ -98,7 +121,12 @@ public class CrawlDataReference implements AutoCloseable {
}
@Override
public void close() throws Exception {
data.close();
public void close() throws IOException {
if (!closed) {
if (data != null) {
data.close();
}
closed = true;
}
}
}
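
After this refactor, CrawlDataReference opens its stream lazily from a path and exposes only the accepted documents as a plain Iterable, so callers iterate instead of polling a nullable nextDocument(). A hedged usage sketch; the path is illustrative, and the CrawlDataReference import is omitted since its package is not shown in this diff:

```java
import java.nio.file.Path;

import nu.marginalia.model.crawldata.CrawledDocument;

// Hedged usage sketch: iterate the previously crawled documents and let
// try-with-resources close the underlying stream when done.
public class CrawlDataReferenceUsage {
    public static void main(String[] args) throws Exception {
        Path oldCrawlData = Path.of("crawl-data/example.slop.zip"); // illustrative path
        try (CrawlDataReference reference = new CrawlDataReference(oldCrawlData)) {
            for (CrawledDocument doc : reference) {
                // compare etag / last-modified against a fresh fetch here
                System.out.println(doc.url);
            }
        }
    }
}
```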

View File

@@ -89,30 +89,45 @@ public class CrawlerRetreiver implements AutoCloseable {
}
public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
try {
try (oldCrawlData) {
// Do an initial domain probe to determine the root URL
EdgeUrl rootUrl;
var probeResult = probeRootUrl();
switch (probeResult) {
return switch (probeResult) {
case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
rootUrl = probedUrl; // Good track
// Sleep after the initial probe, we don't have access to the robots.txt yet
// so we don't know the crawl delay
TimeUnit.SECONDS.sleep(1);
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
delayTimer.waitFetchDelay(0); // initial delay after robots.txt
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
domainStateDb.save(summaryRecord);
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
}
case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
return 1;
yield 1;
}
case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
return 1;
yield 1;
}
}
};
// Sleep after the initial probe, we don't have access to the robots.txt yet
// so we don't know the crawl delay
TimeUnit.SECONDS.sleep(1);
return crawlDomain(oldCrawlData, rootUrl, domainLinks);
}
catch (Exception ex) {
logger.error("Error crawling domain {}", domain, ex);
@@ -120,28 +135,15 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
private int crawlDomain(CrawlDataReference oldCrawlData,
EdgeUrl rootUrl,
DomainLinks domainLinks) throws InterruptedException {
private int crawlDomain(EdgeUrl rootUrl,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer,
DomainLinks domainLinks) {
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
delayTimer.waitFetchDelay(0); // initial delay after robots.txt
DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
domainStateDb.save(summaryRecord);
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
// Add external links to the crawl frontier
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
// Fetch sitemaps
for (var sitemap : robotsRules.getSitemaps()) {
crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));

View File

@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
int errors = 0;
int skipped = 0;
for (;;) {
for (CrawledDocument doc : oldCrawlData) {
if (errors > 20) {
// If we've had too many errors, we'll stop trying to recrawl
break;
}
CrawledDocument doc = oldCrawlData.nextDocument();
if (doc == null)
break;
// This Shouldn't Happen (TM)
var urlMaybe = EdgeUrl.parse(doc.url);
if (urlMaybe.isEmpty())
continue;

View File

@@ -1,53 +0,0 @@
package nu.marginalia.io;
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
public class CrawledDomainReader {
private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);
/** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
{
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
try {
return new ParquetSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
if (fileName.endsWith(".slop.zip")) {
try {
return new SlopSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
logger.error("Unknown file type: {}", fullPath);
return SerializableCrawlDataStream.empty();
}
public static int sizeHint(Path fullPath) {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
}
else if (fileName.endsWith(".slop.zip")) {
return SlopSerializableCrawlDataStream.sizeHint(fullPath);
}
else {
return 0;
}
}
}

View File

@@ -1,5 +1,7 @@
package nu.marginalia.io;
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData;
@@ -18,7 +20,6 @@ import java.util.function.Function;
/** Closable iterator exceptional over serialized crawl data
* The data may appear in any order, and the iterator must be closed.
*
* @see CrawledDomainReader
* */
public interface SerializableCrawlDataStream extends AutoCloseable {
Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
@@ -27,7 +28,7 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
/** Return a size hint for the stream. 0 is returned if the hint is not available,
* or if the file is seemed too small to bother */
default int sizeHint() { return 0; }
default int getSizeHint() { return 0; }
boolean hasNext() throws IOException;
@@ -36,6 +37,49 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
void close() throws IOException;
/** An iterator-like access to domain data. This must be closed, otherwise it will leak off-heap memory! */
static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
{
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
try {
return new ParquetSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
if (fileName.endsWith(".slop.zip")) {
try {
return new SlopSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
logger.error("Error reading domain data from " + fullPath, ex);
return SerializableCrawlDataStream.empty();
}
}
logger.error("Unknown file type: {}", fullPath);
return SerializableCrawlDataStream.empty();
}
/** Get an indication of the size of the stream. This is used to determine whether to
* load the stream into memory or not. 0 is returned if the hint is not available,
* or if the file seems too small to bother */
static int getSizeHint(Path fullPath) {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".parquet")) {
return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
}
else if (fileName.endsWith(".slop.zip")) {
return SlopSerializableCrawlDataStream.sizeHint(fullPath);
}
else {
return 0;
}
}
default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
return new Iterator<>() {
T next = null;
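
The new static helpers above replace CrawledDomainReader: getSizeHint lets a caller decide how to handle a file before paying to open it, and openDataStream does the format dispatch. A hedged routing sketch in the spirit of the converter's sideloading logic; the threshold value and path are illustrative:

```java
import java.nio.file.Path;

import nu.marginalia.io.SerializableCrawlDataStream;

// Hedged sketch: route crawl data by size hint before opening the stream.
// SIDELOAD_THRESHOLD here is a hypothetical value, not the converter's setting.
public class SizeHintRoutingSketch {
    private static final int SIDELOAD_THRESHOLD = 10_000;

    public static void main(String[] args) throws Exception {
        Path dataPath = Path.of("crawl-data/example.slop.zip"); // illustrative path
        if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
            System.out.println("large domain: defer to the sideloading track");
        } else {
            try (var stream = SerializableCrawlDataStream.openDataStream(dataPath)) {
                while (stream.hasNext()) {
                    System.out.println(stream.next());
                }
            }
        }
    }
}
```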

View File

@@ -108,15 +108,17 @@ public record SlopCrawlDataRecord(String domain,
public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
try (var writer = new Writer(tempDir)) {
CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
parquetRecord -> {
try {
writer.write(new SlopCrawlDataRecord(parquetRecord));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
try (var writer = new Writer(tempDir);
var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
{
stream.forEach(
parquetRecord -> {
try {
writer.write(new SlopCrawlDataRecord(parquetRecord));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
catch (IOException ex) {
FileUtils.deleteDirectory(tempDir.toFile());

View File

@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.*;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
doCrawlWithReferenceStream(specs,
CrawledDomainReader.createDataStream(tempFileParquet1)
new CrawlDataReference(tempFileParquet1)
);
convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
var doc = stream.next();
data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -448,11 +447,9 @@ class CrawlerRetreiverTest {
throw new RuntimeException(e);
}
var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
System.out.println("---");
doCrawlWithReferenceStream(specs, stream);
doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));
var revisitCrawlFrontier = new DomainCrawlFrontier(
new EdgeDomain("www.marginalia.nu"),
@@ -488,7 +485,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
@@ -509,12 +506,11 @@ class CrawlerRetreiverTest {
}
}
private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
var db = new DomainStateDb(tempFileDb)
) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
new CrawlDataReference(stream));
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
}
catch (IOException | SQLException ex) {
Assertions.fail(ex);

View File

@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
import com.google.inject.Inject;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
exportLinks(tagWriter, stream);
}
catch (Exception ex) {

View File

@@ -1,7 +1,6 @@
package nu.marginalia.extractor;
import com.google.inject.Inject;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.link_parser.FeedExtractor;
import nu.marginalia.link_parser.LinkParser;
@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
exportFeeds(tagWriter, stream);
}
catch (Exception ex) {
@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());
int size = stream.sizeHint();
int size = stream.getSizeHint();
while (stream.hasNext()) {
if (!(stream.next() instanceof CrawledDocument doc))

View File

@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
import gnu.trove.set.hash.TLongHashSet;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.language.filter.LanguageFilter;
import nu.marginalia.language.model.DocumentLanguageData;
import nu.marginalia.language.sentence.SentenceExtractor;
@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
{
TLongHashSet words = new TLongHashSet(1000);
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
while (stream.hasNext()) {
if (Thread.interrupted())
return;

View File

@@ -9,7 +9,7 @@
<span>
Access logs containing IP-addresses are retained for up to 24 hours,
anonymized logs with source addresses removed are sometimes kept longer
for to help diagnosing bugs.
to help diagnose bugs.
</span>
</div>
<div class="flex space-y-4 flex-col">
@@ -33,4 +33,4 @@
</span>
</div>
</footer>
</footer>

View File

@@ -3,7 +3,7 @@ package nu.marginalia.tools;
import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
Path basePath = Path.of(args[0]);
for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
Path crawlDataPath = basePath.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
experiment.process(stream);
}
catch (Exception ex) {

View File

@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.searchset.SearchSetAny;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.linkdb.docs.DocumentDbReader;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.LoaderIndexJournalWriter;
@@ -152,7 +152,7 @@ public class IntegrationTest {
/** PROCESS CRAWL DATA */
var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));
System.out.println(processedDomain);

View File

@@ -16,8 +16,6 @@ platforms, but for lack of suitable hardware, this can not be guaranteed.
The civilized way of installing this is to use [SDKMAN](https://sdkman.io/);
graalce is a good distribution choice but it doesn't matter too much.
**Tailwindcss** - Install NPM and run `npm install tailwindcss @tailwindcss/cli`
## Quick Set up
[https://docs.marginalia.nu/](https://docs.marginalia.nu/) has a more comprehensive guide for the install

View File

@@ -74,3 +74,7 @@ download_model model/tfreq-new-algo3.bin https://huggingface.co/MarginaliaNu/Mar
download_model model/lid.176.ftz https://huggingface.co/MarginaliaNu/MarginaliaModelData/resolve/c9339e4224f1dfad7f628809c32687e748198ae3/lid.176.ftz?download=true 340156704bb8c8e50c4abf35a7ec2569
popd
pushd $(dirname $0)/..
npm install -D tailwindcss@3
popd

View File

@@ -234,7 +234,7 @@ dependencyResolutionManagement {
library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)