mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
Compare commits
5 Commits
deploy-000
...
deploy-000
Author | SHA1 | Date | |
---|---|---|---|
|
b66879ccb1 | ||
|
f1b7157ca2 | ||
|
7622335e84 | ||
|
0da2047eae | ||
|
5ee4321110 |
@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
@@ -74,6 +73,17 @@ public class FeedFetcherService {
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
this.serviceHeartbeat = serviceHeartbeat;
|
||||
this.executorClient = executorClient;
|
||||
|
||||
|
||||
// Add support for some alternate date tags for atom
|
||||
rssReader.addItemExtension("issued", this::setDateFallback);
|
||||
rssReader.addItemExtension("created", this::setDateFallback);
|
||||
}
|
||||
|
||||
private void setDateFallback(Item item, String value) {
|
||||
if (item.getPubDate().isEmpty()) {
|
||||
item.setPubDate(value);
|
||||
}
|
||||
}
|
||||
|
||||
public enum UpdateMode {
|
||||
@@ -124,51 +134,57 @@ public class FeedFetcherService {
|
||||
|
||||
for (var feed : definitions) {
|
||||
executor.submitQuietly(() -> {
|
||||
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
||||
try {
|
||||
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
|
||||
|
||||
// If we have existing data, we might skip updating it with a probability that increases with time,
|
||||
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
||||
// on our end
|
||||
// If we have existing data, we might skip updating it with a probability that increases with time,
|
||||
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
|
||||
// on our end
|
||||
|
||||
if (!oldData.isEmpty()) {
|
||||
Duration duration = feed.durationSinceUpdated();
|
||||
long daysSinceUpdate = duration.toDays();
|
||||
/* Disable for now:
|
||||
|
||||
if (!oldData.isEmpty()) {
|
||||
Duration duration = feed.durationSinceUpdated();
|
||||
long daysSinceUpdate = duration.toDays();
|
||||
|
||||
|
||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
|
||||
{
|
||||
// Skip updating this feed, just write the old data back instead
|
||||
writer.saveFeed(oldData);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
FetchResult feedData;
|
||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
switch (feedData) {
|
||||
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
||||
case FetchResult.TransientError() -> {
|
||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||
|
||||
if (errorCount < 5) {
|
||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
|
||||
// Skip updating this feed, just write the old data back instead
|
||||
writer.saveFeed(oldData);
|
||||
return;
|
||||
}
|
||||
}
|
||||
case FetchResult.PermanentError() -> {} // let the definition be forgotten about
|
||||
}
|
||||
*/
|
||||
|
||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
||||
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
||||
FetchResult feedData;
|
||||
try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
|
||||
feedData = fetchFeedData(feed, client);
|
||||
} catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
switch (feedData) {
|
||||
case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
|
||||
case FetchResult.TransientError() -> {
|
||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||
|
||||
if (errorCount < 5) {
|
||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||
writer.saveFeed(oldData);
|
||||
}
|
||||
}
|
||||
case FetchResult.PermanentError() -> {
|
||||
} // let the definition be forgotten about
|
||||
}
|
||||
|
||||
}
|
||||
finally {
|
||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
|
||||
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -361,7 +377,7 @@ public class FeedFetcherService {
|
||||
return seenFragments.size() > 1;
|
||||
}
|
||||
|
||||
private static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
||||
static class IsFeedItemDateValid implements Predicate<FeedItem> {
|
||||
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
||||
|
||||
public boolean test(FeedItem item) {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
import subprocess, os
|
||||
from typing import List, Set, Dict, Optional
|
||||
import argparse
|
||||
|
||||
build_dir = "/app/search.marginalia.nu/build"
|
||||
docker_dir = "/app/search.marginalia.nu/docker"
|
||||
@@ -82,9 +83,17 @@ def parse_deployment_tags(
|
||||
if part == 'all':
|
||||
services_to_build.update(available_services)
|
||||
elif part.startswith('-'):
|
||||
services_to_exclude.add(part[1:])
|
||||
service = part[1:]
|
||||
if not service in available_services:
|
||||
raise ValueError(f"Unknown service {service}")
|
||||
|
||||
services_to_exclude.add(service)
|
||||
elif part.startswith('+'):
|
||||
services_to_build.add(part[1:])
|
||||
service = part[1:]
|
||||
if not service in available_services:
|
||||
raise ValueError(f"Unknown service {service}")
|
||||
|
||||
services_to_build.add(service)
|
||||
|
||||
elif tag.startswith('hold:'):
|
||||
instances = tag[5:].strip().split(',')
|
||||
@@ -118,6 +127,7 @@ def deploy_container(container: DockerContainer) -> None:
|
||||
text=True
|
||||
)
|
||||
|
||||
|
||||
# Stream output in real-time
|
||||
while True:
|
||||
output = process.stdout.readline()
|
||||
@@ -156,6 +166,9 @@ def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConf
|
||||
to_deploy.append(container)
|
||||
else:
|
||||
for instance in range(1,config.instances + 1):
|
||||
if config.docker_name in plan.instances_to_hold:
|
||||
continue
|
||||
|
||||
container_name = f"{config.docker_name}-{instance}"
|
||||
if container_name in plan.instances_to_hold:
|
||||
continue
|
||||
@@ -207,7 +220,7 @@ if __name__ == '__main__':
|
||||
instances=2,
|
||||
deploy_tier=1
|
||||
),
|
||||
'api': ServiceConfig(
|
||||
'assistant': ServiceConfig(
|
||||
gradle_target=':code:services-core:assistant-service:docker',
|
||||
docker_name='assistant-service',
|
||||
instances=2,
|
||||
@@ -216,13 +229,13 @@ if __name__ == '__main__':
|
||||
'explorer': ServiceConfig(
|
||||
gradle_target=':code:services-application:explorer-service:docker',
|
||||
docker_name='explorer-service',
|
||||
instances=1,
|
||||
instances=None,
|
||||
deploy_tier=1
|
||||
),
|
||||
'dating': ServiceConfig(
|
||||
gradle_target=':code:services-application:dating-service:docker',
|
||||
docker_name='dating-service',
|
||||
instances=1,
|
||||
instances=None,
|
||||
deploy_tier=1
|
||||
),
|
||||
'index': ServiceConfig(
|
||||
@@ -253,19 +266,28 @@ if __name__ == '__main__':
|
||||
|
||||
try:
|
||||
tags = get_deployment_tag()
|
||||
if tags == None:
|
||||
exit
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='deployment.py',
|
||||
description='Continuous Deployment helper')
|
||||
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
|
||||
|
||||
print(tags)
|
||||
args = parser.parse_args()
|
||||
|
||||
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
|
||||
print("\nDeployment Plan:")
|
||||
print("Services to build:", plan.services_to_build)
|
||||
print("Instances to hold:", plan.instances_to_hold)
|
||||
if tags != None:
|
||||
print("Found deployment tags:", tags)
|
||||
|
||||
print("\nExecution Plan:")
|
||||
plan = parse_deployment_tags(tags, SERVICE_CONFIG)
|
||||
|
||||
build_and_deploy(plan, SERVICE_CONFIG)
|
||||
print("\nDeployment Plan:")
|
||||
print("Services to build:", plan.services_to_build)
|
||||
print("Instances to hold:", plan.instances_to_hold)
|
||||
|
||||
if not args.verify:
|
||||
print("\nExecution Plan:")
|
||||
|
||||
build_and_deploy(plan, SERVICE_CONFIG)
|
||||
else:
|
||||
print("No tags found")
|
||||
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
||||
|
Reference in New Issue
Block a user