1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

4 Commits

Author SHA1 Message Date
Viktor Lofgren
94d4d2edb7 (live-crawler) Add refresh date to feeds API
For now this is just the ctime for the feeds db.  We may want to store this per-record in the future.
2024-12-25 14:20:48 +01:00
Viktor Lofgren
7ae19a92ba (deploy) Improve deployment script to allow specification of partitions 2024-12-24 11:16:15 +01:00
Viktor Lofgren
56d14e56d7 (live-crawler) Improve LiveCrawlActor resilience to FeedService outages 2024-12-23 23:33:54 +01:00
Viktor Lofgren
a557c7ae7f (live-crawler) Limit concurrent accesses per domain using DomainLocks from main crawler 2024-12-23 23:31:03 +01:00
7 changed files with 117 additions and 62 deletions

View File

@@ -50,12 +50,18 @@ public class LiveCrawlActor extends RecordActorPrototype {
yield new Monitor("-"); yield new Monitor("-");
} }
case Monitor(String feedsHash) -> { case Monitor(String feedsHash) -> {
// Sleep initially in case this is during start-up
for (;;) { for (;;) {
String currentHash = feedsClient.getFeedDataHash(); try {
if (!Objects.equals(currentHash, feedsHash)) { Thread.sleep(Duration.ofMinutes(15));
yield new LiveCrawl(currentHash); String currentHash = feedsClient.getFeedDataHash();
if (!Objects.equals(currentHash, feedsHash)) {
yield new LiveCrawl(currentHash);
}
}
catch (RuntimeException ex) {
logger.error("Failed to fetch feed data hash");
} }
Thread.sleep(Duration.ofMinutes(15));
} }
} }
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> { case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {

View File

@@ -59,12 +59,6 @@ public class FeedsClient {
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()))); .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
} }
public record UpdatedDomain(String domain, List<String> urls) {
public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
}
}
/** Get the hash of the feed data, for identifying when the data has been updated */ /** Get the hash of the feed data, for identifying when the data has been updated */
public String getFeedDataHash() { public String getFeedDataHash() {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash) return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)

View File

@@ -46,6 +46,7 @@ message RpcFeed {
string feedUrl = 3; string feedUrl = 3;
string updated = 4; string updated = 4;
repeated RpcFeedItem items = 5; repeated RpcFeedItem items = 5;
int64 fetchTimestamp = 6;
} }
message RpcFeedItem { message RpcFeedItem {

View File

@@ -12,9 +12,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.PosixFileAttributes;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.time.Instant; import java.time.Instant;
import java.util.Base64; import java.util.Base64;
@@ -209,4 +211,20 @@ public class FeedDb {
reader.getLinksUpdatedSince(since, consumer); reader.getLinksUpdatedSince(since, consumer);
} }
/** Return the time the feed data was fetched, approximated by the
 *  creation time of the reader database file.
 *
 *  @return the file's creation time, or the epoch (Instant 0) when the
 *          database does not exist or its attributes cannot be read
 */
public Instant getFetchTime() {
    if (!Files.exists(readerDbPath)) {
        // No feeds db yet -- signal "never fetched" with the epoch sentinel
        return Instant.ofEpochMilli(0);
    }

    try {
        // NOTE(review): PosixFileAttributes limits this to POSIX file systems;
        // BasicFileAttributes would also expose creationTime() -- confirm intent
        return Files.readAttributes(readerDbPath, PosixFileAttributes.class)
                .creationTime()
                .toInstant();
    }
    catch (IOException ex) {
        // Fixed typo ("creatiom") and attached the exception so the cause is logged
        logger.error("Failed to read the creation time of {}", readerDbPath, ex);
        return Instant.ofEpochMilli(0);
    }
}
} }

View File

