1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

(architecture) Remove the separate executor service and merge it into the index service

The primary motivation for this is that in production, the large number of partitioned services has lead to an intermittent exhaustion of available database connections, as each service has a connection pool.

The decision to have a separate executor service dates back from when the index service was very slow to start, and the executor didn't always spin off its memory-hungry tasks into separate processes, which meant the executor would sometimes OOM and crash, and it was undesirable to bring the index down with it.
This commit is contained in:
Viktor Lofgren
2025-07-23 12:55:53 +02:00
parent f1a900f383
commit 62b696b1c3
24 changed files with 50 additions and 271 deletions

View File

@@ -7,7 +7,6 @@ public enum ServiceId {
Search("search-service"),
Index("index-service"),
Query("query-service"),
Executor("executor-service"),
Control("control-service"),

View File

@@ -189,7 +189,7 @@ public class ExecutorClient {
String uriPath = "/transfer/file/" + fileStorage.id();
String uriQuery = "path=" + URLEncoder.encode(path, StandardCharsets.UTF_8);
var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()));
var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Index, fileStorage.node()));
if (endpoints.isEmpty()) {
throw new RuntimeException("No endpoints for node " + fileStorage.node());
}

View File

@@ -1,4 +1,4 @@
package nu.marginalia.executor;
package nu.marginalia.svc;
import com.google.inject.Inject;
import nu.marginalia.storage.FileStorageService;

View File

@@ -1,4 +1,4 @@
package nu.marginalia.executor;
package nu.marginalia.svc;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;

View File

@@ -87,7 +87,7 @@ class FeedFetcherServiceTest extends AbstractModule {
bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
bind(HikariDataSource.class).toInstance(dataSource);
bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Index, 1, "", "", 0, UUID.randomUUID()));
bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(1);
}

View File

@@ -12,4 +12,5 @@ class DDGTrackerDataTest {
data.getDomainInfo("hotjar.com").ifPresent(System.out::println);
data.getAllClassifications().forEach(System.out::println);
}
}

View File

@@ -32,7 +32,6 @@ service, and for monitoring the health of the system. It also offers a web inte
* [index](services-core/index-service)
* Exposes the [index](index) subsystem
* Exposes the [functions/link-graph](functions/link-graph) subsystem
* [executor](services-core/executor-service)
* Exposes the [execution](execution) subsystem
* [assistant](services-core/assistant-service)
* Exposes the [functions/math](functions/math) subsystem

View File

