mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 07:32:38 +02:00
Compare commits
16 Commits
deploy-exe
...
deploy-000
Author | SHA1 | Date | |
---|---|---|---|
|
94d4d2edb7 | ||
|
7ae19a92ba | ||
|
56d14e56d7 | ||
|
a557c7ae7f | ||
|
b66879ccb1 | ||
|
f1b7157ca2 | ||
|
7622335e84 | ||
|
0da2047eae | ||
|
5ee4321110 | ||
|
9459b9933b | ||
|
87fb564f89 | ||
|
5ca8523220 | ||
|
1118657ffd | ||
|
b1f970152d | ||
|
e1783891ab | ||
|
64d32471dd |
@@ -6,6 +6,9 @@ import nu.marginalia.service.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.util.Enumeration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
@@ -69,6 +72,17 @@ public class ServiceConfigurationModule extends AbstractModule {
|
||||
return configuredValue;
|
||||
}
|
||||
|
||||
if (Boolean.getBoolean("system.multiFace")) {
|
||||
try {
|
||||
String localNetworkIp = getLocalNetworkIP();
|
||||
if (null != localNetworkIp) {
|
||||
return localNetworkIp;
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to get local network IP", ex);
|
||||
}
|
||||
}
|
||||
// If we're in docker, we'll use the hostname
|
||||
if (Boolean.getBoolean("service.useDockerHostname")) {
|
||||
return System.getenv("HOSTNAME");
|
||||
@@ -84,10 +98,41 @@ public class ServiceConfigurationModule extends AbstractModule {
|
||||
private String getBindAddress() {
|
||||
String configuredValue = System.getProperty("service.bind-address");
|
||||
if (configuredValue != null) {
|
||||
logger.info("Using configured bind address {}", configuredValue);
|
||||
return configuredValue;
|
||||
}
|
||||
|
||||
return "127.0.0.1";
|
||||
if (Boolean.getBoolean("system.multiFace")) {
|
||||
try {
|
||||
return Objects.requireNonNullElse(getLocalNetworkIP(), "0.0.0.0");
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Failed to get local network IP, falling back to bind to 0.0.0.0", ex);
|
||||
return "0.0.0.0";
|
||||
}
|
||||
}
|
||||
else {
|
||||
return "0.0.0.0";
|
||||
}
|
||||
}
|
||||
|
||||
public static String getLocalNetworkIP() throws Exception {
|
||||
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
|
||||
|
||||
while (nets.hasMoreElements()) {
|
||||
NetworkInterface netif = nets.nextElement();
|
||||
if (!netif.isUp() || netif.isLoopback()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Enumeration<InetAddress> inetAddresses = netif.getInetAddresses();
|
||||
while (inetAddresses.hasMoreElements()) {
|
||||
InetAddress addr = inetAddresses.nextElement();
|
||||
if (addr.isSiteLocalAddress() && !addr.isLoopbackAddress()) {
|
||||
return addr.getHostAddress();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -7,6 +7,8 @@ import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
public class MetricsServer {
|
||||
|
||||
@Inject
|
||||
@@ -15,7 +17,8 @@ public class MetricsServer {
|
||||
if (configuration.metricsPort() < 0)
|
||||
return;
|
||||
|
||||
Server server = new Server(configuration.metricsPort());
|
||||
Server server = new Server(new InetSocketAddress(configuration.bindAddress(), configuration.metricsPort()));
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
context.setContextPath("/");
|
||||
server.setHandler(context);
|
||||
|
@@ -50,12 +50,18 @@ public class LiveCrawlActor extends RecordActorPrototype {
|
||||
yield new Monitor("-");
|
||||
}
|
||||
case Monitor(String feedsHash) -> {
|
||||
// Sleep initially in case this is during start-up
|
||||
for (;;) {
|
||||
String currentHash = feedsClient.getFeedDataHash();
|
||||
if (!Objects.equals(currentHash, feedsHash)) {
|
||||
yield new LiveCrawl(currentHash);
|
||||
try {
|
||||
Thread.sleep(Duration.ofMinutes(15));
|
||||
String currentHash = feedsClient.getFeedDataHash();
|
||||
if (!Objects.equals(currentHash, feedsHash)) {
|
||||
yield new LiveCrawl(currentHash);
|
||||
}
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
logger.error("Failed to fetch feed data hash");
|
||||
}
|
||||
Thread.sleep(Duration.ofMinutes(15));
|
||||
}
|
||||
}
|
||||
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {
|
||||
|
@@ -59,12 +59,6 @@ public class FeedsClient {
|
||||
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
|
||||
}
|
||||
|
||||
public record UpdatedDomain(String domain, List<String> urls) {
|
||||
public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
|
||||
this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the hash of the feed data, for identifying when the data has been updated */
|
||||
public String getFeedDataHash() {
|
||||
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
|
||||
|
@@ -46,6 +46,7 @@ message RpcFeed {
|
||||
string feedUrl = 3;
|
||||
string updated = 4;
|
||||
repeated RpcFeedItem items = 5;
|
||||
int64 fetchTimestamp = 6;
|
||||
}
|
||||
|
||||
message RpcFeedItem {
|
||||
|
@@ -12,9 +12,11 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.nio.file.attribute.PosixFileAttributes;
|
||||
import java.security.MessageDigest;
|
||||
import java.time.Instant;
|
||||
import java.util.Base64;
|
||||
@@ -209,4 +211,20 @@ public class FeedDb {
|
||||
|
||||
reader.getLinksUpdatedSince(since, consumer);
|
||||
}
|
||||
|
||||
public Instant getFetchTime() {
|
||||
if (!Files.exists(readerDbPath)) {
|
||||
return Instant.ofEpochMilli(0);
|
||||
}
|
||||
|
||||
try {
|
||||
return Files.readAttributes(readerDbPath, PosixFileAttributes.class)
|
||||
.creationTime()
|
||||
.toInstant();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
logger.error("Failed to read the creatiom time of {}", readerDbPath);
|
||||
return Instant.ofEpochMilli(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
@@ -74,6 +73,17 @@ public class FeedFetcherService {
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
this.serviceHeartbeat = serviceHeartbeat;
|
||||
this.executorClient = executorClient;
|
||||
|
||||
|
||||
// Add support for some alternate date tags for atom
|
||||
rssReader.addItemExtension("issued", this::setDateFallback);
|
||||
rssReader.addItemExtension("created", this::setDateFallback);
|
||||
}
|
||||
|
||||
private void setDateFallback(Item item, String value) {
|
||||
if (item.getPubDate().isEmpty()) {
|
||||
item.setPubDate(value);
|
||||
}
|
||||
}
|
||||
|
||||
public enum UpdateMode {
|
||||
@@ -124,51 +134,57 @@ public class FeedFetcherService {
|
||||
|
||||
for (var feed : definitions) {
|
||||
executor.submitQuietly(() -> {
|
||||
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
||||
try {
|
||||
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
||||
|
||||
// If we have existing data, we might skip updating it with a probability that increases with time,
|
||||
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
||||
// on our end
|
||||
// If we have existing data, we might skip updating it with a probability that increases with time,
|
||||
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
||||
// on our end
|
||||
|
||||
if (!oldData.isEmpty()) {
|
||||
Duration duration = feed.durationSinceUpdated();
|
||||
long daysSinceUpdate = duration.toDays();
|
||||
/* Disable for now:
|
||||
|
||||
if (!oldData.isEmpty()) {
|
||||
Duration duration = feed.durationSinceUpdated();
|
||||
long daysSinceUpdate = duration.toDays();
|
||||
|
||||
|
||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
|
||||
{
|
||||
// Skip updating this feed, just write the old data back instead
|
||||
writer.saveFeed(oldData);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
FetchResult feedData;
|
||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
switch (feedData) {
|
||||
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
||||
case FetchResult.TransientError() -> {
|
||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||
|
||||
if (errorCount < 5) {
|
||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
|
||||
// Skip updating this feed, just write the old data back instead
|
||||
writer.saveFeed(oldData);
|
||||
return;
|
||||
}
|
||||
}
|
||||
case FetchResult.PermanentError() -> {} // let the definition be forgotten about
|
||||
}
|
||||
*/
|
||||
|
||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
||||
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
||||
FetchResult feedData;
|
||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client);
|
||||
} catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
switch (feedData) {
|
||||
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
||||
case FetchResult.TransientError() -> {
|
||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||
|
||||
if (errorCount < 5) {
|
||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||
writer.saveFeed(oldData);
|
||||
}
|
||||
}
|
||||
case FetchResult.PermanentError() -> {
|
||||
} // let the definition be forgotten about
|
||||
}
|
||||
|
||||
}
|
||||
finally {
|
||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
||||
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -361,7 +377,7 @@ public class FeedFetcherService {
|
||||
return seenFragments.size() > 1;
|
||||
}
|
||||
|
||||
private static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
||||
static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
||||
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
||||
|
||||
public boolean test(FeedItem item) {
|
||||
|
@@ -107,8 +107,7 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
||||
|
||||
@Override
|
||||
public void getFeed(RpcDomainId request,
|
||||
StreamObserver<RpcFeed> responseObserver)
|
||||
{
|
||||
StreamObserver<RpcFeed> responseObserver) {
|
||||
if (!feedDb.isEnabled()) {
|
||||
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
|
||||
return;
|
||||
@@ -126,7 +125,8 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
||||
.setDomainId(request.getDomainId())
|
||||
.setDomain(domainName.get().toString())
|
||||
.setFeedUrl(feedItems.feedUrl())
|
||||
.setUpdated(feedItems.updated());
|
||||
.setUpdated(feedItems.updated())
|
||||
.setFetchTimestamp(feedDb.getFetchTime().toEpochMilli());
|
||||
|
||||
for (var item : feedItems.items()) {
|
||||
retB.addItemsBuilder()
|
||||
|
@@ -49,13 +49,14 @@ public class Units {
|
||||
var fromUnit = unitsByName.get(fromUnitName.toLowerCase());
|
||||
var toUnit = unitsByName.get(toUnitName.toLowerCase());
|
||||
|
||||
if (Objects.equals(fromUnit, toUnit)) {
|
||||
return Optional.of(value + " " + fromUnit.name);
|
||||
}
|
||||
if (null == fromUnit || null == toUnit) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (Objects.equals(fromUnit, toUnit)) {
|
||||
return Optional.of(value + " " + fromUnit.name);
|
||||
}
|
||||
|
||||
if (!Objects.equals(toUnit.type, fromUnit.type)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
|
||||
import crawlercommons.robots.SimpleRobotRulesParser;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
|
||||
import nu.marginalia.crawl.logic.DomainLocks;
|
||||
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
|
||||
import nu.marginalia.db.DbDomainQueries;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
@@ -40,6 +41,7 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
private final DomainBlacklist domainBlacklist;
|
||||
private final Duration connectTimeout = Duration.ofSeconds(10);
|
||||
private final Duration readTimeout = Duration.ofSeconds(10);
|
||||
private final DomainLocks domainLocks = new DomainLocks();
|
||||
|
||||
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
|
||||
DbDomainQueries domainQueries,
|
||||
@@ -65,7 +67,9 @@ public class SimpleLinkScraper implements AutoCloseable {
|
||||
.connectTimeout(connectTimeout)
|
||||
.followRedirects(HttpClient.Redirect.NEVER)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build()) {
|
||||
.build();
|
||||
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) // throttle concurrent access per domain; do not remove
|
||||
) {
|
||||
|
||||
EdgeUrl rootUrl = domain.toRootUrlHttps();
|
||||
|
||||
|
@@ -2,4 +2,5 @@
|
||||
|
||||
A master HEAD tagged with deploy-core*, deploy-executor*, or deploy-index* will trigger a commit.
|
||||
|
||||
2024-12-19: Test deployment of executor
|
||||
2024-12-19-00002: Test deployment of executor
|
||||
2024-12-19-00001: Test deployment of executor
|
@@ -8,7 +8,7 @@ jib {
|
||||
}
|
||||
container {
|
||||
mainClass = application.mainClass
|
||||
jvmFlags = ['-Dservice.bind-address=0.0.0.0', '-Dservice.useDockerHostname=TRUE', '-Dsystem.homePath=/wmsa']
|
||||
jvmFlags = ['-Dservice.useDockerHostname=TRUE', '-Dsystem.homePath=/wmsa']
|
||||
volumes = ['/wmsa/conf', '/wmsa/model', '/wmsa/data', '/var/log/wmsa']
|
||||
}
|
||||
}
|
||||
|
325
tools/deployment/deployment.py
Normal file
325
tools/deployment/deployment.py
Normal file
@@ -0,0 +1,325 @@
|
||||
from dataclasses import dataclass
|
||||
import subprocess, os
|
||||
from typing import List, Set, Dict, Optional
|
||||
import argparse
|
||||
|
||||
build_dir = "/app/search.marginalia.nu/build"
|
||||
docker_dir = "/app/search.marginalia.nu/docker"
|
||||
|
||||
@dataclass
|
||||
class ServiceConfig:
|
||||
"""Configuration for a service"""
|
||||
gradle_target: str
|
||||
docker_name: str
|
||||
instances: int | None
|
||||
deploy_tier: int
|
||||
groups: Set[str]
|
||||
|
||||
@dataclass
|
||||
class DeploymentPlan:
|
||||
services_to_build: List[str]
|
||||
instances_to_deploy: Set[str]
|
||||
|
||||
@dataclass
|
||||
class DockerContainer:
|
||||
name: str
|
||||
partition: int
|
||||
config: ServiceConfig
|
||||
|
||||
def docker_name(self) -> str:
|
||||
if self.partition < 1:
|
||||
return f"{self.name}"
|
||||
return f"{self.name}-{self.partition}"
|
||||
|
||||
def deploy_key(self) -> str:
|
||||
return f"{self.config.deploy_tier}.{self.partition}"
|
||||
|
||||
class BuildError(Exception):
|
||||
"""Raised when a build fails"""
|
||||
def __init__(self, service: str, return_code: int):
|
||||
self.service = service
|
||||
self.return_code = return_code
|
||||
super().__init__(f"Build failed for {service} with code {return_code}")
|
||||
|
||||
def get_deployment_tag() -> str | None:
|
||||
"""Get the deployment tag from the current HEAD commit, if one exists."""
|
||||
cmd = ['git', 'for-each-ref', '--points-at', 'HEAD', 'refs/tags', '--format=%(refname:short) %(subject)']
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Git command failed: {result.stderr}")
|
||||
|
||||
for tag in result.stdout.splitlines():
|
||||
if tag.startswith('deploy-'):
|
||||
return tag.split(' ')[1:]
|
||||
|
||||
return None
|
||||
|
||||
def parse_deployment_tags(
|
||||
tag_messages: List[str],
|
||||
service_config: Dict[str, ServiceConfig]
|
||||
) -> DeploymentPlan:
|
||||
"""
|
||||
Parse deployment and hold tags using service configuration.
|
||||
|
||||
Args:
|
||||
tag_messages: List of tag messages (e.g. ['deploy:all,-frontend', 'hold:index-service-7'])
|
||||
service_config: Dictionary mapping service names to their configuration
|
||||
|
||||
Returns:
|
||||
DeploymentPlan containing services to build and instances to hold
|
||||
"""
|
||||
services_to_build = set()
|
||||
services_to_exclude = set()
|
||||
instances_to_hold = set()
|
||||
|
||||
available_services = set(service_config.keys())
|
||||
available_groups = set()
|
||||
|
||||
partitions = set()
|
||||
|
||||
for service in service_config.values():
|
||||
available_groups = available_groups | service.groups
|
||||
|
||||
for tag in [tag.strip() for tag in tag_messages]:
|
||||
if tag.startswith('partition:'):
|
||||
for p in tag[10:].strip().split(','):
|
||||
partitions.add(int(p))
|
||||
if tag.startswith('deploy:'):
|
||||
parts = tag[7:].strip().split(',')
|
||||
|
||||
for part in parts:
|
||||
part = part.strip()
|
||||
|
||||
if part.startswith('-'):
|
||||
service = part[1:]
|
||||
if not service in available_services:
|
||||
raise ValueError(f"Unknown service {service}")
|
||||
|
||||
services_to_exclude.add(service)
|
||||
elif part.startswith('+'):
|
||||
service = part[1:]
|
||||
if not service in available_services:
|
||||
raise ValueError(f"Unknown service {service}")
|
||||
|
||||
services_to_build.add(service)
|
||||
else:
|
||||
group = part
|
||||
if not group in available_groups:
|
||||
raise ValueError(f"Unknown service group {group}")
|
||||
for name, service in service_config.items():
|
||||
if group in service.groups:
|
||||
services_to_build.add(name)
|
||||
|
||||
elif tag.startswith('hold:'):
|
||||
instances = tag[5:].strip().split(',')
|
||||
instances_to_hold.update(i.strip() for i in instances if i.strip())
|
||||
|
||||
print(partitions)
|
||||
|
||||
# Remove any explicitly excluded services
|
||||
services_to_build = services_to_build - services_to_exclude
|
||||
|
||||
# Validate that all specified services exist
|
||||
invalid_services = (services_to_build | services_to_exclude) - available_services
|
||||
if invalid_services:
|
||||
raise ValueError(f"Unknown services specified: {invalid_services}")
|
||||
|
||||
to_deploy = list()
|
||||
for service in services_to_build:
|
||||
config = service_config[service]
|
||||
|
||||
if config.instances == None:
|
||||
if config.docker_name in instances_to_hold:
|
||||
continue
|
||||
container = DockerContainer(config.docker_name, 0, config)
|
||||
|
||||
if len(partitions) == 0 or 0 in partitions:
|
||||
to_deploy.append(container)
|
||||
else:
|
||||
for instance in range(1,config.instances + 1):
|
||||
if config.docker_name in instances_to_hold:
|
||||
continue
|
||||
|
||||
container_name = f"{config.docker_name}-{instance}"
|
||||
if container_name in instances_to_hold:
|
||||
continue
|
||||
|
||||
if len(partitions) == 0 or instance in partitions:
|
||||
to_deploy.append(DockerContainer(container_name, instance, config))
|
||||
|
||||
return DeploymentPlan(
|
||||
services_to_build=sorted(list(services_to_build)),
|
||||
instances_to_deploy=sorted(to_deploy, key = lambda c : c.deploy_key())
|
||||
)
|
||||
|
||||
|
||||
def deploy_container(container: DockerContainer) -> None:
|
||||
|
||||
"""
|
||||
Run a docker deployment for the specified service and target.
|
||||
Raises BuildError if the build fails.
|
||||
"""
|
||||
print(f"Deploying {container.name}")
|
||||
process = subprocess.Popen(
|
||||
['docker', 'compose', '--progress', 'quiet', 'up', '-d', container.name],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True
|
||||
)
|
||||
|
||||
|
||||
# Stream output in real-time
|
||||
while True:
|
||||
output = process.stdout.readline()
|
||||
if output == '' and process.poll() is not None:
|
||||
break
|
||||
if output:
|
||||
print(output.rstrip())
|
||||
|
||||
return_code = process.poll()
|
||||
if return_code != 0:
|
||||
raise BuildError(container, return_code)
|
||||
|
||||
def deploy_services(containers: List[str]) -> None:
|
||||
print(f"Deploying {containers}")
|
||||
os.chdir(docker_dir)
|
||||
for container in containers:
|
||||
deploy_container(container)
|
||||
|
||||
def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
|
||||
"""Execute the deployment plan"""
|
||||
run_gradle_build([service_config[service].gradle_target for service in plan.services_to_build])
|
||||
|
||||
deploy_services(plan.instances_to_deploy)
|
||||
|
||||
|
||||
|
||||
def run_gradle_build(targets: str) -> None:
|
||||
"""
|
||||
Run a Gradle build for the specified target.
|
||||
Raises BuildError if the build fails.
|
||||
"""
|
||||
print(f"\nBuilding targets {targets}")
|
||||
process = subprocess.Popen(
|
||||
['./gradlew', '-q'] + targets,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True
|
||||
)
|
||||
|
||||
# Stream output in real-time
|
||||
while True:
|
||||
output = process.stdout.readline()
|
||||
if output == '' and process.poll() is not None:
|
||||
break
|
||||
if output:
|
||||
print(output.rstrip())
|
||||
|
||||
return_code = process.poll()
|
||||
if return_code != 0:
|
||||
raise BuildError(service, return_code)
|
||||
|
||||
# Example usage:
|
||||
if __name__ == '__main__':
|
||||
# Define service configuration
|
||||
SERVICE_CONFIG = {
|
||||
'search': ServiceConfig(
|
||||
gradle_target=':code:services-application:search-service:docker',
|
||||
docker_name='search-service',
|
||||
instances=2,
|
||||
deploy_tier=2,
|
||||
groups={"all", "frontend", "core"}
|
||||
),
|
||||
'api': ServiceConfig(
|
||||
gradle_target=':code:services-application:api-service:docker',
|
||||
docker_name='api-service',
|
||||
instances=2,
|
||||
deploy_tier=1,
|
||||
groups={"all", "core"}
|
||||
),
|
||||
'assistant': ServiceConfig(
|
||||
gradle_target=':code:services-core:assistant-service:docker',
|
||||
docker_name='assistant-service',
|
||||
instances=2,
|
||||
deploy_tier=2,
|
||||
groups={"all", "core"}
|
||||
),
|
||||
'explorer': ServiceConfig(
|
||||
gradle_target=':code:services-application:explorer-service:docker',
|
||||
docker_name='explorer-service',
|
||||
instances=None,
|
||||
deploy_tier=1,
|
||||
groups={"all", "extra"}
|
||||
),
|
||||
'dating': ServiceConfig(
|
||||
gradle_target=':code:services-application:dating-service:docker',
|
||||
docker_name='dating-service',
|
||||
instances=None,
|
||||
deploy_tier=1,
|
||||
groups={"all", "extra"}
|
||||
),
|
||||
'index': ServiceConfig(
|
||||
gradle_target=':code:services-core:index-service:docker',
|
||||
docker_name='index-service',
|
||||
instances=10,
|
||||
deploy_tier=3,
|
||||
groups={"all", "index"}
|
||||
),
|
||||
'executor': ServiceConfig(
|
||||
gradle_target=':code:services-core:executor-service:docker',
|
||||
docker_name='executor-service',
|
||||
instances=10,
|
||||
deploy_tier=3,
|
||||
groups={"all", "executor"}
|
||||
),
|
||||
'control': ServiceConfig(
|
||||
gradle_target=':code:services-core:control-service:docker',
|
||||
docker_name='control-service',
|
||||
instances=None,
|
||||
deploy_tier=0,
|
||||
groups={"all", "core"}
|
||||
),
|
||||
'query': ServiceConfig(
|
||||
gradle_target=':code:services-core:query-service:docker',
|
||||
docker_name='query-service',
|
||||
instances=2,
|
||||
deploy_tier=2,
|
||||
groups={"all", "query"}
|
||||
),
|
||||
}
|
||||
|
||||
try:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='deployment.py',
|
||||
description='Continuous Deployment helper')
|
||||
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
|
||||
parser.add_argument('-t', '--tag', help='Use the specified tag value instead of the head git tag starting with deploy-')
|
||||
|
||||
args = parser.parse_args()
|
||||
tags = args.tag
|
||||
if tags is None:
|
||||
tags = get_deployment_tag()
|
||||
else:
|
||||
tags = tags.split(' ')
|
||||
|
||||
|
||||
|
||||
if tags != None:
|
||||
print("Found deployment tags:", tags)
|
||||
|
||||
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
|
||||
|
||||
print("\nDeployment Plan:")
|
||||
print("Services to build:", plan.services_to_build)
|
||||
print("Instances to deploy:", [container.name for container in plan.instances_to_deploy])
|
||||
|
||||
if not args.verify:
|
||||
print("\nExecution Plan:")
|
||||
|
||||
build_and_deploy(plan, SERVICE_CONFIG)
|
||||
else:
|
||||
print("No tags found")
|
||||
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
Reference in New Issue
Block a user