mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


20 Commits

Author SHA1 Message Date
Viktor Lofgren
895cee7004 (crawler) Improved feed discovery, new domain state db per crawlset
Feed discovery is improved by probing a few likely endpoints when no feed link tag is provided.  To store the feed URLs, a sqlite database is added to each crawlset that stores a simple summary of the crawl job, including any feed URLs that have been discovered.

Solves issue #135
2024-12-26 15:05:52 +01:00
Viktor Lofgren
4bb71b8439 (crawler) Correct content type probing to only run on URLs that are suspected to be binary 2024-12-26 14:26:23 +01:00
Viktor Lofgren
e4a41f7dd1 (crawler) Correct content type probing to only run on URLs that are suspected to be binary 2024-12-26 14:13:17 +01:00
Viktor
69ad6287b1 Update ROADMAP.md 2024-12-25 21:16:38 +00:00
Viktor Lofgren
41a59dcf45 (feed) Sanitize illegal HTML entities out of the feed XML before parsing 2024-12-25 14:53:28 +01:00
Viktor Lofgren
94d4d2edb7 (live-crawler) Add refresh date to feeds API
For now this is just the ctime for the feeds db.  We may want to store this per-record in the future.
2024-12-25 14:20:48 +01:00
Viktor Lofgren
7ae19a92ba (deploy) Improve deployment script to allow specification of partitions 2024-12-24 11:16:15 +01:00
Viktor Lofgren
56d14e56d7 (live-crawler) Improve LiveCrawlActor resilience to FeedService outages 2024-12-23 23:33:54 +01:00
Viktor Lofgren
a557c7ae7f (live-crawler) Limit concurrent accesses per domain using DomainLocks from main crawler 2024-12-23 23:31:03 +01:00
Viktor Lofgren
b66879ccb1 (feed) Add support for date discovery through atom:issued and atom:created
This is specifically to help parse monadnock.net's Atom feed.
2024-12-23 20:05:58 +01:00
Viktor Lofgren
f1b7157ca2 (deploy) Add basic linting ability to deployment script. 2024-12-23 16:21:29 +01:00
Viktor Lofgren
7622335e84 (deploy) Correct deploy script, set correct name for assistant 2024-12-23 15:59:02 +01:00
Viktor Lofgren
0da2047eae (live-capture) Correctly update processed count, disable poll rate adjustment based on freshness. 2024-12-23 15:56:27 +01:00
Viktor Lofgren
5ee4321110 (ci) Correct deploy script 2024-12-22 20:08:37 +01:00
Viktor Lofgren
9459b9933b (ci) Correct deploy script 2024-12-22 19:40:32 +01:00
Viktor Lofgren
87fb564f89 (ci) Add script for automatic deployment based on git tags 2024-12-22 19:24:54 +01:00
Viktor Lofgren
5ca8523220 (math) Reduce log error spam from null unit conversions 2024-12-21 18:51:45 +01:00
Viktor Lofgren
1118657ffd (system) Supply local IP to service discovery if multiFace is enabled 2024-12-19 22:20:19 +01:00
Viktor Lofgren
b1f970152d (system) To support configurations with multiple docker networks, bind to the "most local" interface.
Make the behavior optional.
2024-12-19 20:26:31 +01:00
Viktor Lofgren
e1783891ab (system) To support configurations with multiple docker networks, bind to the "most local" interface. 2024-12-19 20:18:57 +01:00
29 changed files with 937 additions and 124 deletions

View File

@@ -21,7 +21,7 @@ word n-grams known beforehand. This limits the ability to interpret longer quer
The positions mask should be supplemented or replaced with a more accurate (e.g.) gamma coded positions The positions mask should be supplemented or replaced with a more accurate (e.g.) gamma coded positions
list, as is the civilized way of doing this. list, as is the civilized way of doing this.
Completed with PR https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99 Completed with PR [#99](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99)
## Hybridize crawler w/ Common Crawl data ## Hybridize crawler w/ Common Crawl data
@@ -41,6 +41,12 @@ The search engine has a bit of a problem showing spicy content mixed in with the
to have a way to filter this out. It's likely something like a URL blacklist (e.g. [UT1](https://dsi.ut-capitole.fr/blacklists/index_en.php) ) to have a way to filter this out. It's likely something like a URL blacklist (e.g. [UT1](https://dsi.ut-capitole.fr/blacklists/index_en.php) )
combined with naive bayesian filter would go a long way, or something more sophisticated...? combined with naive bayesian filter would go a long way, or something more sophisticated...?
## Web Design Overhaul
The design is kinda clunky and hard to maintain, and needlessly outdated-looking.
In progress: PR [#127](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/127) -- demo available at https://test.marginalia.nu/
## Additional Language Support ## Additional Language Support
It would be desirable if the search engine supported more languages than English. This is partially about It would be desirable if the search engine supported more languages than English. This is partially about
@@ -56,7 +62,7 @@ it should be extended to all domains. It would also be interesting to offer sea
RSS data itself, or use the RSS set to feed a special live index that updates faster than the RSS data itself, or use the RSS set to feed a special live index that updates faster than the
main dataset. main dataset.
Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122) Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122) and PR [#125](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/125)
## Support for binary formats like PDF ## Support for binary formats like PDF
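For context on the roadmap item about replacing the positions mask: the "gamma coded positions list" it mentions refers to the classic Elias gamma scheme for compressing sorted integer lists. Below is a minimal, self-contained sketch for illustration only; the class name is made up and this is not the index's actual on-disk format. The first position is encoded as position + 1 and each later position as its gap from the previous one; every value is written as N-1 zero bits followed by its N-bit binary form.

import java.util.List;

public class GammaPositionsSketch {
    /** Encodes a strictly increasing list of non-negative positions as a bit string. */
    public static String encode(List<Integer> positions) {
        StringBuilder bits = new StringBuilder();
        int prev = -1;
        for (int pos : positions) {
            int gap = pos - prev;                        // always >= 1 for increasing input
            String binary = Integer.toBinaryString(gap);
            bits.append("0".repeat(binary.length() - 1)) // unary length prefix
                .append(binary);                         // the value itself
            prev = pos;
        }
        return bits.toString();
    }

    public static void main(String[] args) {
        // positions 3, 4, 9 -> values 4, 1, 5 -> "00100" + "1" + "00101"
        System.out.println(encode(List.of(3, 4, 9)));
    }
}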

View File

@@ -6,6 +6,9 @@ import nu.marginalia.service.ServiceId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
@@ -69,6 +72,17 @@ public class ServiceConfigurationModule extends AbstractModule {
return configuredValue; return configuredValue;
} }
if (Boolean.getBoolean("system.multiFace")) {
try {
String localNetworkIp = getLocalNetworkIP();
if (null != localNetworkIp) {
return localNetworkIp;
}
}
catch (Exception ex) {
logger.warn("Failed to get local network IP", ex);
}
}
// If we're in docker, we'll use the hostname // If we're in docker, we'll use the hostname
if (Boolean.getBoolean("service.useDockerHostname")) { if (Boolean.getBoolean("service.useDockerHostname")) {
return System.getenv("HOSTNAME"); return System.getenv("HOSTNAME");
@@ -84,10 +98,41 @@ public class ServiceConfigurationModule extends AbstractModule {
private String getBindAddress() { private String getBindAddress() {
String configuredValue = System.getProperty("service.bind-address"); String configuredValue = System.getProperty("service.bind-address");
if (configuredValue != null) { if (configuredValue != null) {
logger.info("Using configured bind address {}", configuredValue);
return configuredValue; return configuredValue;
} }
return "127.0.0.1"; if (Boolean.getBoolean("system.multiFace")) {
try {
return Objects.requireNonNullElse(getLocalNetworkIP(), "0.0.0.0");
} catch (Exception ex) {
logger.warn("Failed to get local network IP, falling back to bind to 0.0.0.0", ex);
return "0.0.0.0";
}
}
else {
return "0.0.0.0";
}
}
public static String getLocalNetworkIP() throws Exception {
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
while (nets.hasMoreElements()) {
NetworkInterface netif = nets.nextElement();
if (!netif.isUp() || netif.isLoopback()) {
continue;
}
Enumeration<InetAddress> inetAddresses = netif.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress addr = inetAddresses.nextElement();
if (addr.isSiteLocalAddress() && !addr.isLoopbackAddress()) {
return addr.getHostAddress();
}
}
}
return null;
} }
} }

View File

@@ -7,6 +7,8 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import java.net.InetSocketAddress;
public class MetricsServer { public class MetricsServer {
@Inject @Inject
@@ -15,7 +17,8 @@ public class MetricsServer {
if (configuration.metricsPort() < 0) if (configuration.metricsPort() < 0)
return; return;
Server server = new Server(configuration.metricsPort()); Server server = new Server(new InetSocketAddress(configuration.bindAddress(), configuration.metricsPort()));
ServletContextHandler context = new ServletContextHandler(); ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/"); context.setContextPath("/");
server.setHandler(context); server.setHandler(context);

View File

@@ -50,12 +50,18 @@ public class LiveCrawlActor extends RecordActorPrototype {
yield new Monitor("-"); yield new Monitor("-");
} }
case Monitor(String feedsHash) -> { case Monitor(String feedsHash) -> {
// Sleep initially in case this is during start-up
for (;;) { for (;;) {
try {
Thread.sleep(Duration.ofMinutes(15));
String currentHash = feedsClient.getFeedDataHash(); String currentHash = feedsClient.getFeedDataHash();
if (!Objects.equals(currentHash, feedsHash)) { if (!Objects.equals(currentHash, feedsHash)) {
yield new LiveCrawl(currentHash); yield new LiveCrawl(currentHash);
} }
Thread.sleep(Duration.ofMinutes(15)); }
catch (RuntimeException ex) {
logger.error("Failed to fetch feed data hash");
}
} }
} }
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> { case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {

View File

@@ -59,12 +59,6 @@ public class FeedsClient {
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()))); .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
} }
public record UpdatedDomain(String domain, List<String> urls) {
public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
}
}
/** Get the hash of the feed data, for identifying when the data has been updated */ /** Get the hash of the feed data, for identifying when the data has been updated */
public String getFeedDataHash() { public String getFeedDataHash() {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash) return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)

View File

@@ -46,6 +46,7 @@ message RpcFeed {
string feedUrl = 3; string feedUrl = 3;
string updated = 4; string updated = 4;
repeated RpcFeedItem items = 5; repeated RpcFeedItem items = 5;
int64 fetchTimestamp = 6;
} }
message RpcFeedItem { message RpcFeedItem {
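The new fetchTimestamp field carries epoch milliseconds; per the commit message it is currently derived from the ctime of the feeds db. A minimal consumer-side sketch of interpreting the value follows; the class and method names are hypothetical and not part of this change set.

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class FeedFreshness {
    /** Age of the feed snapshot given RpcFeed.fetchTimestamp, or empty if unset (0). */
    public static Optional<Duration> age(long fetchTimestampMillis) {
        if (fetchTimestampMillis <= 0)
            return Optional.empty();
        return Optional.of(Duration.between(Instant.ofEpochMilli(fetchTimestampMillis), Instant.now()));
    }
}

On the producer side the value is populated from feedDb.getFetchTime().toEpochMilli(), as the FeedsGrpcService change further down shows.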

View File

@@ -12,9 +12,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.PosixFileAttributes;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.time.Instant; import java.time.Instant;
import java.util.Base64; import java.util.Base64;
@@ -209,4 +211,20 @@ public class FeedDb {
reader.getLinksUpdatedSince(since, consumer); reader.getLinksUpdatedSince(since, consumer);
} }
public Instant getFetchTime() {
if (!Files.exists(readerDbPath)) {
return Instant.ofEpochMilli(0);
}
try {
return Files.readAttributes(readerDbPath, PosixFileAttributes.class)
.creationTime()
.toInstant();
}
catch (IOException ex) {
logger.error("Failed to read the creatiom time of {}", readerDbPath);
return Instant.ofEpochMilli(0);
}
}
} }

View File

@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@@ -74,6 +73,17 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService; this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat; this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient; this.executorClient = executorClient;
// Add support for some alternate date tags for atom
rssReader.addItemExtension("issued", this::setDateFallback);
rssReader.addItemExtension("created", this::setDateFallback);
}
private void setDateFallback(Item item, String value) {
if (item.getPubDate().isEmpty()) {
item.setPubDate(value);
}
} }
public enum UpdateMode { public enum UpdateMode {
@@ -124,31 +134,33 @@ public class FeedFetcherService {
for (var feed : definitions) { for (var feed : definitions) {
executor.submitQuietly(() -> { executor.submitQuietly(() -> {
try {
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain())); var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
// If we have existing data, we might skip updating it with a probability that increases with time, // If we have existing data, we might skip updating it with a probability that increases with time,
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
// on our end // on our end
/* Disable for now:
if (!oldData.isEmpty()) { if (!oldData.isEmpty()) {
Duration duration = feed.durationSinceUpdated(); Duration duration = feed.durationSinceUpdated();
long daysSinceUpdate = duration.toDays(); long daysSinceUpdate = duration.toDays();
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current() if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
{
// Skip updating this feed, just write the old data back instead // Skip updating this feed, just write the old data back instead
writer.saveFeed(oldData); writer.saveFeed(oldData);
return; return;
} }
} }
*/
FetchResult feedData; FetchResult feedData;
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) { try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client); feedData = fetchFeedData(feed, client);
} } catch (Exception ex) {
catch (Exception ex) {
feedData = new FetchResult.TransientError(); feedData = new FetchResult.TransientError();
} }
@@ -163,13 +175,17 @@ public class FeedFetcherService {
writer.saveFeed(oldData); writer.saveFeed(oldData);
} }
} }
case FetchResult.PermanentError() -> {} // let the definition be forgotten about case FetchResult.PermanentError() -> {
} // let the definition be forgotten about
} }
}
finally {
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) { if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions); heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
} }
}
}); });
} }
@@ -300,6 +316,8 @@ public class FeedFetcherService {
public FeedItems parseFeed(String feedData, FeedDefinition definition) { public FeedItems parseFeed(String feedData, FeedDefinition definition) {
try { try {
feedData = sanitizeEntities(feedData);
List<Item> rawItems = rssReader.read( List<Item> rawItems = rssReader.read(
// Massage the data to maximize the possibility of the flaky XML parser consuming it // Massage the data to maximize the possibility of the flaky XML parser consuming it
new BOMInputStream(new ByteArrayInputStream(feedData.trim().getBytes(StandardCharsets.UTF_8)), false) new BOMInputStream(new ByteArrayInputStream(feedData.trim().getBytes(StandardCharsets.UTF_8)), false)
@@ -326,6 +344,32 @@ public class FeedFetcherService {
} }
} }
private static final Map<String, String> HTML_ENTITIES = Map.of(
"&raquo;", "»",
"&laquo;", "«",
"&mdash;", "--",
"&ndash;", "-",
"&rsquo;", "'",
"&lsquo;", "'",
"&nbsp;", ""
);
/** The XML parser will blow up if you insert HTML entities in the feed XML,
* which is unfortunately relatively common. Replace them as far as is possible
* with their corresponding characters
*/
static String sanitizeEntities(String feedData) {
String result = feedData;
for (Map.Entry<String, String> entry : HTML_ENTITIES.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());
}
// Handle lone ampersands not part of a recognized XML entity
result = result.replaceAll("&(?!(amp|lt|gt|apos|quot);)", "&amp;");
return result;
}
/** Decide whether to keep URI fragments in the feed items. /** Decide whether to keep URI fragments in the feed items.
* <p></p> * <p></p>
* We keep fragments if there are multiple different fragments in the items. * We keep fragments if there are multiple different fragments in the items.
@@ -361,7 +405,7 @@ public class FeedFetcherService {
return seenFragments.size() > 1; return seenFragments.size() > 1;
} }
private static class IsFeedItemDateValid implements Predicate<FeedItem> { static class IsFeedItemDateValid implements Predicate<FeedItem> {
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME); private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
public boolean test(FeedItem item) { public boolean test(FeedItem item) {

View File

@@ -107,8 +107,7 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
@Override @Override
public void getFeed(RpcDomainId request, public void getFeed(RpcDomainId request,
StreamObserver<RpcFeed> responseObserver) StreamObserver<RpcFeed> responseObserver) {
{
if (!feedDb.isEnabled()) { if (!feedDb.isEnabled()) {
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node")); responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
return; return;
@@ -126,7 +125,8 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
.setDomainId(request.getDomainId()) .setDomainId(request.getDomainId())
.setDomain(domainName.get().toString()) .setDomain(domainName.get().toString())
.setFeedUrl(feedItems.feedUrl()) .setFeedUrl(feedItems.feedUrl())
.setUpdated(feedItems.updated()); .setUpdated(feedItems.updated())
.setFetchTimestamp(feedDb.getFetchTime().toEpochMilli());
for (var item : feedItems.items()) { for (var item : feedItems.items()) {
retB.addItemsBuilder() retB.addItemsBuilder()

View File

@@ -99,7 +99,9 @@ class FeedFetcherServiceTest extends AbstractModule {
feedFetcherService.setDeterministic(); feedFetcherService.setDeterministic();
feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH); feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
Assertions.assertFalse(feedDb.getFeed(new EdgeDomain("www.marginalia.nu")).isEmpty()); var result = feedDb.getFeed(new EdgeDomain("www.marginalia.nu"));
System.out.println(result);
Assertions.assertFalse(result.isEmpty());
} }
@Tag("flaky") @Tag("flaky")

View File

@@ -0,0 +1,26 @@
package nu.marginalia.rss.svc;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestXmlSanitization {
@Test
public void testPreservedEntities() {
Assertions.assertEquals("&amp;", FeedFetcherService.sanitizeEntities("&amp;"));
Assertions.assertEquals("&lt;", FeedFetcherService.sanitizeEntities("&lt;"));
Assertions.assertEquals("&gt;", FeedFetcherService.sanitizeEntities("&gt;"));
Assertions.assertEquals("&quot;", FeedFetcherService.sanitizeEntities("&quot;"));
Assertions.assertEquals("&apos;", FeedFetcherService.sanitizeEntities("&apos;"));
}
@Test
public void testStrayAmpersand() {
Assertions.assertEquals("Bed &amp; Breakfast", FeedFetcherService.sanitizeEntities("Bed & Breakfast"));
}
@Test
public void testTranslatedHtmlEntity() {
Assertions.assertEquals("Foo -- Bar", FeedFetcherService.sanitizeEntities("Foo &mdash; Bar"));
}
}

View File

@@ -49,13 +49,14 @@ public class Units {
var fromUnit = unitsByName.get(fromUnitName.toLowerCase()); var fromUnit = unitsByName.get(fromUnitName.toLowerCase());
var toUnit = unitsByName.get(toUnitName.toLowerCase()); var toUnit = unitsByName.get(toUnitName.toLowerCase());
if (Objects.equals(fromUnit, toUnit)) {
return Optional.of(value + " " + fromUnit.name);
}
if (null == fromUnit || null == toUnit) { if (null == fromUnit || null == toUnit) {
return Optional.empty(); return Optional.empty();
} }
if (Objects.equals(fromUnit, toUnit)) {
return Optional.of(value + " " + fromUnit.name);
}
if (!Objects.equals(toUnit.type, fromUnit.type)) { if (!Objects.equals(toUnit.type, fromUnit.type)) {
return Optional.empty(); return Optional.empty();
} }

View File

@@ -85,7 +85,7 @@ class BTreeWriterTest {
public void testWriteEntrySize2() throws IOException { public void testWriteEntrySize2() throws IOException {
BTreeContext ctx = new BTreeContext(4, 2, BTreeBlockSize.BS_64); BTreeContext ctx = new BTreeContext(4, 2, BTreeBlockSize.BS_64);
var tempFile = Files.createTempFile(Path.of("/tmp"), "tst", "dat"); var tempFile = Files.createTempFile("tst", "dat");
int[] data = generateItems32(64); int[] data = generateItems32(64);

View File

@@ -7,6 +7,7 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.processor.DomainProcessor; import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.DomainStateDb;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
@@ -46,6 +47,7 @@ public class CrawlingThenConvertingIntegrationTest {
private Path fileName; private Path fileName;
private Path fileName2; private Path fileName2;
private Path dbTempFile;
@BeforeAll @BeforeAll
public static void setUpAll() { public static void setUpAll() {
@@ -63,16 +65,18 @@ public class CrawlingThenConvertingIntegrationTest {
httpFetcher = new HttpFetcherImpl(WmsaHome.getUserAgent().uaString()); httpFetcher = new HttpFetcherImpl(WmsaHome.getUserAgent().uaString());
this.fileName = Files.createTempFile("crawling-then-converting", ".warc.gz"); this.fileName = Files.createTempFile("crawling-then-converting", ".warc.gz");
this.fileName2 = Files.createTempFile("crawling-then-converting", ".warc.gz"); this.fileName2 = Files.createTempFile("crawling-then-converting", ".warc.gz");
this.dbTempFile = Files.createTempFile("domains", "db");
} }
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
Files.deleteIfExists(fileName); Files.deleteIfExists(fileName);
Files.deleteIfExists(fileName2); Files.deleteIfExists(fileName2);
Files.deleteIfExists(dbTempFile);
} }
@Test @Test
public void testInvalidDomain() throws IOException { public void testInvalidDomain() throws Exception {
// Attempt to fetch an invalid domain // Attempt to fetch an invalid domain
var specs = new CrawlerMain.CrawlSpecRecord("invalid.invalid.invalid", 10); var specs = new CrawlerMain.CrawlSpecRecord("invalid.invalid.invalid", 10);
@@ -88,7 +92,7 @@ public class CrawlingThenConvertingIntegrationTest {
} }
@Test @Test
public void testRedirectingDomain() throws IOException { public void testRedirectingDomain() throws Exception {
// Attempt to fetch an invalid domain // Attempt to fetch an invalid domain
var specs = new CrawlerMain.CrawlSpecRecord("memex.marginalia.nu", 10); var specs = new CrawlerMain.CrawlSpecRecord("memex.marginalia.nu", 10);
@@ -107,7 +111,7 @@ public class CrawlingThenConvertingIntegrationTest {
} }
@Test @Test
public void testBlockedDomain() throws IOException { public void testBlockedDomain() throws Exception {
// Attempt to fetch an invalid domain // Attempt to fetch an invalid domain
var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 10); var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 10);
@@ -124,7 +128,7 @@ public class CrawlingThenConvertingIntegrationTest {
} }
@Test @Test
public void crawlSunnyDay() throws IOException { public void crawlSunnyDay() throws Exception {
var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10); var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10);
CrawledDomain domain = crawl(specs); CrawledDomain domain = crawl(specs);
@@ -157,7 +161,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test @Test
public void crawlContentTypes() throws IOException { public void crawlContentTypes() throws Exception {
var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10, var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10,
List.of( List.of(
"https://www.marginalia.nu/sanic.png", "https://www.marginalia.nu/sanic.png",
@@ -195,7 +199,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test @Test
public void crawlRobotsTxt() throws IOException { public void crawlRobotsTxt() throws Exception {
var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 5, var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 5,
List.of("https://search.marginalia.nu/search?q=hello+world") List.of("https://search.marginalia.nu/search?q=hello+world")
); );
@@ -235,15 +239,17 @@ public class CrawlingThenConvertingIntegrationTest {
return null; // unreachable return null; // unreachable
} }
} }
private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs) throws IOException { private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs) throws Exception {
return crawl(specs, domain -> true); return crawl(specs, domain -> true);
} }
private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws IOException { private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws Exception {
List<SerializableCrawlData> data = new ArrayList<>(); List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder(fileName)) { try (var recorder = new WarcRecorder(fileName);
new CrawlerRetreiver(httpFetcher, new DomainProber(domainBlacklist), specs, recorder).crawlDomain(); var db = new DomainStateDb(dbTempFile))
{
new CrawlerRetreiver(httpFetcher, new DomainProber(domainBlacklist), specs, db, recorder).crawlDomain();
} }
CrawledDocumentParquetRecordFileWriter.convertWarc(specs.domain(), CrawledDocumentParquetRecordFileWriter.convertWarc(specs.domain(),

View File

@@ -46,6 +46,8 @@ dependencies {
implementation libs.notnull implementation libs.notnull
implementation libs.guava implementation libs.guava
implementation libs.sqlite
implementation dependencies.create(libs.guice.get()) { implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava' exclude group: 'com.google.guava'
} }

View File

@@ -241,6 +241,7 @@ public class CrawlerMain extends ProcessMainClass {
// Set up the work log and the warc archiver so we can keep track of what we've done // Set up the work log and the warc archiver so we can keep track of what we've done
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log")); try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
DomainStateDb domainStateDb = new DomainStateDb(outputDir.resolve("domainstate.db"));
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir); WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(domainsToCrawl) AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(domainsToCrawl)
) { ) {
@@ -258,6 +259,7 @@ public class CrawlerMain extends ProcessMainClass {
anchorTagsSource, anchorTagsSource,
outputDir, outputDir,
warcArchiver, warcArchiver,
domainStateDb,
workLog); workLog);
if (pendingCrawlTasks.putIfAbsent(crawlSpec.domain(), task) == null) { if (pendingCrawlTasks.putIfAbsent(crawlSpec.domain(), task) == null) {
@@ -299,11 +301,12 @@ public class CrawlerMain extends ProcessMainClass {
heartbeat.start(); heartbeat.start();
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler-" + targetDomainName.replace('/', '-') + ".log")); try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler-" + targetDomainName.replace('/', '-') + ".log"));
DomainStateDb domainStateDb = new DomainStateDb(outputDir.resolve("domainstate.db"));
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir); WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName))) AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName)))
) { ) {
var spec = new CrawlSpecRecord(targetDomainName, 1000, List.of()); var spec = new CrawlSpecRecord(targetDomainName, 1000, List.of());
var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog); var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, domainStateDb, workLog);
task.run(); task.run();
} }
catch (Exception ex) { catch (Exception ex) {
@@ -324,18 +327,21 @@ public class CrawlerMain extends ProcessMainClass {
private final AnchorTagsSource anchorTagsSource; private final AnchorTagsSource anchorTagsSource;
private final Path outputDir; private final Path outputDir;
private final WarcArchiverIf warcArchiver; private final WarcArchiverIf warcArchiver;
private final DomainStateDb domainStateDb;
private final WorkLog workLog; private final WorkLog workLog;
CrawlTask(CrawlSpecRecord specification, CrawlTask(CrawlSpecRecord specification,
AnchorTagsSource anchorTagsSource, AnchorTagsSource anchorTagsSource,
Path outputDir, Path outputDir,
WarcArchiverIf warcArchiver, WarcArchiverIf warcArchiver,
DomainStateDb domainStateDb,
WorkLog workLog) WorkLog workLog)
{ {
this.specification = specification; this.specification = specification;
this.anchorTagsSource = anchorTagsSource; this.anchorTagsSource = anchorTagsSource;
this.outputDir = outputDir; this.outputDir = outputDir;
this.warcArchiver = warcArchiver; this.warcArchiver = warcArchiver;
this.domainStateDb = domainStateDb;
this.workLog = workLog; this.workLog = workLog;
this.domain = specification.domain(); this.domain = specification.domain();
@@ -359,7 +365,7 @@ public class CrawlerMain extends ProcessMainClass {
} }
try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder); var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, domainStateDb, warcRecorder);
CrawlDataReference reference = getReference(); CrawlDataReference reference = getReference();
) )
{ {

View File

@@ -0,0 +1,127 @@
package nu.marginalia.crawl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Optional;
/** Supplemental sqlite database for storing the summary of a crawl.
* One database exists per crawl data set.
* */
public class DomainStateDb implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(DomainStateDb.class);
private final Connection connection;
public record SummaryRecord(
String domainName,
Instant lastUpdated,
String state,
@Nullable String stateDesc,
@Nullable String feedUrl
)
{
public static SummaryRecord forSuccess(String domainName) {
return new SummaryRecord(domainName, Instant.now(), "OK", null, null);
}
public static SummaryRecord forSuccess(String domainName, String feedUrl) {
return new SummaryRecord(domainName, Instant.now(), "OK", null, feedUrl);
}
public static SummaryRecord forError(String domainName, String state, String stateDesc) {
return new SummaryRecord(domainName, Instant.now(), state, stateDesc, null);
}
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (!(other instanceof SummaryRecord(String name, Instant updated, String state1, String desc, String url))) {
return false;
}
return domainName.equals(name) &&
lastUpdated.toEpochMilli() == updated.toEpochMilli() &&
state.equals(state1) &&
(stateDesc == null ? desc == null : stateDesc.equals(desc)) &&
(feedUrl == null ? url == null : feedUrl.equals(url));
}
public int hashCode() {
return domainName.hashCode() + Long.hashCode(lastUpdated.toEpochMilli());
}
}
public DomainStateDb(Path filename) throws SQLException {
String sqliteDbString = "jdbc:sqlite:" + filename.toString();
connection = DriverManager.getConnection(sqliteDbString);
try (var stmt = connection.createStatement()) {
stmt.executeUpdate("""
CREATE TABLE IF NOT EXISTS summary (
domain TEXT PRIMARY KEY,
lastUpdatedEpochMs LONG NOT NULL,
state TEXT NOT NULL,
stateDesc TEXT,
feedUrl TEXT
)
""");
stmt.execute("PRAGMA journal_mode=WAL");
}
}
@Override
public void close() throws SQLException {
connection.close();
}
public void save(SummaryRecord record) {
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO summary (domain, lastUpdatedEpochMs, state, stateDesc, feedUrl)
VALUES (?, ?, ?, ?, ?)
""")) {
stmt.setString(1, record.domainName());
stmt.setLong(2, record.lastUpdated().toEpochMilli());
stmt.setString(3, record.state());
stmt.setString(4, record.stateDesc());
stmt.setString(5, record.feedUrl());
stmt.executeUpdate();
} catch (SQLException e) {
logger.error("Failed to insert summary record", e);
}
}
public Optional<SummaryRecord> get(String domainName) {
try (var stmt = connection.prepareStatement("""
SELECT domain, lastUpdatedEpochMs, state, stateDesc, feedUrl
FROM summary
WHERE domain = ?
""")) {
stmt.setString(1, domainName);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(new SummaryRecord(
rs.getString("domain"),
Instant.ofEpochMilli(rs.getLong("lastUpdatedEpochMs")),
rs.getString("state"),
rs.getString("stateDesc"),
rs.getString("feedUrl")
));
}
} catch (SQLException e) {
logger.error("Failed to get summary record", e);
}
return Optional.empty();
}
}
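As a usage illustration (not part of the change set): domainstate.db is a plain sqlite file with the summary table created above, so the feed URLs discovered during a crawl can also be read back with ordinary JDBC, e.g. for ad-hoc inspection of a crawlset. The tool below is hypothetical and assumes the sqlite JDBC driver (added as a dependency in this change set) is on the classpath.

import java.nio.file.Path;
import java.sql.DriverManager;

public class ListFeedUrls {
    public static void main(String[] args) throws Exception {
        Path dbFile = Path.of(args[0]); // e.g. <crawl output dir>/domainstate.db
        try (var conn = DriverManager.getConnection("jdbc:sqlite:" + dbFile);
             var stmt = conn.prepareStatement("""
                     SELECT domain, feedUrl FROM summary
                     WHERE state = 'OK' AND feedUrl IS NOT NULL
                     """);
             var rs = stmt.executeQuery())
        {
            while (rs.next()) {
                System.out.println(rs.getString("domain") + " -> " + rs.getString("feedUrl"));
            }
        }
    }
}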

View File

@@ -139,7 +139,7 @@ public class HttpFetcherImpl implements HttpFetcher {
public ContentTypeProbeResult probeContentType(EdgeUrl url, public ContentTypeProbeResult probeContentType(EdgeUrl url,
WarcRecorder warcRecorder, WarcRecorder warcRecorder,
ContentTags tags) throws RateLimitException { ContentTags tags) throws RateLimitException {
if (tags.isEmpty()) { if (tags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url)) {
var headBuilder = new Request.Builder().head() var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgentString) .addHeader("User-agent", userAgentString)
.addHeader("Accept-Encoding", "gzip") .addHeader("Accept-Encoding", "gzip")

View File

@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.atags.model.DomainLinks; import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentType;
import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.DomainStateDb;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@@ -16,7 +17,9 @@ import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.link_parser.LinkParser; import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.DocumentBodyExtractor;
import nu.marginalia.model.body.HttpFetchResult; import nu.marginalia.model.body.HttpFetchResult;
import nu.marginalia.model.crawldata.CrawlerDomainStatus;
import org.jsoup.Jsoup; import org.jsoup.Jsoup;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ public class CrawlerRetreiver implements AutoCloseable {
private final DomainProber domainProber; private final DomainProber domainProber;
private final DomainCrawlFrontier crawlFrontier; private final DomainCrawlFrontier crawlFrontier;
private final DomainStateDb domainStateDb;
private final WarcRecorder warcRecorder; private final WarcRecorder warcRecorder;
private final CrawlerRevisitor crawlerRevisitor; private final CrawlerRevisitor crawlerRevisitor;
@@ -55,8 +59,10 @@ public class CrawlerRetreiver implements AutoCloseable {
public CrawlerRetreiver(HttpFetcher fetcher, public CrawlerRetreiver(HttpFetcher fetcher,
DomainProber domainProber, DomainProber domainProber,
CrawlerMain.CrawlSpecRecord specs, CrawlerMain.CrawlSpecRecord specs,
DomainStateDb domainStateDb,
WarcRecorder warcRecorder) WarcRecorder warcRecorder)
{ {
this.domainStateDb = domainStateDb;
this.warcRecorder = warcRecorder; this.warcRecorder = warcRecorder;
this.fetcher = fetcher; this.fetcher = fetcher;
this.domainProber = domainProber; this.domainProber = domainProber;
@@ -90,8 +96,21 @@ public class CrawlerRetreiver implements AutoCloseable {
try { try {
// Do an initial domain probe to determine the root URL // Do an initial domain probe to determine the root URL
EdgeUrl rootUrl; EdgeUrl rootUrl;
if (probeRootUrl() instanceof HttpFetcher.DomainProbeResult.Ok ok) rootUrl = ok.probedUrl();
else return 1; var probeResult = probeRootUrl();
switch (probeResult) {
case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
rootUrl = probedUrl; // Good track
}
case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
return 1;
}
case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
return 1;
}
}
// Sleep after the initial probe, we don't have access to the robots.txt yet // Sleep after the initial probe, we don't have access to the robots.txt yet
// so we don't know the crawl delay // so we don't know the crawl delay
@@ -114,7 +133,8 @@ public class CrawlerRetreiver implements AutoCloseable {
delayTimer.waitFetchDelay(0); // initial delay after robots.txt delayTimer.waitFetchDelay(0); // initial delay after robots.txt
sniffRootDocument(rootUrl, delayTimer); DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
domainStateDb.save(summaryRecord);
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) { if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
@@ -196,7 +216,9 @@ public class CrawlerRetreiver implements AutoCloseable {
return domainProbeResult; return domainProbeResult;
} }
private void sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) { private DomainStateDb.SummaryRecord sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
Optional<String> feedLink = Optional.empty();
try { try {
var url = rootUrl.withPathAndParam("/", null); var url = rootUrl.withPathAndParam("/", null);
@@ -204,11 +226,11 @@ public class CrawlerRetreiver implements AutoCloseable {
timer.waitFetchDelay(0); timer.waitFetchDelay(0);
if (!(result instanceof HttpFetchResult.ResultOk ok)) if (!(result instanceof HttpFetchResult.ResultOk ok))
return; return DomainStateDb.SummaryRecord.forSuccess(domain);
var optDoc = ok.parseDocument(); var optDoc = ok.parseDocument();
if (optDoc.isEmpty()) if (optDoc.isEmpty())
return; return DomainStateDb.SummaryRecord.forSuccess(domain);
// Sniff the software based on the sample document // Sniff the software based on the sample document
var doc = optDoc.get(); var doc = optDoc.get();
@@ -216,7 +238,6 @@ public class CrawlerRetreiver implements AutoCloseable {
crawlFrontier.enqueueLinksFromDocument(url, doc); crawlFrontier.enqueueLinksFromDocument(url, doc);
EdgeUrl faviconUrl = url.withPathAndParam("/favicon.ico", null); EdgeUrl faviconUrl = url.withPathAndParam("/favicon.ico", null);
Optional<EdgeUrl> sitemapUrl = Optional.empty();
for (var link : doc.getElementsByTag("link")) { for (var link : doc.getElementsByTag("link")) {
String rel = link.attr("rel"); String rel = link.attr("rel");
@@ -232,23 +253,33 @@ public class CrawlerRetreiver implements AutoCloseable {
// Grab the RSS/Atom as a sitemap if it exists // Grab the RSS/Atom as a sitemap if it exists
if (rel.equalsIgnoreCase("alternate") if (rel.equalsIgnoreCase("alternate")
&& (type.equalsIgnoreCase("application/atom+xml") || type.equalsIgnoreCase("application/atomsvc+xml"))) { && (type.equalsIgnoreCase("application/atom+xml")
|| type.equalsIgnoreCase("application/atomsvc+xml")
|| type.equalsIgnoreCase("application/rss+xml")
)) {
String href = link.attr("href"); String href = link.attr("href");
sitemapUrl = linkParser.parseLink(url, href) feedLink = linkParser.parseLink(url, href)
.filter(crawlFrontier::isSameDomain); .filter(crawlFrontier::isSameDomain)
.map(EdgeUrl::toString);
} }
} }
// Download the sitemap if available exists
if (sitemapUrl.isPresent()) { if (feedLink.isEmpty()) {
sitemapFetcher.downloadSitemaps(List.of(sitemapUrl.get())); feedLink = guessFeedUrl(timer);
}
// Download the sitemap if available
if (feedLink.isPresent()) {
sitemapFetcher.downloadSitemaps(List.of(feedLink.get()));
timer.waitFetchDelay(0); timer.waitFetchDelay(0);
} }
// Grab the favicon if it exists // Grab the favicon if it exists
fetchWithRetry(faviconUrl, timer, HttpFetcher.ProbeType.DISABLED, ContentTags.empty()); fetchWithRetry(faviconUrl, timer, HttpFetcher.ProbeType.DISABLED, ContentTags.empty());
timer.waitFetchDelay(0); timer.waitFetchDelay(0);
} }
catch (Exception ex) { catch (Exception ex) {
logger.error("Error configuring link filter", ex); logger.error("Error configuring link filter", ex);
@@ -256,6 +287,74 @@ public class CrawlerRetreiver implements AutoCloseable {
finally { finally {
crawlFrontier.addVisited(rootUrl); crawlFrontier.addVisited(rootUrl);
} }
if (feedLink.isPresent()) {
return DomainStateDb.SummaryRecord.forSuccess(domain, feedLink.get());
}
else {
return DomainStateDb.SummaryRecord.forSuccess(domain);
}
}
private final List<String> likelyFeedEndpoints = List.of(
"/rss.xml",
"/atom.xml",
"/feed.xml",
"/index.xml",
"/feed",
"/rss",
"/atom",
"/feeds",
"/blog/feed",
"/blog/rss"
);
private Optional<String> guessFeedUrl(CrawlDelayTimer timer) throws InterruptedException {
var oldDomainStateRecord = domainStateDb.get(domain);
// If we are already aware of an old feed URL, then we can just revalidate it
if (oldDomainStateRecord.isPresent()) {
var oldRecord = oldDomainStateRecord.get();
if (oldRecord.feedUrl() != null && validateFeedUrl(oldRecord.feedUrl(), timer)) {
return Optional.of(oldRecord.feedUrl());
}
}
for (String endpoint : likelyFeedEndpoints) {
String url = "https://" + domain + "/" + endpoint;
if (validateFeedUrl(url, timer)) {
return Optional.of(url);
}
}
return Optional.empty();
}
private boolean validateFeedUrl(String url, CrawlDelayTimer timer) throws InterruptedException {
var parsedOpt = EdgeUrl.parse(url);
if (parsedOpt.isEmpty())
return false;
HttpFetchResult result = fetchWithRetry(parsedOpt.get(), timer, HttpFetcher.ProbeType.DISABLED, ContentTags.empty());
timer.waitFetchDelay(0);
if (!(result instanceof HttpFetchResult.ResultOk ok)) {
return false;
}
// Extract the beginning of the body to check for an RSS/Atom signature
Optional<String> bodyOpt = DocumentBodyExtractor.asString(ok).getBody();
if (bodyOpt.isEmpty())
return false;
String body = bodyOpt.get();
body = body.substring(0, Math.min(128, body.length())).toLowerCase();
if (body.contains("<atom"))
return true;
if (body.contains("<rss"))
return true;
return false;
} }
public HttpFetchResult fetchContentWithReference(EdgeUrl top, public HttpFetchResult fetchContentWithReference(EdgeUrl top,

View File

@@ -7,9 +7,9 @@ import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
public class SitemapFetcher { public class SitemapFetcher {
@@ -24,26 +24,27 @@ public class SitemapFetcher {
} }
public void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) { public void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) {
List<String> sitemaps = robotsRules.getSitemaps(); List<String> urls = robotsRules.getSitemaps();
List<EdgeUrl> urls = new ArrayList<>(sitemaps.size()); if (urls.isEmpty()) {
if (!sitemaps.isEmpty()) { urls = List.of(rootUrl.withPathAndParam("/sitemap.xml", null).toString());
for (var url : sitemaps) {
EdgeUrl.parse(url).ifPresent(urls::add);
}
}
else {
urls.add(rootUrl.withPathAndParam("/sitemap.xml", null));
} }
downloadSitemaps(urls); downloadSitemaps(urls);
} }
public void downloadSitemaps(List<EdgeUrl> urls) { public void downloadSitemaps(List<String> urls) {
Set<String> checkedSitemaps = new HashSet<>(); Set<String> checkedSitemaps = new HashSet<>();
for (var url : urls) { for (var rawUrl : urls) {
Optional<EdgeUrl> parsedUrl = EdgeUrl.parse(rawUrl);
if (parsedUrl.isEmpty()) {
continue;
}
EdgeUrl url = parsedUrl.get();
// Let's not download sitemaps from other domains for now // Let's not download sitemaps from other domains for now
if (!crawlFrontier.isSameDomain(url)) { if (!crawlFrontier.isSameDomain(url)) {
continue; continue;

View File

@@ -18,6 +18,7 @@ public class ContentTypeLogic {
"application/xhtml", "application/xhtml",
"application/xml", "application/xml",
"application/atom+xml", "application/atom+xml",
"application/atomsvc+xml",
"application/rss+xml", "application/rss+xml",
"application/x-rss+xml", "application/x-rss+xml",
"application/rdf+xml", "application/rdf+xml",

View File

@@ -23,6 +23,10 @@ public sealed interface DocumentBodyResult<T> {
return mapper.apply(contentType, body); return mapper.apply(contentType, body);
} }
public Optional<T> getBody() {
return Optional.of(body);
}
@Override @Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception { public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
consumer.accept(contentType, body); consumer.accept(contentType, body);
@@ -41,6 +45,11 @@ public sealed interface DocumentBodyResult<T> {
return (DocumentBodyResult<T2>) this; return (DocumentBodyResult<T2>) this;
} }
@Override
public Optional<T> getBody() {
return Optional.empty();
}
@Override @Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception { public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
} }
@@ -49,6 +58,7 @@ public sealed interface DocumentBodyResult<T> {
<T2> Optional<T2> mapOpt(BiFunction<ContentType, T, T2> mapper); <T2> Optional<T2> mapOpt(BiFunction<ContentType, T, T2> mapper);
<T2> Optional<T2> flatMapOpt(BiFunction<ContentType, T, Optional<T2>> mapper); <T2> Optional<T2> flatMapOpt(BiFunction<ContentType, T, Optional<T2>> mapper);
<T2> DocumentBodyResult<T2> flatMap(BiFunction<ContentType, T, DocumentBodyResult<T2>> mapper); <T2> DocumentBodyResult<T2> flatMap(BiFunction<ContentType, T, DocumentBodyResult<T2>> mapper);
Optional<T> getBody();
void ifPresent(ExConsumer<T,Exception> consumer) throws Exception; void ifPresent(ExConsumer<T,Exception> consumer) throws Exception;

View File

@@ -0,0 +1,66 @@
package nu.marginalia.crawl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.assertEquals;
class DomainStateDbTest {
Path tempFile;
@BeforeEach
void setUp() throws IOException {
tempFile = Files.createTempFile(getClass().getSimpleName(), ".db");
}
@AfterEach
void tearDown() throws IOException {
Files.deleteIfExists(tempFile);
}
@Test
public void testSunnyDay() throws SQLException {
try (var db = new DomainStateDb(tempFile)) {
var allFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
Instant.now(),
"OK",
"Bad address",
"https://www.marginalia.nu/atom.xml"
);
var minFields = new DomainStateDb.SummaryRecord(
"min.marginalia.nu",
Instant.now(),
"OK",
null,
null
);
db.save(allFields);
db.save(minFields);
assertEquals(allFields, db.get("all.marginalia.nu").orElseThrow());
assertEquals(minFields, db.get("min.marginalia.nu").orElseThrow());
var updatedAllFields = new DomainStateDb.SummaryRecord(
"all.marginalia.nu",
Instant.now(),
"BAD",
null,
null
);
db.save(updatedAllFields);
assertEquals(updatedAllFields, db.get("all.marginalia.nu").orElseThrow());
}
}
}

View File

@@ -42,24 +42,24 @@ class ContentTypeProberTest {
port = r.nextInt(10000) + 8000; port = r.nextInt(10000) + 8000;
server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10); server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
server.createContext("/html", exchange -> { server.createContext("/html.gz", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html"); exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, -1); exchange.sendResponseHeaders(200, -1);
exchange.close(); exchange.close();
}); });
server.createContext("/redir", exchange -> { server.createContext("/redir.gz", exchange -> {
exchange.getResponseHeaders().add("Location", "/html"); exchange.getResponseHeaders().add("Location", "/html.gz");
exchange.sendResponseHeaders(301, -1); exchange.sendResponseHeaders(301, -1);
exchange.close(); exchange.close();
}); });
server.createContext("/bin", exchange -> { server.createContext("/bin.gz", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "application/binary"); exchange.getResponseHeaders().add("Content-Type", "application/binary");
exchange.sendResponseHeaders(200, -1); exchange.sendResponseHeaders(200, -1);
exchange.close(); exchange.close();
}); });
server.createContext("/timeout", exchange -> { server.createContext("/timeout.gz", exchange -> {
try { try {
Thread.sleep(15_000); Thread.sleep(15_000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@@ -73,10 +73,10 @@ class ContentTypeProberTest {
server.start(); server.start();
htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html").get(); htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html.gz").get();
binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin").get(); binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin.gz").get();
timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout").get(); timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout.gz").get();
htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir").get(); htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir.gz").get();
fetcher = new HttpFetcherImpl("test"); fetcher = new HttpFetcherImpl("test");
recorder = new WarcRecorder(warcFile); recorder = new WarcRecorder(warcFile);

View File

@@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.DomainStateDb;
import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@@ -18,6 +19,7 @@ import nu.marginalia.model.crawldata.SerializableCrawlData;
import nu.marginalia.test.CommonTestData; import nu.marginalia.test.CommonTestData;
import okhttp3.Headers; import okhttp3.Headers;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -25,6 +27,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -36,9 +41,14 @@ public class CrawlerMockFetcherTest {
Map<EdgeUrl, CrawledDocument> mockData = new HashMap<>(); Map<EdgeUrl, CrawledDocument> mockData = new HashMap<>();
HttpFetcher fetcherMock = new MockFetcher(); HttpFetcher fetcherMock = new MockFetcher();
private Path dbTempFile;
@BeforeEach
public void setUp() throws IOException {
dbTempFile = Files.createTempFile("domains","db");
}
@AfterEach @AfterEach
public void tearDown() { public void tearDown() throws IOException {
Files.deleteIfExists(dbTempFile);
mockData.clear(); mockData.clear();
} }
@@ -66,15 +76,17 @@ public class CrawlerMockFetcherTest {
} }
void crawl(CrawlerMain.CrawlSpecRecord spec) throws IOException { void crawl(CrawlerMain.CrawlSpecRecord spec) throws IOException, SQLException {
try (var recorder = new WarcRecorder()) { try (var recorder = new WarcRecorder();
new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder) var db = new DomainStateDb(dbTempFile)
) {
new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, db, recorder)
.crawlDomain(); .crawlDomain();
} }
} }
@Test @Test
public void testLemmy() throws URISyntaxException, IOException { public void testLemmy() throws Exception {
List<SerializableCrawlData> out = new ArrayList<>(); List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://startrek.website/"), "mock-crawl-data/lemmy/index.html"); registerUrlClasspathData(new EdgeUrl("https://startrek.website/"), "mock-crawl-data/lemmy/index.html");
@@ -85,7 +97,7 @@ public class CrawlerMockFetcherTest {
} }
@Test @Test
public void testMediawiki() throws URISyntaxException, IOException { public void testMediawiki() throws Exception {
List<SerializableCrawlData> out = new ArrayList<>(); List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html"); registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
@@ -94,7 +106,7 @@ public class CrawlerMockFetcherTest {
} }
@Test @Test
public void testDiscourse() throws URISyntaxException, IOException { public void testDiscourse() throws Exception {
List<SerializableCrawlData> out = new ArrayList<>(); List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/"), "mock-crawl-data/discourse/index.html"); registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/"), "mock-crawl-data/discourse/index.html");

View File

@@ -4,6 +4,7 @@ import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.CrawlerMain;
+import nu.marginalia.crawl.DomainStateDb;
 import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
@@ -25,6 +26,7 @@ import java.io.RandomAccessFile;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.sql.SQLException;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -39,11 +41,13 @@ class CrawlerRetreiverTest {
     Path tempFileWarc2;
     Path tempFileParquet2;
     Path tempFileWarc3;
+    Path tempFileDb;
     @BeforeEach
     public void setUp() throws IOException {
         httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D");
         tempFileParquet1 = Files.createTempFile("crawling-process", ".parquet");
         tempFileParquet2 = Files.createTempFile("crawling-process", ".parquet");
+        tempFileDb = Files.createTempFile("crawling-process", ".db");
     }
@@ -505,22 +509,26 @@ class CrawlerRetreiverTest {
     }
     private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
-        try (var recorder = new WarcRecorder(tempFileWarc2)) {
-            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).crawlDomain(new DomainLinks(),
+        try (var recorder = new WarcRecorder(tempFileWarc2);
+             var db = new DomainStateDb(tempFileDb)
+        ) {
+            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
                     new CrawlDataReference(stream));
         }
-        catch (IOException ex) {
+        catch (IOException | SQLException ex) {
            Assertions.fail(ex);
        }
     }
     @NotNull
     private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlerMain.CrawlSpecRecord specs) {
-        try (var recorder = new WarcRecorder(tempFileWarc1)) {
-            var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder);
+        try (var recorder = new WarcRecorder(tempFileWarc1);
+             var db = new DomainStateDb(tempFileDb)
+        ) {
+            var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder);
            crawler.crawlDomain();
            return crawler.getCrawlFrontier();
-        } catch (IOException ex) {
+        } catch (IOException| SQLException ex) {
            Assertions.fail(ex);
            return null; // unreachable
        }

View File

@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
 import crawlercommons.robots.SimpleRobotRulesParser;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
+import nu.marginalia.crawl.logic.DomainLocks;
 import nu.marginalia.crawl.retreival.CrawlDelayTimer;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
@@ -40,6 +41,7 @@ public class SimpleLinkScraper implements AutoCloseable {
     private final DomainBlacklist domainBlacklist;
     private final Duration connectTimeout = Duration.ofSeconds(10);
     private final Duration readTimeout = Duration.ofSeconds(10);
+    private final DomainLocks domainLocks = new DomainLocks();
     public SimpleLinkScraper(LiveCrawlDataSet dataSet,
                              DbDomainQueries domainQueries,
@@ -65,7 +67,9 @@ public class SimpleLinkScraper implements AutoCloseable {
                 .connectTimeout(connectTimeout)
                 .followRedirects(HttpClient.Redirect.NEVER)
                 .version(HttpClient.Version.HTTP_2)
-                .build()) {
+                .build();
+             DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) // throttle concurrent access per domain; do not remove
+        ) {
             EdgeUrl rootUrl = domain.toRootUrlHttps();

View File

@@ -8,7 +8,7 @@ jib {
     }
     container {
         mainClass = application.mainClass
-        jvmFlags = ['-Dservice.bind-address=0.0.0.0', '-Dservice.useDockerHostname=TRUE', '-Dsystem.homePath=/wmsa']
+        jvmFlags = ['-Dservice.useDockerHostname=TRUE', '-Dsystem.homePath=/wmsa']
         volumes = ['/wmsa/conf', '/wmsa/model', '/wmsa/data', '/var/log/wmsa']
     }
 }

View File

@@ -0,0 +1,325 @@
from dataclasses import dataclass
import subprocess, os
from typing import List, Set, Dict, Optional
import argparse
build_dir = "/app/search.marginalia.nu/build"
docker_dir = "/app/search.marginalia.nu/docker"
@dataclass
class ServiceConfig:
"""Configuration for a service"""
gradle_target: str
docker_name: str
instances: int | None
deploy_tier: int
groups: Set[str]
@dataclass
class DeploymentPlan:
services_to_build: List[str]
    instances_to_deploy: List["DockerContainer"]
@dataclass
class DockerContainer:
name: str
partition: int
config: ServiceConfig
def docker_name(self) -> str:
if self.partition < 1:
return f"{self.name}"
return f"{self.name}-{self.partition}"
def deploy_key(self) -> str:
return f"{self.config.deploy_tier}.{self.partition}"
class BuildError(Exception):
"""Raised when a build fails"""
def __init__(self, service: str, return_code: int):
self.service = service
self.return_code = return_code
super().__init__(f"Build failed for {service} with code {return_code}")
def get_deployment_tag() -> List[str] | None:
    """Return the subject tokens of a 'deploy-' tag pointing at the current HEAD, if one exists."""
cmd = ['git', 'for-each-ref', '--points-at', 'HEAD', 'refs/tags', '--format=%(refname:short) %(subject)']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Git command failed: {result.stderr}")
for tag in result.stdout.splitlines():
if tag.startswith('deploy-'):
return tag.split(' ')[1:]
return None
def parse_deployment_tags(
tag_messages: List[str],
service_config: Dict[str, ServiceConfig]
) -> DeploymentPlan:
"""
Parse deployment and hold tags using service configuration.
Args:
tag_messages: List of tag messages (e.g. ['deploy:all,-frontend', 'hold:index-service-7'])
service_config: Dictionary mapping service names to their configuration
Returns:
DeploymentPlan containing services to build and instances to hold
"""
services_to_build = set()
services_to_exclude = set()
instances_to_hold = set()
available_services = set(service_config.keys())
available_groups = set()
partitions = set()
for service in service_config.values():
available_groups = available_groups | service.groups
for tag in [tag.strip() for tag in tag_messages]:
if tag.startswith('partition:'):
for p in tag[10:].strip().split(','):
partitions.add(int(p))
if tag.startswith('deploy:'):
parts = tag[7:].strip().split(',')
for part in parts:
part = part.strip()
if part.startswith('-'):
service = part[1:]
if not service in available_services:
raise ValueError(f"Unknown service {service}")
services_to_exclude.add(service)
elif part.startswith('+'):
service = part[1:]
if not service in available_services:
raise ValueError(f"Unknown service {service}")
services_to_build.add(service)
else:
group = part
if not group in available_groups:
raise ValueError(f"Unknown service group {group}")
for name, service in service_config.items():
if group in service.groups:
services_to_build.add(name)
elif tag.startswith('hold:'):
instances = tag[5:].strip().split(',')
instances_to_hold.update(i.strip() for i in instances if i.strip())
print(partitions)
# Remove any explicitly excluded services
services_to_build = services_to_build - services_to_exclude
# Validate that all specified services exist
invalid_services = (services_to_build | services_to_exclude) - available_services
if invalid_services:
raise ValueError(f"Unknown services specified: {invalid_services}")
to_deploy = list()
for service in services_to_build:
config = service_config[service]
if config.instances == None:
if config.docker_name in instances_to_hold:
continue
container = DockerContainer(config.docker_name, 0, config)
if len(partitions) == 0 or 0 in partitions:
to_deploy.append(container)
else:
for instance in range(1,config.instances + 1):
if config.docker_name in instances_to_hold:
continue
container_name = f"{config.docker_name}-{instance}"
if container_name in instances_to_hold:
continue
if len(partitions) == 0 or instance in partitions:
to_deploy.append(DockerContainer(container_name, instance, config))
return DeploymentPlan(
services_to_build=sorted(list(services_to_build)),
instances_to_deploy=sorted(to_deploy, key = lambda c : c.deploy_key())
)
def deploy_container(container: DockerContainer) -> None:
"""
Run a docker deployment for the specified service and target.
Raises BuildError if the build fails.
"""
print(f"Deploying {container.name}")
process = subprocess.Popen(
['docker', 'compose', '--progress', 'quiet', 'up', '-d', container.name],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
# Stream output in real-time
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.rstrip())
return_code = process.poll()
if return_code != 0:
        raise BuildError(container.name, return_code)
def deploy_services(containers: List[DockerContainer]) -> None:
print(f"Deploying {containers}")
os.chdir(docker_dir)
for container in containers:
deploy_container(container)
def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
"""Execute the deployment plan"""
run_gradle_build([service_config[service].gradle_target for service in plan.services_to_build])
deploy_services(plan.instances_to_deploy)
def run_gradle_build(targets: List[str]) -> None:
"""
Run a Gradle build for the specified target.
Raises BuildError if the build fails.
"""
print(f"\nBuilding targets {targets}")
process = subprocess.Popen(
['./gradlew', '-q'] + targets,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
# Stream output in real-time
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.rstrip())
return_code = process.poll()
if return_code != 0:
        raise BuildError(' '.join(targets), return_code)
# Example usage:
if __name__ == '__main__':
# Define service configuration
SERVICE_CONFIG = {
'search': ServiceConfig(
gradle_target=':code:services-application:search-service:docker',
docker_name='search-service',
instances=2,
deploy_tier=2,
groups={"all", "frontend", "core"}
),
'api': ServiceConfig(
gradle_target=':code:services-application:api-service:docker',
docker_name='api-service',
instances=2,
deploy_tier=1,
groups={"all", "core"}
),
'assistant': ServiceConfig(
gradle_target=':code:services-core:assistant-service:docker',
docker_name='assistant-service',
instances=2,
deploy_tier=2,
groups={"all", "core"}
),
'explorer': ServiceConfig(
gradle_target=':code:services-application:explorer-service:docker',
docker_name='explorer-service',
instances=None,
deploy_tier=1,
groups={"all", "extra"}
),
'dating': ServiceConfig(
gradle_target=':code:services-application:dating-service:docker',
docker_name='dating-service',
instances=None,
deploy_tier=1,
groups={"all", "extra"}
),
'index': ServiceConfig(
gradle_target=':code:services-core:index-service:docker',
docker_name='index-service',
instances=10,
deploy_tier=3,
groups={"all", "index"}
),
'executor': ServiceConfig(
gradle_target=':code:services-core:executor-service:docker',
docker_name='executor-service',
instances=10,
deploy_tier=3,
groups={"all", "executor"}
),
'control': ServiceConfig(
gradle_target=':code:services-core:control-service:docker',
docker_name='control-service',
instances=None,
deploy_tier=0,
groups={"all", "core"}
),
'query': ServiceConfig(
gradle_target=':code:services-core:query-service:docker',
docker_name='query-service',
instances=2,
deploy_tier=2,
groups={"all", "query"}
),
}
try:
parser = argparse.ArgumentParser(
prog='deployment.py',
description='Continuous Deployment helper')
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
parser.add_argument('-t', '--tag', help='Use the specified tag value instead of the head git tag starting with deploy-')
args = parser.parse_args()
tags = args.tag
if tags is None:
tags = get_deployment_tag()
else:
tags = tags.split(' ')
if tags != None:
print("Found deployment tags:", tags)
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
print("\nDeployment Plan:")
print("Services to build:", plan.services_to_build)
print("Instances to deploy:", [container.name for container in plan.instances_to_deploy])
if not args.verify:
print("\nExecution Plan:")
build_and_deploy(plan, SERVICE_CONFIG)
else:
print("No tags found")
except ValueError as e:
print(f"Error: {e}")