@@ -2,7 +2,7 @@ package nu.marginalia.control.node.model;
import nu.marginalia.nodecfg.model.NodeConfiguration;
public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline, boolean executorServiceOnline) {
public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline) {
public int id() {
return configuration.node();
}

View File

@@ -338,7 +338,7 @@ public class ControlNodeService {
}
private List<EventLogEntry> getEvents(int nodeId) {
List<String> services = List.of(ServiceId.Index.serviceName +":"+nodeId, ServiceId.Executor.serviceName +":"+nodeId);
List<String> services = List.of(ServiceId.Index.serviceName +":"+nodeId);
List<EventLogEntry> events = new ArrayList<>(20);
for (var service :services) {
events.addAll(eventLogService.getLastEntriesForService(service, Long.MAX_VALUE, 10));
@@ -358,8 +358,7 @@ public class ControlNodeService {
public IndexNodeStatus getStatus(NodeConfiguration config) {
return new IndexNodeStatus(config,
monitors.isServiceUp(ServiceId.Index, config.node()),
monitors.isServiceUp(ServiceId.Executor, config.node())
monitors.isServiceUp(ServiceId.Index, config.node())
);
}

View File

@@ -2,7 +2,7 @@
<h2>Nodes</h2>
<table class="table">
<tr>
<th>Node</th><th>Profile</th><th>Queries</th><th>Enabled</th><th>Index</th><th>Executor</th>
<th>Node</th><th>Profile</th><th>Queries</th><th>Enabled</th><th>Index</th>
</tr>
{{#each .}}
<tr>
@@ -24,9 +24,6 @@
</td>
{{#if indexServiceOnline}}<td>Online</td>{{/if}}
{{#unless indexServiceOnline}}<td class="table-danger">Offline</td>{{/unless}}
{{#if executorServiceOnline}}<td>Online</td>{{/if}}
{{#unless executorServiceOnline}}<td class="table-warning">Offline</td>{{/unless}}
</tr>
{{/each}}
</table>

View File

@@ -1,100 +0,0 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
id 'com.google.cloud.tools.jib' version '3.4.5'
}
application {
mainClass = 'nu.marginalia.executor.ExecutorMain'
applicationName = 'executor-service'
}
tasks.distZip.enabled = false
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
apply from: "$rootProject.projectDir/docker.gradle"
dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:converting-process')
implementation project(':code:processes:index-constructor-process')
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:common:service')
implementation project(':third-party:commons-codec')
implementation project(':code:libraries:message-queue')
implementation project(':code:functions:link-graph:api')
implementation project(':code:functions:favicon')
implementation project(':code:functions:favicon:api')
implementation project(':code:functions:nsfw-domain-filter')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:execution')
implementation project(':code:execution:api')
implementation project(':third-party:encyclopedia-marginalia-nu')
implementation libs.bundles.slf4j
implementation dependencies.create(libs.spark.get()) {
exclude group: 'org.eclipse.jetty'
}
implementation libs.bundles.jetty
implementation libs.guava
libs.bundles.grpc.get().each {
implementation dependencies.create(it) {
exclude group: 'com.google.guava'
}
}
implementation libs.gson
implementation libs.prometheus
implementation libs.notnull
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.trove
implementation libs.zstd
implementation libs.jsoup
implementation libs.commons.io
implementation libs.commons.compress
implementation libs.commons.lang3
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@@ -1,45 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.nsfw.NsfwFilterModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
public class ExecutorMain extends MainClass {
private final ExecutorSvc service;
@Inject
public ExecutorMain(ExecutorSvc service) {
this.service = service;
}
public static void main(String... args) {
init(ServiceId.Executor, args);
Injector injector = Guice.createInjector(
new ExecutorModule(),
new DatabaseModule(false),
new NsfwFilterModule(),
new ServiceDiscoveryModule(),
new ServiceConfigurationModule(ServiceId.Executor)
);
// Orchestrate the boot order for the services
var registry = injector.getInstance(ServiceRegistryIf.class);
var configuration = injector.getInstance(ServiceConfiguration.class);
orchestrateBoot(registry, configuration);
injector.getInstance(NodeStatusWatcher.class);
injector.getInstance(ExecutorMain.class);
injector.getInstance(Initialization.class).setReady();
}
}

View File

@@ -1,8 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.AbstractModule;
public class ExecutorModule extends AbstractModule {
public void configure() {
}
}

View File

@@ -1,55 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.Inject;
import nu.marginalia.execution.*;
import nu.marginalia.functions.favicon.FaviconGrpcService;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.service.server.SparkService;
import nu.marginalia.service.server.mq.MqRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
import java.util.List;
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
public class ExecutorSvc extends SparkService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class);
private final ExecutionInit executionInit;
@Inject
public ExecutorSvc(BaseServiceParams params,
ExecutorGrpcService executorGrpcService,
ExecutorCrawlGrpcService executorCrawlGrpcService,
ExecutorSideloadGrpcService executorSideloadGrpcService,
ExecutorExportGrpcService executorExportGrpcService,
FaviconGrpcService faviconGrpcService,
ExecutionInit executionInit,
ExecutorFileTransferService fileTransferService) throws Exception {
super(params,
ServicePartition.partition(params.configuration.node()),
List.of(executorGrpcService,
executorCrawlGrpcService,
executorSideloadGrpcService,
executorExportGrpcService,
faviconGrpcService)
);
this.executionInit = executionInit;
Spark.get("/transfer/file/:fid", fileTransferService::transferFile);
Spark.head("/transfer/file/:fid", fileTransferService::transferFile);
}
@MqRequest(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
executionInit.initDefaultActors();
}
}

View File

@@ -1,10 +0,0 @@
The executor service is a partitioned service responsible for executing and keeping
track of long-running maintenance and operational tasks, such as crawling or data
processing.
The executor service is closely linked to the [control-service](../control-service),
which provides a user interface for much of the executor's functionality.
The service it itself relatively bare of code, but imports and exposes the [execution subsystem](../../execution),
which is responsible for the actual execution of tasks.

View File

@@ -30,10 +30,16 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:execution')
implementation project(':code:execution:api')
implementation project(':code:functions:favicon')
implementation project(':code:functions:favicon:api')
implementation project(':code:index')
implementation project(':code:functions:link-graph:partition')
implementation project(':code:functions:link-graph:api')
implementation project(':code:functions:search-query:api')
implementation project(':code:functions:nsfw-domain-filter')
implementation project(':code:index:api')
testImplementation project(path: ':code:services-core:control-service')

View File

@@ -3,13 +3,14 @@ package nu.marginalia.index;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.nsfw.NsfwFilterModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
@@ -28,6 +29,7 @@ public class IndexMain extends MainClass {
new IndexModule(),
new DatabaseModule(false),
new ServiceDiscoveryModule(),
new NsfwFilterModule(),
new ServiceConfigurationModule(ServiceId.Index)
);

View File

@@ -2,6 +2,8 @@ package nu.marginalia.index;
import com.google.inject.Inject;
import nu.marginalia.IndexLocations;
import nu.marginalia.execution.*;
import nu.marginalia.functions.favicon.FaviconGrpcService;
import nu.marginalia.index.api.IndexMqEndpoints;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.linkdb.docs.DocumentDbReader;
@@ -14,9 +16,11 @@ import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.SparkService;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.svc.ExecutorFileTransferService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -38,6 +42,7 @@ public class IndexService extends SparkService {
private final DomainLinks domainLinks;
private final ServiceEventLog eventLog;
private final ExecutionInit executionInit;
@Inject
public IndexService(BaseServiceParams params,
@@ -48,13 +53,25 @@ public class IndexService extends SparkService {
DocumentDbReader documentDbReader,
DomainLinks domainLinks,
PartitionLinkGraphService partitionLinkGraphService,
ExecutorGrpcService executorGrpcService,
ExecutorCrawlGrpcService executorCrawlGrpcService,
ExecutorSideloadGrpcService executorSideloadGrpcService,
ExecutorExportGrpcService executorExportGrpcService,
FaviconGrpcService faviconGrpcService,
ExecutionInit executionInit,
ExecutorFileTransferService fileTransferService,
ServiceEventLog eventLog)
throws Exception
{
super(params,
ServicePartition.partition(params.configuration.node()),
List.of(indexQueryService,
partitionLinkGraphService)
partitionLinkGraphService,
executorGrpcService,
executorCrawlGrpcService,
executorSideloadGrpcService,
executorExportGrpcService,
faviconGrpcService)
);
this.opsService = opsService;
@@ -62,15 +79,26 @@ public class IndexService extends SparkService {
this.fileStorageService = fileStorageService;
this.documentDbReader = documentDbReader;
this.domainLinks = domainLinks;
this.executionInit = executionInit;
this.eventLog = eventLog;
this.init = params.initialization;
Spark.get("/transfer/file/:fid", fileTransferService::transferFile);
Spark.head("/transfer/file/:fid", fileTransferService::transferFile);
Thread.ofPlatform().name("initialize-index").start(this::initialize);
}
volatile boolean initialized = false;
@MqRequest(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
executionInit.initDefaultActors();
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_RERANK)
public String rerank(String message) {
if (!opsService.rerank()) {

View File

@@ -24,7 +24,6 @@ dependencies {
implementation project(':code:services-core:query-service')
implementation project(':code:services-core:index-service')
implementation project(':code:services-core:control-service')
implementation project(':code:services-core:executor-service')
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@@ -40,11 +40,6 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -62,20 +62,10 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
index-service-2:
<<: *partition-2
image: "marginalia/index-service"
container_name: "index-service-2"
executor-service-2:
<<: *partition-2
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-2"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -62,20 +62,10 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
index-service-2:
<<: *partition-2
image: "marginalia/index-service"
container_name: "index-service-2"
executor-service-2:
<<: *partition-2
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-2"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -4,7 +4,6 @@ include 'code:services-core:index-service'
include 'code:services-core:assistant-service'
include 'code:services-core:control-service'
include 'code:services-core:query-service'
include 'code:services-core:executor-service'
include 'code:services-core:single-service-runner'
include 'code:services-application:search-service'

View File

@@ -76,13 +76,6 @@ SERVICE_CONFIG = {
deploy_tier=3,
groups={"all", "index"}
),
'executor': ServiceConfig(
gradle_target=':code:services-core:executor-service:docker',
docker_name='executor-service',
instances=10,
deploy_tier=3,
groups={"all", "executor"}
),
'control': ServiceConfig(
gradle_target=':code:services-core:control-service:docker',
docker_name='control-service',