mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits


5 Commits

Author SHA1 Message Date
Viktor Lofgren
b66879ccb1 (feed) Add support for date discovery through atom:issued and atom:created
This is specifically to help parse monadnock.net's Atom feed.
2024-12-23 20:05:58 +01:00
Viktor Lofgren
f1b7157ca2 (deploy) Add basic linting ability to deployment script. 2024-12-23 16:21:29 +01:00
Viktor Lofgren
7622335e84 (deploy) Correct deploy script, set correct name for assistant 2024-12-23 15:59:02 +01:00
Viktor Lofgren
0da2047eae (live-capture) Correctly update processed count, disable poll rate adjustment based on freshness. 2024-12-23 15:56:27 +01:00
Viktor Lofgren
5ee4321110 (ci) Correct deploy script 2024-12-22 20:08:37 +01:00
2 changed files with 91 additions and 53 deletions

View File

@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -74,6 +73,17 @@ public class FeedFetcherService {
         this.nodeConfigurationService = nodeConfigurationService;
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
+
+        // Add support for some alternate date tags for atom
+        rssReader.addItemExtension("issued", this::setDateFallback);
+        rssReader.addItemExtension("created", this::setDateFallback);
     }
 
+    private void setDateFallback(Item item, String value) {
+        if (item.getPubDate().isEmpty()) {
+            item.setPubDate(value);
+        }
+    }
+
     public enum UpdateMode {
@@ -124,51 +134,57 @@ public class FeedFetcherService {
            for (var feed : definitions) {
                executor.submitQuietly(() -> {
                    var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));

                    try {
                        // If we have existing data, we might skip updating it with a probability that increases with time,
                        // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
                        // on our end

                        /* Disable for now:
                        if (!oldData.isEmpty()) {
                            Duration duration = feed.durationSinceUpdated();
                            long daysSinceUpdate = duration.toDays();

                            if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
                                    .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
                                // Skip updating this feed, just write the old data back instead
                                writer.saveFeed(oldData);
                                return;
                            }
                        }
                        */

                        FetchResult feedData;
                        try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
                            feedData = fetchFeedData(feed, client);
                        } catch (Exception ex) {
                            feedData = new FetchResult.TransientError();
                        }

                        switch (feedData) {
                            case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
                            case FetchResult.TransientError() -> {
                                int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
                                writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);

                                if (errorCount < 5) {
                                    // Permit the server a few days worth of retries before we drop the feed entirely
                                    writer.saveFeed(oldData);
                                }
                            }
                            case FetchResult.PermanentError() -> {
                            } // let the definition be forgotten about
                        }
                    }
                    finally {
                        if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
                            // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
                            heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
                        }
                    }
                });
            }
@@ -361,7 +377,7 @@ public class FeedFetcherService {
         return seenFragments.size() > 1;
     }
 
-    private static class IsFeedItemDateValid implements Predicate<FeedItem> {
+    static class IsFeedItemDateValid implements Predicate<FeedItem> {
         private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
 
         public boolean test(FeedItem item) {

View File

@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 import subprocess, os
 from typing import List, Set, Dict, Optional
+import argparse
 
 build_dir = "/app/search.marginalia.nu/build"
 docker_dir = "/app/search.marginalia.nu/docker"
@@ -82,9 +83,17 @@ def parse_deployment_tags(
             if part == 'all':
                 services_to_build.update(available_services)
             elif part.startswith('-'):
-                services_to_exclude.add(part[1:])
+                service = part[1:]
+                if not service in available_services:
+                    raise ValueError(f"Unknown service {service}")
+                services_to_exclude.add(service)
             elif part.startswith('+'):
-                services_to_build.add(part[1:])
+                service = part[1:]
+                if not service in available_services:
+                    raise ValueError(f"Unknown service {service}")
+                services_to_build.add(service)
 
     elif tag.startswith('hold:'):
         instances = tag[5:].strip().split(',')
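The effect of the change above is that a mistyped service name in a '+' or '-' deployment tag now fails fast with a ValueError instead of being silently accepted. A minimal standalone sketch of that behaviour; parse_parts and the service names are illustrative and not part of deployment.py:

def parse_parts(parts, available_services):
    # Collect explicit '+name' / '-name' selections, validating each name
    # against the known services, as the hunk above does.
    to_build, to_exclude = set(), set()
    for part in parts:
        if part == 'all':
            to_build.update(available_services)
        elif part[:1] in ('-', '+'):
            service = part[1:]
            if service not in available_services:
                raise ValueError(f"Unknown service {service}")
            (to_exclude if part[0] == '-' else to_build).add(service)
    return to_build, to_exclude

# A typo such as '-aip' now raises "Unknown service aip" instead of
# quietly excluding nothing:
print(parse_parts(['+search', '-api'], {'search', 'api'}))
# -> ({'search'}, {'api'})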
@@ -118,6 +127,7 @@ def deploy_container(container: DockerContainer) -> None:
text=True
)
# Stream output in real-time
while True:
output = process.stdout.readline()
@@ -156,6 +166,9 @@ def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConf
             to_deploy.append(container)
         else:
             for instance in range(1,config.instances + 1):
+                if config.docker_name in plan.instances_to_hold:
+                    continue
+
                 container_name = f"{config.docker_name}-{instance}"
                 if container_name in plan.instances_to_hold:
                     continue
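With the check above, a hold entry can name either a whole service by its docker name or one numbered instance. A small sketch of the resulting skip logic; containers_to_deploy is a hypothetical helper, not a function in the script:

def containers_to_deploy(docker_name, instances, instances_to_hold):
    # Mirrors the loop above: skip everything when the bare docker name is
    # held, or just one container when a specific "name-N" is held.
    result = []
    for instance in range(1, instances + 1):
        if docker_name in instances_to_hold:
            continue  # the whole service is held
        container_name = f"{docker_name}-{instance}"
        if container_name in instances_to_hold:
            continue  # only this numbered instance is held
        result.append(container_name)
    return result

# Holding "assistant-service" would skip both instances, while holding
# "assistant-service-2" skips only the second one:
print(containers_to_deploy("assistant-service", 2, {"assistant-service-2"}))
# -> ['assistant-service-1']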
@@ -207,7 +220,7 @@ if __name__ == '__main__':
         instances=2,
         deploy_tier=1
     ),
-    'api': ServiceConfig(
+    'assistant': ServiceConfig(
         gradle_target=':code:services-core:assistant-service:docker',
         docker_name='assistant-service',
         instances=2,
@@ -216,13 +229,13 @@ if __name__ == '__main__':
     'explorer': ServiceConfig(
         gradle_target=':code:services-application:explorer-service:docker',
         docker_name='explorer-service',
-        instances=1,
+        instances=None,
         deploy_tier=1
     ),
     'dating': ServiceConfig(
         gradle_target=':code:services-application:dating-service:docker',
         docker_name='dating-service',
-        instances=1,
+        instances=None,
         deploy_tier=1
     ),
     'index': ServiceConfig(
@@ -253,19 +266,28 @@ if __name__ == '__main__':
     try:
         tags = get_deployment_tag()
-        if tags == None:
-            exit
 
-        print(tags)
+        parser = argparse.ArgumentParser(
+            prog='deployment.py',
+            description='Continuous Deployment helper')
+        parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
 
-        plan = parse_deployment_tags(tags, SERVICE_CONFIG)
+        args = parser.parse_args()
 
-        print("\nDeployment Plan:")
-        print("Services to build:", plan.services_to_build)
-        print("Instances to hold:", plan.instances_to_hold)
+        if tags != None:
+            print("Found deployment tags:", tags)
 
-        print("\nExecution Plan:")
-        build_and_deploy(plan, SERVICE_CONFIG)
+            plan = parse_deployment_tags(tags, SERVICE_CONFIG)
+
+            print("\nDeployment Plan:")
+            print("Services to build:", plan.services_to_build)
+            print("Instances to hold:", plan.instances_to_hold)
+
+            if not args.verify:
+                print("\nExecution Plan:")
+                build_and_deploy(plan, SERVICE_CONFIG)
+        else:
+            print("No tags found")
     except ValueError as e:
         print(f"Error: {e}")
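A short sketch of how the new flag is meant to be used: with --verify the script only parses the tags and prints the plan, and without it the build and deploy step runs as before. The parser setup mirrors the diff; the invocation below is illustrative only:

import argparse

# Mirrors the argparse setup added above; parse_args(['--verify']) stands in
# for an actual `deployment.py --verify` invocation.
parser = argparse.ArgumentParser(
    prog='deployment.py',
    description='Continuous Deployment helper')
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')

args = parser.parse_args(['--verify'])

if args.verify:
    print("verify only: tags parsed and plan printed, nothing deployed")
else:
    print("normal run: build_and_deploy(plan, SERVICE_CONFIG) would be called")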