Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git
Synced 2025-10-07 03:42:39 +02:00

Compare commits: deploy-000 ... deploy-000
4 commits
Commits (SHA1):
  7622335e84
  0da2047eae
  5ee4321110
  9459b9933b
@@ -38,7 +38,6 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -124,51 +123,57 @@ public class FeedFetcherService {

         for (var feed : definitions) {
             executor.submitQuietly(() -> {
-                var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
+                try {
+                    var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));

-                // If we have existing data, we might skip updating it with a probability that increases with time,
-                // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
-                // on our end
+                    // If we have existing data, we might skip updating it with a probability that increases with time,
+                    // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
+                    // on our end

-                if (!oldData.isEmpty()) {
-                    Duration duration = feed.durationSinceUpdated();
-                    long daysSinceUpdate = duration.toDays();
+                    /* Disable for now:
+                    if (!oldData.isEmpty()) {
+                        Duration duration = feed.durationSinceUpdated();
+                        long daysSinceUpdate = duration.toDays();

-                    if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
-                            .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
-                    {
-                        // Skip updating this feed, just write the old data back instead
-                        writer.saveFeed(oldData);
-                        return;
+                        if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
+                                .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) {
+                            // Skip updating this feed, just write the old data back instead
+                            writer.saveFeed(oldData);
+                            return;
+                        }
                     }
-                }
+                    */

-                FetchResult feedData;
-                try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
-                    feedData = fetchFeedData(feed, client);
-                }
-                catch (Exception ex) {
-                    feedData = new FetchResult.TransientError();
-                }
+                    FetchResult feedData;
+                    try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
+                        feedData = fetchFeedData(feed, client);
+                    } catch (Exception ex) {
+                        feedData = new FetchResult.TransientError();
+                    }

-                switch (feedData) {
-                    case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
-                    case FetchResult.TransientError() -> {
-                        int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
-                        writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
+                    switch (feedData) {
+                        case FetchResult.Success(String value) -> writer.saveFeed(parseFeed(value, feed));
+                        case FetchResult.TransientError() -> {
+                            int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
+                            writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);

-                        if (errorCount < 5) {
-                            // Permit the server a few days worth of retries before we drop the feed entirely
-                            writer.saveFeed(oldData);
+                            if (errorCount < 5) {
+                                // Permit the server a few days worth of retries before we drop the feed entirely
+                                writer.saveFeed(oldData);
+                            }
                         }
+                        case FetchResult.PermanentError() -> {
+                        } // let the definition be forgotten about
                     }
-                    case FetchResult.PermanentError() -> {} // let the definition be forgotten about
                 }
-
-                if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
-                    // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
-                    heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
+                finally {
+                    if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
+                        // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
+                        heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
+                    }
                 }
             });
         }
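The skip check being disabled here draws a uniform integer via ThreadLocalRandom.nextInt(1, 1 + k) with k = min(10, daysSinceUpdate) / 2, and skips when the result exceeds 1, i.e. with probability (k - 1) / k. A minimal Python sketch of that implied probability curve (the arithmetic mirrors the Java expression; this is an illustration, not code from the repo):

# Sketch of the (now-disabled) probabilistic skip in FeedFetcherService.
# Java's ThreadLocalRandom.nextInt(1, 1 + k) draws uniformly from {1, ..., k},
# so the "> 1" condition holds with probability (k - 1) / k.
def skip_probability(days_since_update: int) -> float:
    if days_since_update <= 2:
        return 0.0  # recently-updated feeds are always refreshed
    k = min(10, days_since_update) // 2
    return (k - 1) / k if k > 0 else 0.0

for days in (1, 3, 4, 6, 8, 10, 30):
    print(f"{days:>2} days since update -> P(skip) = {skip_probability(days):.2f}")
# The probability caps at 4/5 once a feed is ten or more days stale.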
@@ -118,6 +118,7 @@ def deploy_container(container: DockerContainer) -> None:
         text=True
     )

+
     # Stream output in real-time
     while True:
         output = process.stdout.readline()
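For context, the loop around this hunk streams the subprocess's stdout line by line. A self-contained sketch of the same pattern; the command and the termination check are illustrative assumptions, not lifted from the script:

import subprocess

# Illustrative command; stands in for the script's docker/gradle invocation
process = subprocess.Popen(
    ["sh", "-c", "echo building; sleep 1; echo done"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)

# Stream output in real-time: readline() returns "" only at EOF,
# and poll() becomes non-None once the process has exited
while True:
    output = process.stdout.readline()
    if output == "" and process.poll() is not None:
        break
    if output:
        print(output, end="")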
@@ -156,6 +157,9 @@ def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConf
             to_deploy.append(container)
         else:
             for instance in range(1,config.instances + 1):
+                if config.docker_name in plan.instances_to_hold:
+                    continue
+
                 container_name = f"{config.docker_name}-{instance}"
                 if container_name in plan.instances_to_hold:
                     continue
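With this addition, a hold entry can apparently name either the bare service (docker_name, holding every numbered instance) or one specific instance (docker_name-N). A toy sketch of that filter under those assumptions; the set contents and the helper are made up for illustration:

# Hypothetical hold set, standing in for plan.instances_to_hold
holds = {"assistant-service", "search-service-2"}

def held(docker_name: str, instance: int) -> bool:
    # A bare service name holds all of its numbered instances...
    if docker_name in holds:
        return True
    # ...while "name-N" holds just that one instance
    return f"{docker_name}-{instance}" in holds

assert held("assistant-service", 1) and held("assistant-service", 2)
assert held("search-service", 2) and not held("search-service", 1)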
@@ -207,22 +211,22 @@ if __name__ == '__main__':
             instances=2,
             deploy_tier=1
         ),
-        'api': ServiceConfig(
+        'assistant': ServiceConfig(
             gradle_target=':code:services-core:assistant-service:docker',
             docker_name='assistant-service',
             instances=2,
-            deploy_tier1=2
+            deploy_tier=2
         ),
         'explorer': ServiceConfig(
             gradle_target=':code:services-application:explorer-service:docker',
             docker_name='explorer-service',
-            instances=1,
+            instances=None,
             deploy_tier=1
         ),
         'dating': ServiceConfig(
             gradle_target=':code:services-application:dating-service:docker',
             docker_name='dating-service',
-            instances=1,
+            instances=None,
             deploy_tier=1
         ),
         'index': ServiceConfig(
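The switch from instances=1 to instances=None suggests that None, rather than a count of one, marks services deployed as a single unnumbered container (the to_deploy.append branch in build_and_deploy, instead of the name-N loop). A hypothetical shape for ServiceConfig consistent with that reading; the real class is defined elsewhere in the script and may differ:

from dataclasses import dataclass
from typing import Optional

@dataclass
class ServiceConfig:
    gradle_target: str
    docker_name: str
    # None => one unnumbered container; N => containers docker_name-1 .. docker_name-N
    instances: Optional[int]
    deploy_tier: int

explorer = ServiceConfig(
    gradle_target=':code:services-application:explorer-service:docker',
    docker_name='explorer-service',
    instances=None,
    deploy_tier=1,
)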
@@ -253,19 +257,19 @@ if __name__ == '__main__':

     try:
         tags = get_deployment_tag()
-        if tags == None:
-            exit
+        if tags != None:
+            print("Found deployment tags:", tags)

-        print(tags)
+            plan = parse_deployment_tags(tags, SERVICE_CONFIG)
+            print("\nDeployment Plan:")
+            print("Services to build:", plan.services_to_build)
+            print("Instances to hold:", plan.instances_to_hold)

-        plan = parse_deployment_tags(tags, SERVICE_CONFIG)
-        print("\nDeployment Plan:")
-        print("Services to build:", plan.services_to_build)
-        print("Instances to hold:", plan.instances_to_hold)
-
-        print("\nExecution Plan:")
-        build_and_deploy(plan, SERVICE_CONFIG)
+            print("\nExecution Plan:")
+            build_and_deploy(plan, SERVICE_CONFIG)
+        else:
+            print("No tags found")

     except ValueError as e:
         print(f"Error: {e}")