mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 17:32:39 +02:00

Compare commits

...

13 Commits

Author SHA1 Message Date
Viktor Lofgren
4bb71b8439 (crawler) Correct content type probing to only run on URLs that are suspected to be binary 2024-12-26 14:26:23 +01:00
Viktor Lofgren
e4a41f7dd1 (crawler) Correct content type probing to only run on URLs that are suspected to be binary 2024-12-26 14:13:17 +01:00
Viktor
69ad6287b1 Update ROADMAP.md 2024-12-25 21:16:38 +00:00
Viktor Lofgren
41a59dcf45 (feed) Sanitize illegal HTML entities out of the feed XML before parsing 2024-12-25 14:53:28 +01:00
Viktor Lofgren
94d4d2edb7 (live-crawler) Add refresh date to feeds API
For now this is just the ctime for the feeds db.  We may want to store this per-record in the future.
2024-12-25 14:20:48 +01:00
Viktor Lofgren
7ae19a92ba (deploy) Improve deployment script to allow specification of partitions 2024-12-24 11:16:15 +01:00
Viktor Lofgren
56d14e56d7 (live-crawler) Improve LiveCrawlActor resilience to FeedService outages 2024-12-23 23:33:54 +01:00
Viktor Lofgren
a557c7ae7f (live-crawler) Limit concurrent accesses per domain using DomainLocks from main crawler 2024-12-23 23:31:03 +01:00
Viktor Lofgren
b66879ccb1 (feed) Add support for date discovery through atom:issued and atom:created
This is specifically to help parse monadnock.net's Atom feed.
2024-12-23 20:05:58 +01:00
Viktor Lofgren
f1b7157ca2 (deploy) Add basic linting ability to deployment script. 2024-12-23 16:21:29 +01:00
Viktor Lofgren
7622335e84 (deploy) Correct deploy script, set correct name for assistant 2024-12-23 15:59:02 +01:00
Viktor Lofgren
0da2047eae (live-capture) Correctly update processed count, disable poll rate adjustment based on freshness. 2024-12-23 15:56:27 +01:00
Viktor Lofgren
5ee4321110 (ci) Correct deploy script 2024-12-22 20:08:37 +01:00
13 changed files with 279 additions and 124 deletions

View File

@@ -21,7 +21,7 @@ word n-grams known beforehand. This limits the ability to interpret longer quer
 The positions mask should be supplemented or replaced with a more accurate (e.g.) gamma coded positions
 list, as is the civilized way of doing this.
 
-Completed with PR https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99
+Completed with PR [#99](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/99)
 
 ## Hybridize crawler w/ Common Crawl data
@@ -41,6 +41,12 @@ The search engine has a bit of a problem showing spicy content mixed in with the
 to have a way to filter this out. It's likely something like a URL blacklist (e.g. [UT1](https://dsi.ut-capitole.fr/blacklists/index_en.php) )
 combined with naive bayesian filter would go a long way, or something more sophisticated...?
 
+## Web Design Overhaul
+
+The design is kinda clunky and hard to maintain, and needlessly outdated-looking.
+
+In progress: PR [#127](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/127) -- demo available at https://test.marginalia.nu/
+
 ## Additional Language Support
 
 It would be desirable if the search engine supported more languages than English. This is partially about
@@ -56,7 +62,7 @@ it should be extended to all domains. It would also be interesting to offer sea
 RSS data itself, or use the RSS set to feed a special live index that updates faster than the
 main dataset.
 
-Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122)
+Completed with PR [#122](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/122) and PR [#125](https://github.com/MarginaliaSearch/MarginaliaSearch/pull/125)
 
 ## Support for binary formats like PDF

View File

@@ -50,12 +50,18 @@ public class LiveCrawlActor extends RecordActorPrototype {
                 yield new Monitor("-");
             }
             case Monitor(String feedsHash) -> {
+                // Sleep initially in case this is during start-up
                 for (;;) {
-                    String currentHash = feedsClient.getFeedDataHash();
-                    if (!Objects.equals(currentHash, feedsHash)) {
-                        yield new LiveCrawl(currentHash);
+                    try {
+                        Thread.sleep(Duration.ofMinutes(15));
+                        String currentHash = feedsClient.getFeedDataHash();
+                        if (!Objects.equals(currentHash, feedsHash)) {
+                            yield new LiveCrawl(currentHash);
+                        }
+                    }
+                    catch (RuntimeException ex) {
+                        logger.error("Failed to fetch feed data hash");
                     }
-                    Thread.sleep(Duration.ofMinutes(15));
                 }
             }
             case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {

View File

@@ -59,12 +59,6 @@ public class FeedsClient {
                 .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }
 
-    public record UpdatedDomain(String domain, List<String> urls) {
-        public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
-            this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
-        }
-    }
-
     /** Get the hash of the feed data, for identifying when the data has been updated */
     public String getFeedDataHash() {
         return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)

View File

@@ -46,6 +46,7 @@ message RpcFeed {
     string feedUrl = 3;
     string updated = 4;
     repeated RpcFeedItem items = 5;
+    int64 fetchTimestamp = 6;
 }
 
 message RpcFeedItem {

View File

@@ -12,9 +12,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFileAttributes;
 import java.security.MessageDigest;
 import java.time.Instant;
 import java.util.Base64;
@@ -209,4 +211,20 @@ public class FeedDb {
         reader.getLinksUpdatedSince(since, consumer);
     }
 
+    public Instant getFetchTime() {
+        if (!Files.exists(readerDbPath)) {
+            return Instant.ofEpochMilli(0);
+        }
+
+        try {
+            return Files.readAttributes(readerDbPath, PosixFileAttributes.class)
+                    .creationTime()
+                    .toInstant();
+        }
+        catch (IOException ex) {
+            logger.error("Failed to read the creatiom time of {}", readerDbPath);
+            return Instant.ofEpochMilli(0);
+        }
+    }
 }

View File

@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -74,6 +73,17 @@ public class FeedFetcherService {
         this.nodeConfigurationService = nodeConfigurationService;
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
+
+        // Add support for some alternate date tags for atom
+        rssReader.addItemExtension("issued", this::setDateFallback);
+        rssReader.addItemExtension("created", this::setDateFallback);
+    }
+
+    private void setDateFallback(Item item, String value) {
+        if (item.getPubDate().isEmpty()) {
+            item.setPubDate(value);
+        }
     }
 
     public enum UpdateMode {
@@ -124,51 +134,57 @@ public class FeedFetcherService {
             for (var feed : definitions) {
                 executor.submitQuietly(() -> {
-                    var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
+                    try {
+                        var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
 
-                    // If we have existing data, we might skip updating it with a probability that increases with time,
-                    // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
-                    // on our end
+                        // If we have existing data, we might skip updating it with a probability that increases with time,
+                        // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
+                        // on our end
 
-                    if (!oldData.isEmpty()) {
-                        Duration duration = feed.durationSinceUpdated();
-                        long daysSinceUpdate = duration.toDays();
+                        /* Disable for now:
 
-                        if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
-                            .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
-                        {
-                            // Skip updating this feed, just write the old data back instead
-                            writer.saveFeed(oldData);
-                            return;
-                        }
-                    }
+                        if (!oldData.isEmpty()) {
+                            Duration duration = feed.durationSinceUpdated();
+                            long daysSinceUpdate = duration.toDays();
 
-                    FetchResult feedData;
-                    try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
-                        feedData = fetchFeedData(feed, client);
-                    }
-                    catch (Exception ex) {
-                        feedData = new FetchResult.TransientError();
-                    }
+                            if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
+                                .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
+                                // Skip updating this feed, just write the old data back instead
+                                writer.saveFeed(oldData);
+                                return;
+                            }
+                        }
+                        */
 
-                    switch (feedData) {
-                        case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
-                        case FetchResult.TransientError() -> {
-                            int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
-                            writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
+                        FetchResult feedData;
+                        try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
+                            feedData = fetchFeedData(feed, client);
+                        }
+                        catch (Exception ex) {
+                            feedData = new FetchResult.TransientError();
+                        }
 
-                            if (errorCount < 5) {
-                                // Permit the server a few days worth of retries before we drop the feed entirely
-                                writer.saveFeed(oldData);
-                            }
-                        }
-                        case FetchResult.PermanentError() -> {} // let the definition be forgotten about
-                    }
+                        switch (feedData) {
+                            case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
+                            case FetchResult.TransientError() -> {
+                                int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
+                                writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
 
-                    if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
-                        // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
-                        heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
+                                if (errorCount < 5) {
+                                    // Permit the server a few days worth of retries before we drop the feed entirely
+                                    writer.saveFeed(oldData);
+                                }
+                            }
+                            case FetchResult.PermanentError() -> {
+                            } // let the definition be forgotten about
+                        }
+                    }
+                    finally {
+                        if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
+                            // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
+                            heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
+                        }
                     }
                 });
             }
@@ -300,6 +316,8 @@ public class FeedFetcherService {
     public FeedItems parseFeed(String feedData, FeedDefinition definition) {
         try {
+            feedData = sanitizeEntities(feedData);
+
             List<Item> rawItems = rssReader.read(
                     // Massage the data to maximize the possibility of the flaky XML parser consuming it
                     new BOMInputStream(new ByteArrayInputStream(feedData.trim().getBytes(StandardCharsets.UTF_8)), false)
@@ -326,6 +344,32 @@ public class FeedFetcherService {
         }
     }
 
+    private static final Map<String, String> HTML_ENTITIES = Map.of(
+            "&raquo;", "»",
+            "&laquo;", "«",
+            "&mdash;", "--",
+            "&ndash;", "-",
+            "&rsquo;", "'",
+            "&lsquo;", "'",
+            "&nbsp;", ""
+    );
+
+    /** The XML parser will blow up if you insert HTML entities in the feed XML,
+     * which is unfortunately relatively common.  Replace them as far as is possible
+     * with their corresponding characters
+     */
+    static String sanitizeEntities(String feedData) {
+        String result = feedData;
+        for (Map.Entry<String, String> entry : HTML_ENTITIES.entrySet()) {
+            result = result.replace(entry.getKey(), entry.getValue());
+        }
+
+        // Handle lone ampersands not part of a recognized XML entity
+        result = result.replaceAll("&(?!(amp|lt|gt|apos|quot);)", "&amp;");
+
+        return result;
+    }
+
     /** Decide whether to keep URI fragments in the feed items.
      * <p></p>
      * We keep fragments if there are multiple different fragments in the items.
@@ -361,7 +405,7 @@ public class FeedFetcherService {
         return seenFragments.size() > 1;
     }
 
-    private static class IsFeedItemDateValid implements Predicate<FeedItem> {
+    static class IsFeedItemDateValid implements Predicate<FeedItem> {
         private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
 
         public boolean test(FeedItem item) {

View File

@@ -107,8 +107,7 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
     @Override
     public void getFeed(RpcDomainId request,
-                        StreamObserver<RpcFeed> responseObserver)
-    {
+                        StreamObserver<RpcFeed> responseObserver) {
         if (!feedDb.isEnabled()) {
             responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
             return;
@@ -126,7 +125,8 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
                 .setDomainId(request.getDomainId())
                 .setDomain(domainName.get().toString())
                 .setFeedUrl(feedItems.feedUrl())
-                .setUpdated(feedItems.updated());
+                .setUpdated(feedItems.updated())
+                .setFetchTimestamp(feedDb.getFetchTime().toEpochMilli());
 
         for (var item : feedItems.items()) {
             retB.addItemsBuilder()

View File

@@ -99,7 +99,9 @@ class FeedFetcherServiceTest extends AbstractModule {
         feedFetcherService.setDeterministic();
         feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
 
-        Assertions.assertFalse(feedDb.getFeed(new EdgeDomain("www.marginalia.nu")).isEmpty());
+        var result = feedDb.getFeed(new EdgeDomain("www.marginalia.nu"));
+        System.out.println(result);
+        Assertions.assertFalse(result.isEmpty());
     }
 
     @Tag("flaky")

View File

@@ -0,0 +1,26 @@
+package nu.marginalia.rss.svc;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestXmlSanitization {
+
+    @Test
+    public void testPreservedEntities() {
+        Assertions.assertEquals("&amp;", FeedFetcherService.sanitizeEntities("&amp;"));
+        Assertions.assertEquals("&lt;", FeedFetcherService.sanitizeEntities("&lt;"));
+        Assertions.assertEquals("&gt;", FeedFetcherService.sanitizeEntities("&gt;"));
+        Assertions.assertEquals("&quot;", FeedFetcherService.sanitizeEntities("&quot;"));
+        Assertions.assertEquals("&apos;", FeedFetcherService.sanitizeEntities("&apos;"));
+    }
+
+    @Test
+    public void testStrayAmpersand() {
+        Assertions.assertEquals("Bed &amp; Breakfast", FeedFetcherService.sanitizeEntities("Bed & Breakfast"));
+    }
+
+    @Test
+    public void testTranslatedHtmlEntity() {
+        Assertions.assertEquals("Foo -- Bar", FeedFetcherService.sanitizeEntities("Foo &mdash; Bar"));
+    }
+}

View File

@@ -139,7 +139,7 @@ public class HttpFetcherImpl implements HttpFetcher {
     public ContentTypeProbeResult probeContentType(EdgeUrl url,
                                                    WarcRecorder warcRecorder,
                                                    ContentTags tags) throws RateLimitException {
-        if (tags.isEmpty()) {
+        if (tags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url)) {
             var headBuilder = new Request.Builder().head()
                 .addHeader("User-agent", userAgentString)
                 .addHeader("Accept-Encoding", "gzip")

View File

@@ -42,24 +42,24 @@ class ContentTypeProberTest {
         port = r.nextInt(10000) + 8000;
         server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
 
-        server.createContext("/html", exchange -> {
+        server.createContext("/html.gz", exchange -> {
             exchange.getResponseHeaders().add("Content-Type", "text/html");
             exchange.sendResponseHeaders(200, -1);
             exchange.close();
         });
-        server.createContext("/redir", exchange -> {
-            exchange.getResponseHeaders().add("Location", "/html");
+        server.createContext("/redir.gz", exchange -> {
+            exchange.getResponseHeaders().add("Location", "/html.gz");
             exchange.sendResponseHeaders(301, -1);
             exchange.close();
         });
-        server.createContext("/bin", exchange -> {
+        server.createContext("/bin.gz", exchange -> {
             exchange.getResponseHeaders().add("Content-Type", "application/binary");
             exchange.sendResponseHeaders(200, -1);
             exchange.close();
         });
-        server.createContext("/timeout", exchange -> {
+        server.createContext("/timeout.gz", exchange -> {
             try {
                 Thread.sleep(15_000);
             } catch (InterruptedException e) {
@@ -73,10 +73,10 @@ class ContentTypeProberTest {
         server.start();
 
-        htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html").get();
-        binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin").get();
-        timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout").get();
-        htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir").get();
+        htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html.gz").get();
+        binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin.gz").get();
+        timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout.gz").get();
+        htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir.gz").get();
 
         fetcher = new HttpFetcherImpl("test");
         recorder = new WarcRecorder(warcFile);

View File

@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
 import crawlercommons.robots.SimpleRobotRulesParser;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
+import nu.marginalia.crawl.logic.DomainLocks;
 import nu.marginalia.crawl.retreival.CrawlDelayTimer;
 import nu.marginalia.db.DbDomainQueries;
 import nu.marginalia.db.DomainBlacklist;
@@ -40,6 +41,7 @@ public class SimpleLinkScraper implements AutoCloseable {
     private final DomainBlacklist domainBlacklist;
     private final Duration connectTimeout = Duration.ofSeconds(10);
     private final Duration readTimeout = Duration.ofSeconds(10);
+    private final DomainLocks domainLocks = new DomainLocks();
 
     public SimpleLinkScraper(LiveCrawlDataSet dataSet,
                              DbDomainQueries domainQueries,
@@ -65,7 +67,9 @@ public class SimpleLinkScraper implements AutoCloseable {
                 .connectTimeout(connectTimeout)
                 .followRedirects(HttpClient.Redirect.NEVER)
                 .version(HttpClient.Version.HTTP_2)
-                .build()) {
+                .build();
+             DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) // throttle concurrent access per domain; do not remove
+        ) {
 
             EdgeUrl rootUrl = domain.toRootUrlHttps();
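
DomainLocks is referenced here but lives in the main crawler, outside this diff. A minimal sketch of the per-domain throttle it appears to provide, under the assumption that it hands out an AutoCloseable permit backed by a per-domain semaphore (the names and permit count below are assumptions, not the project's code):

// Minimal sketch under stated assumptions; the crawler's real DomainLocks may differ.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class DomainLocksSketch {
    private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();

    /** Blocks until a permit for the domain is free, bounding concurrent requests per domain. */
    DomainLock lockDomain(String domain) throws InterruptedException {
        Semaphore semaphore = locks.computeIfAbsent(domain, d -> new Semaphore(2));
        semaphore.acquire();
        return new DomainLock(semaphore);
    }

    /** Handle suited to try-with-resources, as in the scraper above; releases the permit on close. */
    record DomainLock(Semaphore semaphore) implements AutoCloseable {
        @Override
        public void close() {
            semaphore.release();
        }
    }
}

Acquiring the lock in the same try-with-resources statement as the HttpClient means the permit is released automatically when the per-domain fetch block exits, even on exceptions.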

View File

@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 import subprocess, os
 from typing import List, Set, Dict, Optional
+import argparse
 
 build_dir = "/app/search.marginalia.nu/build"
 docker_dir = "/app/search.marginalia.nu/docker"
@@ -12,11 +13,12 @@ class ServiceConfig:
     docker_name: str
     instances: int | None
     deploy_tier: int
+    groups: Set[str]
 
 @dataclass
 class DeploymentPlan:
     services_to_build: List[str]
-    instances_to_hold: Set[str]
+    instances_to_deploy: Set[str]
 
 @dataclass
 class DockerContainer:
@@ -72,24 +74,49 @@ def parse_deployment_tags(
     instances_to_hold = set()
     available_services = set(service_config.keys())
+    available_groups = set()
+    partitions = set()
+
+    for service in service_config.values():
+        available_groups = available_groups | service.groups
 
     for tag in [tag.strip() for tag in tag_messages]:
+        if tag.startswith('partition:'):
+            for p in tag[10:].strip().split(','):
+                partitions.add(int(p))
         if tag.startswith('deploy:'):
             parts = tag[7:].strip().split(',')
             for part in parts:
                 part = part.strip()
-                if part == 'all':
-                    services_to_build.update(available_services)
-                elif part.startswith('-'):
-                    services_to_exclude.add(part[1:])
+
+                if part.startswith('-'):
+                    service = part[1:]
+                    if not service in available_services:
+                        raise ValueError(f"Unknown service {service}")
+                    services_to_exclude.add(service)
                 elif part.startswith('+'):
-                    services_to_build.add(part[1:])
+                    service = part[1:]
+                    if not service in available_services:
+                        raise ValueError(f"Unknown service {service}")
+                    services_to_build.add(service)
+                else:
+                    group = part
+                    if not group in available_groups:
+                        raise ValueError(f"Unknown service group {group}")
+                    for name, service in service_config.items():
+                        if group in service.groups:
+                            services_to_build.add(name)
         elif tag.startswith('hold:'):
             instances = tag[5:].strip().split(',')
             instances_to_hold.update(i.strip() for i in instances if i.strip())
 
+    print(partitions)
+
     # Remove any explicitly excluded services
     services_to_build = services_to_build - services_to_exclude
@@ -98,9 +125,32 @@ def parse_deployment_tags(
     if invalid_services:
         raise ValueError(f"Unknown services specified: {invalid_services}")
 
+    to_deploy = list()
+    for service in services_to_build:
+        config = service_config[service]
+        if config.instances == None:
+            if config.docker_name in instances_to_hold:
+                continue
+
+            container = DockerContainer(config.docker_name, 0, config)
+            if len(partitions) == 0 or 0 in partitions:
+                to_deploy.append(container)
+        else:
+            for instance in range(1,config.instances + 1):
+                if config.docker_name in instances_to_hold:
+                    continue
+
+                container_name = f"{config.docker_name}-{instance}"
+                if container_name in instances_to_hold:
+                    continue
+
+                if len(partitions) == 0 or instance in partitions:
+                    to_deploy.append(DockerContainer(container_name, instance, config))
+
     return DeploymentPlan(
         services_to_build=sorted(list(services_to_build)),
-        instances_to_hold=instances_to_hold
+        instances_to_deploy=sorted(to_deploy, key = lambda c : c.deploy_key())
     )
@@ -118,6 +168,7 @@ def deploy_container(container: DockerContainer) -> None:
         text=True
     )
 
+    # Stream output in real-time
     while True:
         output = process.stdout.readline()
@@ -131,49 +182,27 @@ def deploy_container(container: DockerContainer) -> None:
         raise BuildError(container, return_code)
 
 def deploy_services(containers: List[str]) -> None:
-    cwd = os.getcwd()
+    print(f"Deploying {containers}")
     os.chdir(docker_dir)
 
     for container in containers:
         deploy_container(container)
 
 def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
     """Execute the deployment plan"""
-    for service in plan.services_to_build:
-        config = service_config[service]
-        print(f"Building {service}:")
-        run_gradle_build(service, config.gradle_target)
+    run_gradle_build([service_config[service].gradle_target for service in plan.services_to_build])
 
-    to_deploy = list()
-    for service in plan.services_to_build:
-        config = service_config[service]
-        if config.instances == None:
-            if config.docker_name in plan.instances_to_hold:
-                continue
-
-            container = DockerContainer(config.docker_name, 0, config)
-            to_deploy.append(container)
-        else:
-            for instance in range(1,config.instances + 1):
-                container_name = f"{config.docker_name}-{instance}"
-                if container_name in plan.instances_to_hold:
-                    continue
-                to_deploy.append(DockerContainer(container_name, instance, config))
-
-    to_deploy = sorted(to_deploy, key = lambda c : c.deploy_key())
-    deploy_services(to_deploy)
+    deploy_services(plan.instances_to_deploy)
 
-def run_gradle_build(service: str, target: str) -> None:
+def run_gradle_build(targets: str) -> None:
     """
-    Run a Gradle build for the specified service and target.
+    Run a Gradle build for the specified target.
     Raises BuildError if the build fails.
     """
-    print(f"\nBuilding {service} with target {target}")
+    print(f"\nBuilding targets {targets}")
     process = subprocess.Popen(
-        ['./gradlew', '-q', target],
+        ['./gradlew', '-q'] + targets,
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
         text=True
@@ -199,73 +228,98 @@ if __name__ == '__main__':
             gradle_target=':code:services-application:search-service:docker',
             docker_name='search-service',
             instances=2,
-            deploy_tier=2
+            deploy_tier=2,
+            groups={"all", "frontend", "core"}
         ),
         'api': ServiceConfig(
             gradle_target=':code:services-application:api-service:docker',
             docker_name='api-service',
             instances=2,
-            deploy_tier=1
+            deploy_tier=1,
+            groups={"all", "core"}
         ),
-        'api': ServiceConfig(
+        'assistant': ServiceConfig(
             gradle_target=':code:services-core:assistant-service:docker',
             docker_name='assistant-service',
             instances=2,
-            deploy_tier=2
+            deploy_tier=2,
+            groups={"all", "core"}
         ),
         'explorer': ServiceConfig(
             gradle_target=':code:services-application:explorer-service:docker',
             docker_name='explorer-service',
-            instances=1,
-            deploy_tier=1
+            instances=None,
+            deploy_tier=1,
+            groups={"all", "extra"}
         ),
         'dating': ServiceConfig(
             gradle_target=':code:services-application:dating-service:docker',
             docker_name='dating-service',
-            instances=1,
-            deploy_tier=1
+            instances=None,
+            deploy_tier=1,
+            groups={"all", "extra"}
        ),
         'index': ServiceConfig(
             gradle_target=':code:services-core:index-service:docker',
             docker_name='index-service',
             instances=10,
-            deploy_tier=3
+            deploy_tier=3,
+            groups={"all", "index"}
         ),
         'executor': ServiceConfig(
             gradle_target=':code:services-core:executor-service:docker',
             docker_name='executor-service',
             instances=10,
-            deploy_tier=3
+            deploy_tier=3,
+            groups={"all", "executor"}
         ),
         'control': ServiceConfig(
             gradle_target=':code:services-core:control-service:docker',
             docker_name='control-service',
             instances=None,
-            deploy_tier=0
+            deploy_tier=0,
+            groups={"all", "core"}
         ),
         'query': ServiceConfig(
             gradle_target=':code:services-core:query-service:docker',
             docker_name='query-service',
             instances=2,
-            deploy_tier=2
+            deploy_tier=2,
+            groups={"all", "query"}
         ),
     }
 
     try:
-        tags = get_deployment_tag()
-        if tags == None:
-            exit
+        parser = argparse.ArgumentParser(
+            prog='deployment.py',
+            description='Continuous Deployment helper')
+        parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
+        parser.add_argument('-t', '--tag', help='Use the specified tag value instead of the head git tag starting with deploy-')
 
-        print(tags)
+        args = parser.parse_args()
 
-        plan = parse_deployment_tags(tags, SERVICE_CONFIG)
+        tags = args.tag
+        if tags is None:
+            tags = get_deployment_tag()
+        else:
+            tags = tags.split(' ')
 
-        print("\nDeployment Plan:")
-        print("Services to build:", plan.services_to_build)
-        print("Instances to hold:", plan.instances_to_hold)
+        if tags != None:
+            print("Found deployment tags:", tags)
 
-        print("\nExecution Plan:")
-        build_and_deploy(plan, SERVICE_CONFIG)
+            plan = parse_deployment_tags(tags, SERVICE_CONFIG)
+
+            print("\nDeployment Plan:")
+            print("Services to build:", plan.services_to_build)
+            print("Instances to deploy:", [container.name for container in plan.instances_to_deploy])
+
+            if not args.verify:
+                print("\nExecution Plan:")
+                build_and_deploy(plan, SERVICE_CONFIG)
+        else:
+            print("No tags found")
     except ValueError as e:
         print(f"Error: {e}")