mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

4 Commits

Author          SHA1        Message                                                          Date
Viktor Lofgren  668f3b16ef  (search) Redirect ^/site/$ to /site                              2025-01-22 13:35:18 +01:00
Viktor Lofgren  98a340a0d1  (crawler) Add favicon data to domain state db in its own table   2025-01-22 11:41:20 +01:00
Viktor Lofgren  8862100f7e  (crawler) Improve logging and error handling                     2025-01-21 21:44:21 +01:00
Viktor Lofgren  274941f6de  (crawler) Smarter parquet->slop crawl data migration             2025-01-21 21:26:12 +01:00
10 changed files with 117 additions and 133 deletions

View File

@@ -28,13 +28,11 @@ import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.slop.SlopCrawlDataRecord;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,11 +42,13 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
@@ -182,8 +182,6 @@ public class CrawlerMain extends ProcessMainClass {
// Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node
// to be crawled.
performMigration(outputDir);
try (var conn = dataSource.getConnection()) {
try (var assignFreeDomains = conn.prepareStatement(
"""
@@ -417,11 +415,22 @@ public class CrawlerMain extends ProcessMainClass {
private CrawlDataReference getReference() {
try {
return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
if (Files.exists(slopPath)) {
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
}
Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
if (Files.exists(parquetPath)) {
slopPath = migrateParquetData(parquetPath, domain, outputDir);
return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
}
} catch (IOException e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain());
return new CrawlDataReference();
}
return new CrawlDataReference();
}
}
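
The compare view interleaves the removed one-line lookup with its replacement above; consolidated from the hunk, the new method reads:

    // Prefer existing slop data; lazily migrate parquet data if that is all
    // that exists; otherwise start from a blank reference.
    private CrawlDataReference getReference() {
        try {
            Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
            if (Files.exists(slopPath)) {
                return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
            }

            Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
            if (Files.exists(parquetPath)) {
                slopPath = migrateParquetData(parquetPath, domain, outputDir);
                return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
            }
        } catch (IOException e) {
            logger.debug("Failed to read previous crawl data for {}", specification.domain());
        }

        return new CrawlDataReference();
    }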
@@ -482,92 +491,19 @@ public class CrawlerMain extends ProcessMainClass {
}
}
// Data migration logic
private void performMigration(Path root) throws IOException {
Path crawlerLog = root.resolve("crawler.log");
Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
int finishedTasks = 0;
int totalTasks;
try (var oldLog = new WorkLog(crawlerLog)) {
totalTasks = oldLog.countFinishedJobs();
// Migrate from parquet to slop if necessary
//
// This must be synchronized as chewing through parquet files in parallel leads to enormous memory overhead
private synchronized Path migrateParquetData(Path inputPath, String domain, Path crawlDataRoot) throws IOException {
if (!inputPath.endsWith(".parquet")) {
return inputPath;
}
try (WorkLog workLog = new WorkLog(newCrawlerLog);
var migrationHeartbeat = heartbeat.createAdHocTaskHeartbeat("MIGRATING")) {
Path outputFile = CrawlerOutputFile.createSlopPath(crawlDataRoot, Integer.toHexString(domain.hashCode()), domain);
SlopCrawlDataRecord.convertFromParquet(inputPath, outputFile);
for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
var entry = item.getKey();
var path = item.getValue();
if (path.toFile().getName().endsWith(".parquet")) {
logger.info("Converting {}", entry.id());
String domain = entry.id();
String id = Integer.toHexString(domain.hashCode());
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
}
else {
workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
}
migrationHeartbeat.progress("Parquet To Slop", ++finishedTasks, totalTasks);
}
}
Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
Files.move(newCrawlerLog, crawlerLog);
}
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
private final Path crawlRootDir;
CrawlDataLocator(Path crawlRootDir) {
this.crawlRootDir = crawlRootDir;
}
@Override
public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
var path = getCrawledFilePath(crawlRootDir, entry.path());
if (!Files.exists(path)) {
return Optional.empty();
}
try {
return Optional.of(Map.entry(entry, path));
}
catch (Exception ex) {
return Optional.empty();
}
}
private Path getCrawledFilePath(Path crawlDir, String fileName) {
int sp = fileName.lastIndexOf('/');
// Normalize the filename
if (sp >= 0 && sp + 1 < fileName.length())
fileName = fileName.substring(sp + 1);
if (fileName.length() < 4)
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
String sp1 = fileName.substring(0, 2);
String sp2 = fileName.substring(2, 4);
return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
}
return outputFile;
}
}
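
A consolidated reading of the new migration routine from the same hunk (the removed performMigration and CrawlDataLocator lines are interleaved with it above). One caveat: Path.endsWith compares whole path elements, so the hunk's inputPath.endsWith(".parquet") presumably intends the String comparison shown here:

    // Migrate from parquet to slop if necessary.
    //
    // Synchronized because chewing through parquet files in parallel leads
    // to enormous memory overhead, so migrations are serialized.
    private synchronized Path migrateParquetData(Path inputPath, String domain, Path crawlDataRoot) throws IOException {
        if (!inputPath.toString().endsWith(".parquet")) {  // String, not Path, comparison
            return inputPath;
        }

        Path outputFile = CrawlerOutputFile.createSlopPath(crawlDataRoot,
                Integer.toHexString(domain.hashCode()), domain);
        SlopCrawlDataRecord.convertFromParquet(inputPath, outputFile);

        return outputFile;
    }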

View File

@@ -60,6 +60,8 @@ public class DomainStateDb implements AutoCloseable {
}
public record FaviconRecord(String contentType, byte[] imageData) {}
public DomainStateDb(Path filename) throws SQLException {
String sqliteDbString = "jdbc:sqlite:" + filename.toString();
connection = DriverManager.getConnection(sqliteDbString);
@@ -74,7 +76,13 @@ public class DomainStateDb implements AutoCloseable {
feedUrl TEXT
)
""");
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS favicon (
domain TEXT PRIMARY KEY,
contentType TEXT NOT NULL,
icon BLOB NOT NULL
)
""");
stmt.execute("PRAGMA journal_mode=WAL");
}
}
@@ -85,6 +93,41 @@ public class DomainStateDb implements AutoCloseable {
}
public void saveIcon(String domain, FaviconRecord faviconRecord) {
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO favicon (domain, contentType, icon)
VALUES(?, ?, ?)
""")) {
stmt.setString(1, domain);
stmt.setString(2, faviconRecord.contentType);
stmt.setBytes(3, faviconRecord.imageData);
stmt.executeUpdate();
}
catch (SQLException ex) {
logger.error("Failed to insert favicon", ex);
}
}
public Optional<FaviconRecord> getIcon(String domain) {
try (var stmt = connection.prepareStatement("SELECT contentType, icon FROM favicon WHERE DOMAIN = ?")) {
stmt.setString(1, domain);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(
new FaviconRecord(
rs.getString("contentType"),
rs.getBytes("icon")
)
);
}
} catch (SQLException e) {
logger.error("Failed to retrieve favicon", e);
}
return Optional.empty();
}
public void save(SummaryRecord record) {
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO summary (domain, lastUpdatedEpochMs, state, stateDesc, feedUrl)
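
A minimal usage sketch of the new favicon API; the file path, domain, and icon bytes below are hypothetical, and the test file at the end of this diff exercises the same calls:

    Path dbFile = Path.of("/tmp/domain-state.db");                  // hypothetical location
    byte[] iconBytes = Files.readAllBytes(Path.of("favicon.png"));  // hypothetical input

    try (var db = new DomainStateDb(dbFile)) {                      // throws SQLException
        // INSERT OR REPLACE semantics: saving twice for a domain overwrites the icon
        db.saveIcon("www.example.com", new DomainStateDb.FaviconRecord("image/png", iconBytes));

        // getIcon returns Optional.empty() for unknown domains
        db.getIcon("www.example.com")
          .ifPresent(icon -> System.out.println(icon.contentType()));
    }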

View File

@@ -23,12 +23,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;

View File

@@ -96,7 +96,7 @@ public class WarcRecorder implements AutoCloseable {
try {
response = client.send(request, java.net.http.HttpResponse.BodyHandlers.ofInputStream());
}
catch (IOException ex) {
catch (Exception ex) {
logger.warn("Failed to fetch URL {}: {}", requestUri, ex.getMessage());
return new HttpFetchResult.ResultException(ex);
}

View File

@@ -19,7 +19,6 @@ import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.DocumentBodyExtractor;
import nu.marginalia.model.body.HttpFetchResult;
import nu.marginalia.model.crawldata.CrawlerDomainStatus;
import org.jsoup.Jsoup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +272,16 @@ public class CrawlerRetreiver implements AutoCloseable {
feedLink.ifPresent(s -> fetcher.fetchSitemapUrls(s, timer));
// Grab the favicon if it exists
fetchWithRetry(faviconUrl, timer, HttpFetcher.ProbeType.DISABLED, ContentTags.empty());
if (fetchWithRetry(faviconUrl, timer, HttpFetcher.ProbeType.DISABLED, ContentTags.empty()) instanceof HttpFetchResult.ResultOk iconResult) {
String contentType = iconResult.header("Content-Type");
byte[] iconData = iconResult.getBodyBytes();
domainStateDb.saveIcon(
domain,
new DomainStateDb.FaviconRecord(contentType, iconData)
);
}
timer.waitFetchDelay(0);
}

View File

@@ -5,9 +5,7 @@ import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class CrawledDomainReader {
@@ -26,7 +24,8 @@ public class CrawledDomainReader {
return SerializableCrawlDataStream.empty();
}
}
else if (fileName.endsWith(".slop.zip")) {
if (fileName.endsWith(".slop.zip")) {
try {
return new SlopSerializableCrawlDataStream(fullPath);
} catch (Exception ex) {
@@ -34,22 +33,9 @@ public class CrawledDomainReader {
return SerializableCrawlDataStream.empty();
}
}
else {
logger.error("Unknown file type: {}", fullPath);
return SerializableCrawlDataStream.empty();
}
}
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
if (Files.exists(parquetPath)) {
return createDataStream(parquetPath);
}
else {
throw new FileNotFoundException("No such file: " + parquetPath);
}
}
}

View File

@@ -35,19 +35,6 @@ public class CrawlerOutputFile {
return destDir.resolve(id + "-" + filesystemSafeName(domain) + "-" + version.suffix + ".warc.gz");
}
public static Path createParquetPath(Path basePath, String id, String domain) throws IOException {
id = padId(id);
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = basePath.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
}
public static Path createSlopPath(Path basePath, String id, String domain) throws IOException {
id = padId(id);
@@ -71,16 +58,17 @@ public class CrawlerOutputFile {
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
}
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
public static Path getSlopPath(Path basePath, String id, String domain) {
id = padId(id);
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = basePath.resolve(first).resolve(second);
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".warc" + version.suffix);
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".slop.zip");
}
/**
* Pads the given ID with leading zeros to ensure it has a length of 4 characters.
*/
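
padId itself is not part of this diff; going by the javadoc above (and the equivalent Strings.repeat padding in the removed CrawlDataLocator), a faithful sketch would be:

    // Pads the given ID with leading zeros to ensure it has a length of 4
    // characters, so the first/second two-character directory split above works.
    private static String padId(String id) {
        if (id.length() < 4) {
            id = "0".repeat(4 - id.length()) + id;
        }
        return id;
    }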

View File

@@ -12,6 +12,7 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.http.HttpHeaders;
import java.util.Arrays;
import java.util.Optional;
/* FIXME: This interface has a very unfortunate name that is not very descriptive.
@@ -58,7 +59,7 @@ public sealed interface HttpFetchResult {
int statusCode,
HttpHeaders headers,
String ipAddress,
byte[] bytesRaw,
byte[] bytesRaw, // raw data for the entire response including headers
int bytesStart,
int bytesLength
) implements HttpFetchResult {
@@ -75,6 +76,12 @@ public sealed interface HttpFetchResult {
return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength);
}
/** Copy the byte range corresponding to the payload of the response,
Warning: Copies the data, use getInputStream() for zero copy access */
public byte[] getBodyBytes() {
return Arrays.copyOfRange(bytesRaw, bytesStart, bytesStart + bytesLength);
}
public Optional<Document> parseDocument() {
return DocumentBodyExtractor.asString(this).flatMapOpt((contentType, body) -> {
if (contentType.is("text/html")) {
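
A short sketch contrasting the two body-access paths on ResultOk; the result variable is hypothetical:

    byte[] body = result.getBodyBytes();              // copies the payload range out of bytesRaw

    try (InputStream in = result.getInputStream()) {  // zero-copy view over the same range
        // stream the body into a parser without duplicating the buffer
    }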

View File

@@ -10,7 +10,7 @@ import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
class DomainStateDbTest {
@@ -26,7 +26,7 @@ class DomainStateDbTest {
}
@Test
public void testSunnyDay() throws SQLException {
public void testSummaryRecord() throws SQLException {
try (var db = new DomainStateDb(tempFile)) {
var allFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
@@ -63,4 +63,21 @@ class DomainStateDbTest {
}
}
@Test
public void testFavicon() throws SQLException {
try (var db = new DomainStateDb(tempFile)) {
db.saveIcon("www.marginalia.nu", new DomainStateDb.FaviconRecord("text/plain", "hello world".getBytes()));
var maybeData = db.getIcon("www.marginalia.nu");
assertTrue(maybeData.isPresent());
var actualData = maybeData.get();
assertEquals("text/plain", actualData.contentType());
assertArrayEquals("hello world".getBytes(), actualData.imageData());
maybeData = db.getIcon("foobar");
assertTrue(maybeData.isEmpty());
}
}
}

View File

@@ -140,7 +140,8 @@ public class SearchSiteInfoService {
) throws SQLException, ExecutionException {
if (null == domainName || domainName.isBlank()) {
return null;
// If we don't get a domain name, we redirect to the /site endpoint
return new MapModelAndView("redirect.jte", Map.of("url", "/site"));
}
page = Objects.requireNonNullElse(page, 1);