@@ -107,8 +107,7 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
@Override @Override
public void getFeed(RpcDomainId request, public void getFeed(RpcDomainId request,
StreamObserver<RpcFeed> responseObserver) StreamObserver<RpcFeed> responseObserver) {
{
if (!feedDb.isEnabled()) { if (!feedDb.isEnabled()) {
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node")); responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
return; return;
@@ -126,7 +125,8 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
.setDomainId(request.getDomainId()) .setDomainId(request.getDomainId())
.setDomain(domainName.get().toString()) .setDomain(domainName.get().toString())
.setFeedUrl(feedItems.feedUrl()) .setFeedUrl(feedItems.feedUrl())
.setUpdated(feedItems.updated()); .setUpdated(feedItems.updated())
.setFetchTimestamp(feedDb.getFetchTime().toEpochMilli());
for (var item : feedItems.items()) { for (var item : feedItems.items()) {
retB.addItemsBuilder() retB.addItemsBuilder()

View File

@@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
import crawlercommons.robots.SimpleRobotRulesParser; import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
@@ -40,6 +41,7 @@ public class SimpleLinkScraper implements AutoCloseable {
private final DomainBlacklist domainBlacklist; private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10); private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10); private final Duration readTimeout = Duration.ofSeconds(10);
private final DomainLocks domainLocks = new DomainLocks();
public SimpleLinkScraper(LiveCrawlDataSet dataSet, public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DbDomainQueries domainQueries, DbDomainQueries domainQueries,
@@ -65,7 +67,9 @@ public class SimpleLinkScraper implements AutoCloseable {
.connectTimeout(connectTimeout) .connectTimeout(connectTimeout)
.followRedirects(HttpClient.Redirect.NEVER) .followRedirects(HttpClient.Redirect.NEVER)
.version(HttpClient.Version.HTTP_2) .version(HttpClient.Version.HTTP_2)
.build()) { .build();
DomainLocks.DomainLock lock = domainLocks.lockDomain(domain) // throttle concurrent access per domain; do not remove
) {
EdgeUrl rootUrl = domain.toRootUrlHttps(); EdgeUrl rootUrl = domain.toRootUrlHttps();

View File

@@ -13,11 +13,12 @@ class ServiceConfig:
docker_name: str docker_name: str
instances: int | None instances: int | None
deploy_tier: int deploy_tier: int
groups: Set[str]
@dataclass @dataclass
class DeploymentPlan: class DeploymentPlan:
services_to_build: List[str] services_to_build: List[str]
instances_to_hold: Set[str] instances_to_deploy: Set[str]
@dataclass @dataclass
class DockerContainer: class DockerContainer:
@@ -73,16 +74,24 @@ def parse_deployment_tags(
instances_to_hold = set() instances_to_hold = set()
available_services = set(service_config.keys()) available_services = set(service_config.keys())
available_groups = set()
partitions = set()
for service in service_config.values():
available_groups = available_groups | service.groups
for tag in [tag.strip() for tag in tag_messages]: for tag in [tag.strip() for tag in tag_messages]:
if tag.startswith('partition:'):
for p in tag[10:].strip().split(','):
partitions.add(int(p))
if tag.startswith('deploy:'): if tag.startswith('deploy:'):
parts = tag[7:].strip().split(',') parts = tag[7:].strip().split(',')
for part in parts: for part in parts:
part = part.strip() part = part.strip()
if part == 'all':
services_to_build.update(available_services) if part.startswith('-'):
elif part.startswith('-'):
service = part[1:] service = part[1:]
if not service in available_services: if not service in available_services:
raise ValueError(f"Unknown service {service}") raise ValueError(f"Unknown service {service}")
@@ -94,11 +103,20 @@ def parse_deployment_tags(
raise ValueError(f"Unknown service {service}") raise ValueError(f"Unknown service {service}")
services_to_build.add(service) services_to_build.add(service)
else:
group = part
if not group in available_groups:
raise ValueError(f"Unknown service group {group}")
for name, service in service_config.items():
if group in service.groups:
services_to_build.add(name)
elif tag.startswith('hold:'): elif tag.startswith('hold:'):
instances = tag[5:].strip().split(',') instances = tag[5:].strip().split(',')
instances_to_hold.update(i.strip() for i in instances if i.strip()) instances_to_hold.update(i.strip() for i in instances if i.strip())
print(partitions)
# Remove any explicitly excluded services # Remove any explicitly excluded services
services_to_build = services_to_build - services_to_exclude services_to_build = services_to_build - services_to_exclude
@@ -107,9 +125,32 @@ def parse_deployment_tags(
if invalid_services: if invalid_services:
raise ValueError(f"Unknown services specified: {invalid_services}") raise ValueError(f"Unknown services specified: {invalid_services}")
to_deploy = list()
for service in services_to_build:
config = service_config[service]
if config.instances == None:
if config.docker_name in instances_to_hold:
continue
container = DockerContainer(config.docker_name, 0, config)
if len(partitions) == 0 or 0 in partitions:
to_deploy.append(container)
else:
for instance in range(1,config.instances + 1):
if config.docker_name in instances_to_hold:
continue
container_name = f"{config.docker_name}-{instance}"
if container_name in instances_to_hold:
continue
if len(partitions) == 0 or instance in partitions:
to_deploy.append(DockerContainer(container_name, instance, config))
return DeploymentPlan( return DeploymentPlan(
services_to_build=sorted(list(services_to_build)), services_to_build=sorted(list(services_to_build)),
instances_to_hold=instances_to_hold instances_to_deploy=sorted(to_deploy, key = lambda c : c.deploy_key())
) )
@@ -141,52 +182,27 @@ def deploy_container(container: DockerContainer) -> None:
raise BuildError(container, return_code) raise BuildError(container, return_code)
def deploy_services(containers: List[str]) -> None: def deploy_services(containers: List[str]) -> None:
cwd = os.getcwd() print(f"Deploying {containers}")
os.chdir(docker_dir) os.chdir(docker_dir)
for container in containers: for container in containers:
deploy_container(container) deploy_container(container)
def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]): def build_and_deploy(plan: DeploymentPlan, service_config: Dict[str, ServiceConfig]):
"""Execute the deployment plan""" """Execute the deployment plan"""
for service in plan.services_to_build: run_gradle_build([service_config[service].gradle_target for service in plan.services_to_build])
config = service_config[service]
print(f"Building {service}:")
run_gradle_build(service, config.gradle_target)
to_deploy = list() deploy_services(plan.instances_to_deploy)
for service in plan.services_to_build:
config = service_config[service]
if config.instances == None:
if config.docker_name in plan.instances_to_hold:
continue
container = DockerContainer(config.docker_name, 0, config)
to_deploy.append(container)
else:
for instance in range(1,config.instances + 1):
if config.docker_name in plan.instances_to_hold:
continue
container_name = f"{config.docker_name}-{instance}"
if container_name in plan.instances_to_hold:
continue
to_deploy.append(DockerContainer(container_name, instance, config))
to_deploy = sorted(to_deploy, key = lambda c : c.deploy_key())
deploy_services(to_deploy)
def run_gradle_build(service: str, target: str) -> None: def run_gradle_build(targets: str) -> None:
""" """
Run a Gradle build for the specified service and target. Run a Gradle build for the specified target.
Raises BuildError if the build fails. Raises BuildError if the build fails.
""" """
print(f"\nBuilding {service} with target {target}") print(f"\nBuilding targets {targets}")
process = subprocess.Popen( process = subprocess.Popen(
['./gradlew', '-q', target], ['./gradlew', '-q'] + targets,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
text=True text=True
@@ -212,66 +228,82 @@ if __name__ == '__main__':
gradle_target=':code:services-application:search-service:docker', gradle_target=':code:services-application:search-service:docker',
docker_name='search-service', docker_name='search-service',
instances=2, instances=2,
deploy_tier=2 deploy_tier=2,
groups={"all", "frontend", "core"}
), ),
'api': ServiceConfig( 'api': ServiceConfig(
gradle_target=':code:services-application:api-service:docker', gradle_target=':code:services-application:api-service:docker',
docker_name='api-service', docker_name='api-service',
instances=2, instances=2,
deploy_tier=1 deploy_tier=1,
groups={"all", "core"}
), ),
'assistant': ServiceConfig( 'assistant': ServiceConfig(
gradle_target=':code:services-core:assistant-service:docker', gradle_target=':code:services-core:assistant-service:docker',
docker_name='assistant-service', docker_name='assistant-service',
instances=2, instances=2,
deploy_tier=2 deploy_tier=2,
groups={"all", "core"}
), ),
'explorer': ServiceConfig( 'explorer': ServiceConfig(
gradle_target=':code:services-application:explorer-service:docker', gradle_target=':code:services-application:explorer-service:docker',
docker_name='explorer-service', docker_name='explorer-service',
instances=None, instances=None,
deploy_tier=1 deploy_tier=1,
groups={"all", "extra"}
), ),
'dating': ServiceConfig( 'dating': ServiceConfig(
gradle_target=':code:services-application:dating-service:docker', gradle_target=':code:services-application:dating-service:docker',
docker_name='dating-service', docker_name='dating-service',
instances=None, instances=None,
deploy_tier=1 deploy_tier=1,
groups={"all", "extra"}
), ),
'index': ServiceConfig( 'index': ServiceConfig(
gradle_target=':code:services-core:index-service:docker', gradle_target=':code:services-core:index-service:docker',
docker_name='index-service', docker_name='index-service',
instances=10, instances=10,
deploy_tier=3 deploy_tier=3,
groups={"all", "index"}
), ),
'executor': ServiceConfig( 'executor': ServiceConfig(
gradle_target=':code:services-core:executor-service:docker', gradle_target=':code:services-core:executor-service:docker',
docker_name='executor-service', docker_name='executor-service',
instances=10, instances=10,
deploy_tier=3 deploy_tier=3,
groups={"all", "executor"}
), ),
'control': ServiceConfig( 'control': ServiceConfig(
gradle_target=':code:services-core:control-service:docker', gradle_target=':code:services-core:control-service:docker',
docker_name='control-service', docker_name='control-service',
instances=None, instances=None,
deploy_tier=0 deploy_tier=0,
groups={"all", "core"}
), ),
'query': ServiceConfig( 'query': ServiceConfig(
gradle_target=':code:services-core:query-service:docker', gradle_target=':code:services-core:query-service:docker',
docker_name='query-service', docker_name='query-service',
instances=2, instances=2,
deploy_tier=2 deploy_tier=2,
groups={"all", "query"}
), ),
} }
try: try:
tags = get_deployment_tag()
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='deployment.py', prog='deployment.py',
description='Continuous Deployment helper') description='Continuous Deployment helper')
parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true') parser.add_argument('-v', '--verify', help='Verify the tags are valid, if present', action='store_true')
parser.add_argument('-t', '--tag', help='Use the specified tag value instead of the head git tag starting with deploy-')
args = parser.parse_args() args = parser.parse_args()
tags = args.tag
if tags is None:
tags = get_deployment_tag()
else:
tags = tags.split(' ')
if tags != None: if tags != None:
print("Found deployment tags:", tags) print("Found deployment tags:", tags)
@@ -280,7 +312,7 @@ if __name__ == '__main__':
print("\nDeployment Plan:") print("\nDeployment Plan:")
print("Services to build:", plan.services_to_build) print("Services to build:", plan.services_to_build)
print("Instances to hold:", plan.instances_to_hold) print("Instances to deploy:", [container.name for container in plan.instances_to_deploy])
if not args.verify: if not args.verify:
print("\nExecution Plan:") print("\nExecution Plan:")