1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

(refac) Move execution API out of executor service

This commit is contained in:
Viktor Lofgren
2024-02-23 13:26:11 +01:00
parent 2201b1a506
commit 56d35aa596
67 changed files with 979 additions and 936 deletions

View File

@@ -1,9 +0,0 @@
package nu.marginalia.executor.model.transfer;
import nu.marginalia.storage.model.FileStorageId;
public record TransferItem(String domainName,
int domainId,
FileStorageId fileStorageId,
String path) {
}

View File

@@ -1,13 +0,0 @@
package nu.marginalia.executor.model.transfer;
import java.util.List;
public record TransferSpec(List<TransferItem> items) {
public TransferSpec() {
this(List.of());
}
public int size() {
return items.size();
}
}

View File

@@ -1,23 +0,0 @@
# Clients
## Core Services
* [assistant-api](assistant-api/)
* [query-api](query-api/)
* [index-api](index-api/)
These are clients for the [core services](../services-core/), along with what models
are necessary for speaking to them. They each implement the abstract client classes from
[service-client](../common/service-client).
All that is necessary is to `@Inject` them into the constructor and then
requests can be sent.
**Note:** If you are looking for the public API, it's handled by the api service in [services-application/api-service](../services-application/api-service).
## MQ-API Process API
[process-mqapi](process-mqapi/) defines requests and inboxes for the message queue based API used
for interacting with processes.
See [libraries/message-queue](../libraries/message-queue) and [services-application/control-service](../services-core/control-service).

View File

@@ -4,14 +4,18 @@ plugins {
id "com.google.protobuf" version "0.9.4"
}
jar.archiveBaseName = 'execution-api'
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
}
apply from: "$rootProject.projectDir/protobuf.gradle"
sourceSets {
main {
proto {
@@ -20,8 +24,6 @@ sourceSets {
}
}
dependencies {
implementation project(':code:common:model')
implementation project(':code:index:api')

View File

@@ -2,16 +2,15 @@ package nu.marginalia.executor.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.executor.api.*;
import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
import nu.marginalia.executor.model.ActorRunState;
import nu.marginalia.executor.model.ActorRunStates;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.upload.UploadDirContents;
import nu.marginalia.executor.upload.UploadDirItem;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
@@ -26,9 +25,10 @@ import java.io.OutputStream;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.*;
@Singleton
public class ExecutorClient {
private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
@@ -72,131 +72,13 @@ public class ExecutorClient {
}
public void triggerCrawl(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::triggerCrawl)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerRecrawl(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::triggerRecrawl)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerConvert(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::triggerConvert)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerConvertAndLoad(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::triggerConvertAndLoad)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void loadProcessedData(int node, List<FileStorageId> ids) {
channelPool.call(ExecutorApiBlockingStub::loadProcessedData)
.forNode(node)
.run(RpcFileStorageIds.newBuilder()
.addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
.build());
}
public void calculateAdjacencies(int node) {
channelPool.call(ExecutorApiBlockingStub::calculateAdjacencies)
.forNode(node)
.run(Empty.getDefaultInstance());
}
public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
channelPool.call(ExecutorApiBlockingStub::sideloadEncyclopedia)
.forNode(node)
.run(RpcSideloadEncyclopedia.newBuilder()
.setBaseUrl(baseUrl)
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadDirtree(int node, Path sourcePath) {
channelPool.call(ExecutorApiBlockingStub::sideloadDirtree)
.forNode(node)
.run(RpcSideloadDirtree.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadReddit(int node, Path sourcePath) {
channelPool.call(ExecutorApiBlockingStub::sideloadReddit)
.forNode(node)
.run(RpcSideloadReddit.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadWarc(int node, Path sourcePath) {
channelPool.call(ExecutorApiBlockingStub::sideloadWarc)
.forNode(node)
.run(RpcSideloadWarc.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadStackexchange(int node, Path sourcePath) {
channelPool.call(ExecutorApiBlockingStub::sideloadStackexchange)
.forNode(node)
.run(RpcSideloadStackexchange.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void createCrawlSpecFromDownload(int node, String description, String url) {
channelPool.call(ExecutorApiBlockingStub::createCrawlSpecFromDownload)
.forNode(node)
.run(RpcCrawlSpecFromDownload.newBuilder()
.setDescription(description)
.setUrl(url)
.build());
}
public void exportAtags(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::exportAtags)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
channelPool.call(ExecutorApiBlockingStub::exportSampleData)
.forNode(node)
.run(RpcExportSampleData.newBuilder()
.setFileStorageId(fid.id())
.setSize(size)
.setName(name)
.build());
}
public void exportRssFeeds(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::exportRssFeeds)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void exportTermFrequencies(int node, FileStorageId fid) {
channelPool.call(ExecutorApiBlockingStub::exportTermFrequencies)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void downloadSampleData(int node, String sampleSet) {
channelPool.call(ExecutorApiBlockingStub::downloadSampleData)
@@ -206,17 +88,11 @@ public class ExecutorClient {
.build());
}
public void exportData(int node) {
channelPool.call(ExecutorApiBlockingStub::exportData)
.forNode(node)
.run(Empty.getDefaultInstance());
}
public void restoreBackup(int node, FileStorageId fid) {
public void restoreBackup(int nodeId, FileStorageId toLoad) {
channelPool.call(ExecutorApiBlockingStub::restoreBackup)
.forNode(node)
.forNode(nodeId)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.setFileStorageId(toLoad.id())
.build());
}
@@ -300,5 +176,4 @@ public class ExecutorClient {
}
}
}

View File

@@ -0,0 +1,80 @@
package nu.marginalia.executor.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static nu.marginalia.functions.execution.api.ExecutorCrawlApiGrpc.*;
@Singleton
public class ExecutorCrawlClient {
private final GrpcMultiNodeChannelPool<ExecutorCrawlApiBlockingStub> channelPool;
private static final Logger logger = LoggerFactory.getLogger(ExecutorCrawlClient.class);
@Inject
public ExecutorCrawlClient(GrpcChannelPoolFactory grpcChannelPoolFactory)
{
this.channelPool = grpcChannelPoolFactory
.createMulti(
ServiceKey.forGrpcApi(ExecutorCrawlApiGrpc.class, ServicePartition.multi()),
ExecutorCrawlApiGrpc::newBlockingStub);
}
public void triggerCrawl(int node, FileStorageId fid) {
channelPool.call(ExecutorCrawlApiBlockingStub::triggerCrawl)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerRecrawl(int node, FileStorageId fid) {
channelPool.call(ExecutorCrawlApiBlockingStub::triggerRecrawl)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerConvert(int node, FileStorageId fid) {
channelPool.call(ExecutorCrawlApiBlockingStub::triggerConvert)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void triggerConvertAndLoad(int node, FileStorageId fid) {
channelPool.call(ExecutorCrawlApiBlockingStub::triggerConvertAndLoad)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void loadProcessedData(int node, List<FileStorageId> ids) {
channelPool.call(ExecutorCrawlApiBlockingStub::loadProcessedData)
.forNode(node)
.run(RpcFileStorageIds.newBuilder()
.addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
.build());
}
public void createCrawlSpecFromDownload(int node, String description, String url) {
channelPool.call(ExecutorCrawlApiBlockingStub::createCrawlSpecFromDownload)
.forNode(node)
.run(RpcCrawlSpecFromDownload.newBuilder()
.setDescription(description)
.setUrl(url)
.build());
}
}

View File

@@ -0,0 +1,74 @@
package nu.marginalia.executor.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.functions.execution.api.Empty;
import nu.marginalia.functions.execution.api.ExecutorExportApiGrpc;
import nu.marginalia.functions.execution.api.RpcExportSampleData;
import nu.marginalia.functions.execution.api.RpcFileStorageId;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static nu.marginalia.functions.execution.api.ExecutorExportApiGrpc.ExecutorExportApiBlockingStub;
@Singleton
public class ExecutorExportClient {
private final GrpcMultiNodeChannelPool<ExecutorExportApiBlockingStub> channelPool;
private static final Logger logger = LoggerFactory.getLogger(ExecutorExportClient.class);
@Inject
public ExecutorExportClient(GrpcChannelPoolFactory grpcChannelPoolFactory)
{
this.channelPool = grpcChannelPoolFactory
.createMulti(
ServiceKey.forGrpcApi(ExecutorExportApiGrpc.class, ServicePartition.multi()),
ExecutorExportApiGrpc::newBlockingStub);
}
public void exportAtags(int node, FileStorageId fid) {
channelPool.call(ExecutorExportApiBlockingStub::exportAtags)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
channelPool.call(ExecutorExportApiBlockingStub::exportSampleData)
.forNode(node)
.run(RpcExportSampleData.newBuilder()
.setFileStorageId(fid.id())
.setSize(size)
.setName(name)
.build());
}
public void exportRssFeeds(int node, FileStorageId fid) {
channelPool.call(ExecutorExportApiBlockingStub::exportRssFeeds)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void exportTermFrequencies(int node, FileStorageId fid) {
channelPool.call(ExecutorExportApiBlockingStub::exportTermFrequencies)
.forNode(node)
.run(RpcFileStorageId.newBuilder()
.setFileStorageId(fid.id())
.build());
}
public void exportData(int node) {
channelPool.call(ExecutorExportApiBlockingStub::exportData)
.forNode(node)
.run(Empty.getDefaultInstance());
}
}

View File

@@ -0,0 +1,71 @@
package nu.marginalia.executor.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import static nu.marginalia.functions.execution.api.ExecutorSideloadApiGrpc.ExecutorSideloadApiBlockingStub;
@Singleton
public class ExecutorSideloadClient {
private final GrpcMultiNodeChannelPool<ExecutorSideloadApiBlockingStub> channelPool;
private static final Logger logger = LoggerFactory.getLogger(ExecutorSideloadClient.class);
@Inject
public ExecutorSideloadClient(GrpcChannelPoolFactory grpcChannelPoolFactory)
{
this.channelPool = grpcChannelPoolFactory
.createMulti(
ServiceKey.forGrpcApi(ExecutorSideloadApiGrpc.class, ServicePartition.multi()),
ExecutorSideloadApiGrpc::newBlockingStub);
}
public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
channelPool.call(ExecutorSideloadApiBlockingStub::sideloadEncyclopedia)
.forNode(node)
.run(RpcSideloadEncyclopedia.newBuilder()
.setBaseUrl(baseUrl)
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadDirtree(int node, Path sourcePath) {
channelPool.call(ExecutorSideloadApiBlockingStub::sideloadDirtree)
.forNode(node)
.run(RpcSideloadDirtree.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadReddit(int node, Path sourcePath) {
channelPool.call(ExecutorSideloadApiBlockingStub::sideloadReddit)
.forNode(node)
.run(RpcSideloadReddit.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadWarc(int node, Path sourcePath) {
channelPool.call(ExecutorSideloadApiBlockingStub::sideloadWarc)
.forNode(node)
.run(RpcSideloadWarc.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
public void sideloadStackexchange(int node, Path sourcePath) {
channelPool.call(ExecutorSideloadApiBlockingStub::sideloadStackexchange)
.forNode(node)
.run(RpcSideloadStackexchange.newBuilder()
.setSourcePath(sourcePath.toString())
.build());
}
}

View File

@@ -1,39 +1,47 @@
syntax="proto3";
package actorapi;
option java_package="nu.marginalia.executor.api";
package nu.marginalia.functions.execution.api;
option java_package="nu.marginalia.functions.execution.api";
option java_multiple_files=true;
service ExecutorApi {
rpc startFsm(RpcFsmName) returns (Empty) {}
rpc stopFsm(RpcFsmName) returns (Empty) {}
rpc stopProcess(RpcProcessId) returns (Empty) {}
rpc getActorStates(Empty) returns (RpcActorRunStates) {}
rpc listSideloadDir(Empty) returns (RpcUploadDirContents) {}
rpc listFileStorage(RpcFileStorageId) returns (RpcFileStorageContent) {}
rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {}
rpc calculateAdjacencies(Empty) returns (Empty) {}
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
}
service ExecutorCrawlApi {
rpc triggerCrawl(RpcFileStorageId) returns (Empty) {}
rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {}
rpc triggerConvert(RpcFileStorageId) returns (Empty) {}
rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {}
rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {}
rpc calculateAdjacencies(Empty) returns (Empty) {}
rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {}
}
service ExecutorSideloadApi {
rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {}
rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {}
rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {}
rpc sideloadReddit(RpcSideloadReddit) returns (Empty) {}
rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {}
}
rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {}
service ExecutorExportApi {
rpc exportAtags(RpcFileStorageId) returns (Empty) {}
rpc exportSampleData(RpcExportSampleData) returns (Empty) {}
rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {}
rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {}
rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {}
rpc exportData(Empty) returns (Empty) {}
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
rpc getActorStates(Empty) returns (RpcActorRunStates) {}
rpc listSideloadDir(Empty) returns (RpcUploadDirContents) {}
rpc listFileStorage(RpcFileStorageId) returns (RpcFileStorageContent) {}
}
message Empty {}

View File

@@ -0,0 +1,75 @@
plugins {
id 'java'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
}
dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:website-adjacencies-calculator')
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:converting-process')
implementation project(':code:processes:index-constructor-process')
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:common:service')
implementation project(':code:common:service-discovery')
implementation project(':third-party:commons-codec')
implementation project(':code:libraries:message-queue')
implementation project(':code:functions:domain-links:api')
implementation project(':code:execution:api')
implementation project(':code:process-models:crawl-spec')
implementation project(':code:process-models:crawling-model')
implementation project(':code:features-crawl:link-parser')
implementation project(':code:features-convert:data-extractors')
implementation project(':code:features-convert:stackexchange-xml')
implementation project(':code:features-convert:reddit-json')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:process-mqapi')
implementation project(':third-party:encyclopedia-marginalia-nu')
implementation libs.bundles.slf4j
implementation libs.spark
implementation libs.bundles.grpc
implementation libs.gson
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.trove
implementation libs.protobuf
implementation libs.zstd
implementation libs.jsoup
implementation libs.commons.io
implementation libs.commons.compress
implementation libs.commons.lang3
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@@ -2,11 +2,7 @@ package nu.marginalia.actor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.executor.api.RpcActorRunState;
import nu.marginalia.executor.api.RpcActorRunStates;
import nu.marginalia.executor.api.RpcFsmName;
import nu.marginalia.executor.api.RpcProcessId;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessService;
@@ -15,8 +11,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
import java.util.Comparator;
@Singleton
public class ActorApi {
private final ExecutorActorControlService actors;
@@ -75,45 +69,6 @@ public class ActorApi {
return "OK";
}
public RpcActorRunStates getActorStates() {
var items = actors.getActorStates().entrySet().stream().map(e -> {
final var stateGraph = actors.getActorDefinition(e.getKey());
final ActorStateInstance state = e.getValue();
final String actorDescription = stateGraph.describe();
final String machineName = e.getKey().name();
final String stateName = state.name();
final String stateDescription = "";
final boolean terminal = state.isFinal();
final boolean canStart = actors.isDirectlyInitializable(e.getKey()) && terminal;
return RpcActorRunState
.newBuilder()
.setActorName(machineName)
.setState(stateName)
.setActorDescription(actorDescription)
.setStateDescription(stateDescription)
.setTerminal(terminal)
.setCanStart(canStart)
.build();
})
.filter(s -> !s.getTerminal() || s.getCanStart())
.sorted(Comparator.comparing(RpcActorRunState::getActorName))
.toList();
return RpcActorRunStates.newBuilder()
.setNode(serviceConfiguration.node())
.addAllActorRunStates(items)
.build();
}
public ExecutorActor translateActor(String name) {
try {
return ExecutorActor.valueOf(name.toUpperCase());

View File

@@ -0,0 +1,26 @@
package nu.marginalia.execution;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.ExecutorActor;
@Singleton
public class ExecutionInit {
private final ExecutorActorControlService actorControlService;
@Inject
public ExecutionInit(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public void initDefaultActors() throws Exception {
actorControlService.start(ExecutorActor.MONITOR_PROCESS_LIVENESS);
actorControlService.start(ExecutorActor.MONITOR_FILE_STORAGE);
actorControlService.start(ExecutorActor.PROC_CONVERTER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_CRAWLER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER);
actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER);
}
}

View File

@@ -0,0 +1,113 @@
package nu.marginalia.execution;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.*;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.storage.model.FileStorageId;
import java.util.stream.Collectors;
public class ExecutorCrawlGrpcService extends ExecutorCrawlApiGrpc.ExecutorCrawlApiImplBase {
private final ExecutorActorControlService actorControlService;
@Inject
public ExecutorCrawlGrpcService(ExecutorActorControlService actorControlService)
{
this.actorControlService = actorControlService;
}
@Override
public void triggerCrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CRAWL,
new CrawlActor.Initial(FileStorageId.of(request.getFileStorageId())));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerRecrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.RECRAWL,
new RecrawlActor.Initial(FileStorageId.of(request.getFileStorageId()), false));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerConvert(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.Convert(FileStorageId.of(request.getFileStorageId())));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerConvertAndLoad(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
new ConvertAndLoadActor.Initial(FileStorageId.of(request.getFileStorageId())));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void loadProcessedData(RpcFileStorageIds request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
new ConvertAndLoadActor.Load(request.getFileStorageIdsList()
.stream()
.map(FileStorageId::of)
.collect(Collectors.toList()))
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR,
new CrawlJobExtractorActor.CreateFromUrl(
request.getDescription(),
request.getUrl())
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
}

View File

@@ -0,0 +1,95 @@
package nu.marginalia.execution;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.grpc.stub.StreamObserver;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.*;
import nu.marginalia.functions.execution.api.Empty;
import nu.marginalia.functions.execution.api.ExecutorExportApiGrpc;
import nu.marginalia.functions.execution.api.RpcExportSampleData;
import nu.marginalia.functions.execution.api.RpcFileStorageId;
import nu.marginalia.storage.model.FileStorageId;
@Singleton
public class ExecutorExportGrpcService extends ExecutorExportApiGrpc.ExecutorExportApiImplBase {
private final ExecutorActorControlService actorControlService;
@Inject
public ExecutorExportGrpcService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
@Override
public void exportAtags(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS,
new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId()))
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportSampleData(RpcExportSampleData request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA,
new ExportSampleDataActor.Export(
FileStorageId.of(request.getFileStorageId()),
request.getSize(),
request.getName()
)
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportRssFeeds(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS,
new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId()))
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportTermFrequencies(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES,
new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId()))
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportData(Empty request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.EXPORT_DATA, new ExportDataActor.Export());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
}

View File

@@ -0,0 +1,237 @@
package nu.marginalia.execution;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
import nu.marginalia.WmsaHome;
import nu.marginalia.actor.ActorApi;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.task.DownloadSampleActor;
import nu.marginalia.actor.task.RestoreBackupActor;
import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase {
private final ActorApi actorApi;
private final FileStorageService fileStorageService;
private final ServiceConfiguration serviceConfiguration;
private final ExecutorActorControlService actorControlService;
@Inject
public ExecutorGrpcService(ActorApi actorApi,
FileStorageService fileStorageService,
ServiceConfiguration serviceConfiguration,
ExecutorActorControlService actorControlService)
{
this.actorApi = actorApi;
this.fileStorageService = fileStorageService;
this.serviceConfiguration = serviceConfiguration;
this.actorControlService = actorControlService;
}
@Override
public void startFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
try {
actorApi.startActor(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void stopFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
try {
actorApi.stopActor(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void stopProcess(RpcProcessId request, StreamObserver<Empty> responseObserver) {
try {
actorApi.stopProcess(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void calculateAdjacencies(Empty request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.ADJACENCY_CALCULATION,
new TriggerAdjacencyCalculationActor.Run());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void downloadSampleData(RpcDownloadSampleData request, StreamObserver<Empty> responseObserver) {
try {
String sampleSet = request.getSampleSet();
actorControlService.startFrom(ExecutorActor.DOWNLOAD_SAMPLE,
new DownloadSampleActor.Run(sampleSet));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void restoreBackup(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
var fid = FileStorageId.of(request.getFileStorageId());
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP,
new RestoreBackupActor.Restore(fid));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void getActorStates(Empty request, StreamObserver<RpcActorRunStates> responseObserver) {
var items = actorControlService.getActorStates().entrySet().stream().map(e -> {
final var stateGraph = actorControlService.getActorDefinition(e.getKey());
final ActorStateInstance state = e.getValue();
final String actorDescription = stateGraph.describe();
final String machineName = e.getKey().name();
final String stateName = state.name();
final String stateDescription = "";
final boolean terminal = state.isFinal();
final boolean canStart = actorControlService.isDirectlyInitializable(e.getKey()) && terminal;
return RpcActorRunState
.newBuilder()
.setActorName(machineName)
.setState(stateName)
.setActorDescription(actorDescription)
.setStateDescription(stateDescription)
.setTerminal(terminal)
.setCanStart(canStart)
.build();
})
.filter(s -> !s.getTerminal() || s.getCanStart())
.sorted(Comparator.comparing(RpcActorRunState::getActorName))
.toList();
responseObserver.onNext(RpcActorRunStates.newBuilder()
.setNode(serviceConfiguration.node())
.addAllActorRunStates(items)
.build());
responseObserver.onCompleted();
}
@Override
public void listSideloadDir(Empty request, StreamObserver<RpcUploadDirContents> responseObserver) {
try {
Path uploadDir = WmsaHome.getUploadDir();
try (var items = Files.list(uploadDir).sorted(
Comparator.comparing((Path d) -> Files.isDirectory(d)).reversed()
.thenComparing(path -> path.getFileName().toString())
)) {
var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString());
var iter = items.iterator();
while (iter.hasNext()) {
var path = iter.next();
boolean isDir = Files.isDirectory(path);
long size = isDir ? 0 : Files.size(path);
var mtime = Files.getLastModifiedTime(path);
builder.addEntriesBuilder()
.setName(path.toString())
.setIsDirectory(isDir)
.setLastModifiedTime(
LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME))
.setSize(size);
}
responseObserver.onNext(builder.build());
}
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void listFileStorage(RpcFileStorageId request, StreamObserver<RpcFileStorageContent> responseObserver) {
try {
FileStorageId fileStorageId = FileStorageId.of(request.getFileStorageId());
var storage = fileStorageService.getStorage(fileStorageId);
var builder = RpcFileStorageContent.newBuilder();
try (var fs = Files.list(storage.asPath())) {
fs.filter(Files::isRegularFile)
.map(this::createFileModel)
.sorted(Comparator.comparing(RpcFileStorageEntry::getName))
.forEach(builder::addEntries);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@SneakyThrows
private RpcFileStorageEntry createFileModel(Path path) {
return RpcFileStorageEntry.newBuilder()
.setName(path.toFile().getName())
.setSize(Files.size(path))
.setLastModifiedTime(Files.getLastModifiedTime(path).toInstant().toString())
.build();
}
}

View File

@@ -0,0 +1,96 @@
package nu.marginalia.execution;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.ConvertActor;
import nu.marginalia.functions.execution.api.*;
public class ExecutorSideloadGrpcService extends ExecutorSideloadApiGrpc.ExecutorSideloadApiImplBase {
private final ExecutorActorControlService actorControlService;
@Inject
public ExecutorSideloadGrpcService(ExecutorActorControlService actorControlService)
{
this.actorControlService = actorControlService;
}
@Override
public void sideloadEncyclopedia(RpcSideloadEncyclopedia request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertEncyclopedia(
request.getSourcePath(),
request.getBaseUrl()
));
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadDirtree(RpcSideloadDirtree request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertDirtree(request.getSourcePath())
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadReddit(RpcSideloadReddit request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertReddit(request.getSourcePath())
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadWarc(RpcSideloadWarc request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertWarc(request.getSourcePath())
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadStackexchange(RpcSideloadStackexchange request, StreamObserver<Empty> responseObserver) {
try {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertStackexchange(request.getSourcePath())
);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
}

View File

@@ -3,6 +3,7 @@ package nu.marginalia.svc;
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import nu.marginalia.IndexLocations;
import nu.marginalia.linkdb.LinkdbFileNames;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
@@ -18,9 +19,6 @@ import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME;
public class BackupService {
private final FileStorageService storageService;
@@ -59,10 +57,10 @@ public class BackupService {
try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Backup")) {
heartbeat.progress(BackupHeartbeatSteps.DOCS);
backupFileCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
backupFileCompressed(LinkdbFileNames.DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
heartbeat.progress(BackupHeartbeatSteps.LINKS);
backupFileCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
backupFileCompressed(LinkdbFileNames.DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
heartbeat.progress(BackupHeartbeatSteps.JOURNAL);
// This file format is already compressed
@@ -84,10 +82,10 @@ public class BackupService {
try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Restore Backup")) {
heartbeat.progress(BackupHeartbeatSteps.DOCS);
restoreBackupCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage);
restoreBackupCompressed(LinkdbFileNames.DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage);
heartbeat.progress(BackupHeartbeatSteps.LINKS);
restoreBackupCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage);
restoreBackupCompressed(LinkdbFileNames.DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage);
heartbeat.progress(BackupHeartbeatSteps.JOURNAL);
restoreJournal(indexStagingStorage, backupStorage);

View File

@@ -46,7 +46,7 @@ dependencies {
implementation project(':code:libraries:message-queue')
implementation project(':code:common:service-discovery')
implementation project(':code:functions:search-query:api')
implementation project(':code:api:executor-api')
implementation project(':code:execution:api')
implementation project(':code:index:api')
implementation project(':code:process-mqapi')
implementation project(':code:features-search:screenshots')

View File

@@ -24,7 +24,8 @@ public class ControlFileStorageService {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ControlFileStorageService(FileStorageService fileStorageService, ExecutorClient executorClient)
public ControlFileStorageService(FileStorageService fileStorageService,
ExecutorClient executorClient)
{
this.fileStorageService = fileStorageService;
this.executorClient = executorClient;

View File

@@ -5,6 +5,9 @@ import com.google.inject.Singleton;
import nu.marginalia.control.ControlValidationError;
import nu.marginalia.control.RedirectControl;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.client.ExecutorCrawlClient;
import nu.marginalia.executor.client.ExecutorExportClient;
import nu.marginalia.executor.client.ExecutorSideloadClient;
import nu.marginalia.index.api.IndexMqClient;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.storage.FileStorageService;
@@ -31,13 +34,16 @@ public class ControlNodeActionsService {
private final FileStorageService fileStorageService;
private final ServiceEventLog eventLog;
private final ExecutorClient executorClient;
private final ExecutorCrawlClient crawlClient;
private final ExecutorSideloadClient sideloadClient;
private final ExecutorExportClient exportClient;
@Inject
public ControlNodeActionsService(ExecutorClient executorClient,
IndexMqClient indexMqClient,
RedirectControl redirectControl,
FileStorageService fileStorageService,
ServiceEventLog eventLog)
ServiceEventLog eventLog, ExecutorCrawlClient crawlClient, ExecutorSideloadClient sideloadClient, ExecutorExportClient exportClient)
{
this.executorClient = executorClient;
@@ -46,6 +52,9 @@ public class ControlNodeActionsService {
this.fileStorageService = fileStorageService;
this.eventLog = eventLog;
this.crawlClient = crawlClient;
this.sideloadClient = sideloadClient;
this.exportClient = exportClient;
}
public void register() {
@@ -127,7 +136,7 @@ public class ControlNodeActionsService {
eventLog.logEvent("USER-ACTION", "SIDELOAD ENCYCLOPEDIA " + nodeId);
executorClient.sideloadEncyclopedia(nodeId, sourcePath, baseUrl);
sideloadClient.sideloadEncyclopedia(nodeId, sourcePath, baseUrl);
return "";
}
@@ -140,7 +149,7 @@ public class ControlNodeActionsService {
eventLog.logEvent("USER-ACTION", "SIDELOAD DIRTREE " + nodeId);
executorClient.sideloadDirtree(nodeId, sourcePath);
sideloadClient.sideloadDirtree(nodeId, sourcePath);
return "";
}
@@ -152,7 +161,7 @@ public class ControlNodeActionsService {
eventLog.logEvent("USER-ACTION", "SIDELOAD REDDIT " + nodeId);
executorClient.sideloadReddit(nodeId, sourcePath);
sideloadClient.sideloadReddit(nodeId, sourcePath);
return "";
}
@@ -163,7 +172,7 @@ public class ControlNodeActionsService {
eventLog.logEvent("USER-ACTION", "SIDELOAD WARC " + nodeId);
executorClient.sideloadWarc(nodeId, sourcePath);
sideloadClient.sideloadWarc(nodeId, sourcePath);
return "";
}
@@ -178,7 +187,7 @@ public class ControlNodeActionsService {
eventLog.logEvent("USER-ACTION", "SIDELOAD STACKEXCHANGE " + nodeId);
executorClient.sideloadStackexchange(nodeId, sourcePath);
sideloadClient.sideloadStackexchange(nodeId, sourcePath);
return "";
}
@@ -196,7 +205,7 @@ public class ControlNodeActionsService {
changeActiveStorage(nodeId, FileStorageType.CRAWL_DATA, toCrawl);
executorClient.triggerRecrawl(
crawlClient.triggerRecrawl(
nodeId,
toCrawl
);
@@ -211,7 +220,7 @@ public class ControlNodeActionsService {
changeActiveStorage(nodeId, FileStorageType.CRAWL_SPEC, toCrawl);
executorClient.triggerCrawl(nodeId, toCrawl);
crawlClient.triggerCrawl(nodeId, toCrawl);
return "";
}
@@ -224,10 +233,10 @@ public class ControlNodeActionsService {
changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, toProcess);
if (isAutoload) {
executorClient.triggerConvertAndLoad(nodeId, toProcess);
crawlClient.triggerConvertAndLoad(nodeId, toProcess);
}
else {
executorClient.triggerConvert(nodeId, toProcess);
crawlClient.triggerConvert(nodeId, toProcess);
}
return "";
@@ -245,7 +254,7 @@ public class ControlNodeActionsService {
changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, ids.toArray(new FileStorageId[0]));
executorClient.loadProcessedData(nodeId, ids);
crawlClient.loadProcessedData(nodeId, ids);
return "";
}
@@ -287,13 +296,13 @@ public class ControlNodeActionsService {
throw new ControlValidationError("No url specified", "A url must be specified", "..");
}
executorClient.createCrawlSpecFromDownload(nodeId, description, url);
crawlClient.createCrawlSpecFromDownload(nodeId, description, url);
return "";
}
private Object exportDbData(Request req, Response rsp) {
executorClient.exportData(Integer.parseInt(req.params("id")));
exportClient.exportData(Integer.parseInt(req.params("id")));
return "";
}
@@ -303,9 +312,9 @@ public class ControlNodeActionsService {
FileStorageId source = parseSourceFileStorageId(req.queryParams("source"));
switch (exportType) {
case "atags" -> executorClient.exportAtags(Integer.parseInt(req.params("id")), source);
case "rss" -> executorClient.exportRssFeeds(Integer.parseInt(req.params("id")), source);
case "termFreq" -> executorClient.exportTermFrequencies(Integer.parseInt(req.params("id")), source);
case "atags" -> exportClient.exportAtags(Integer.parseInt(req.params("id")), source);
case "rss" -> exportClient.exportRssFeeds(Integer.parseInt(req.params("id")), source);
case "termFreq" -> exportClient.exportTermFrequencies(Integer.parseInt(req.params("id")), source);
default -> throw new ControlValidationError("No export type specified", "An export type must be specified", "..");
}
@@ -317,7 +326,7 @@ public class ControlNodeActionsService {
int size = Integer.parseInt(req.queryParams("size"));
String name = req.queryParams("name");
executorClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name);
exportClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name);
return "";
}

View File

@@ -13,10 +13,6 @@ application {
tasks.distZip.enabled = false
clean {
delete fileTree('build/dist-extra')
}
jib {
from {
image = image = rootProject.ext.dockerImageBase
@@ -72,7 +68,10 @@ dependencies {
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:process-mqapi')
implementation project(':code:api:executor-api')
implementation project(':code:execution')
implementation project(':code:execution:api')
implementation project(':third-party:encyclopedia-marginalia-nu')
implementation libs.bundles.slf4j

View File

@@ -1,325 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.actor.ActorApi;
import nu.marginalia.executor.api.*;
import nu.marginalia.executor.svc.*;
public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase {
private final ActorApi actorApi;
private final ExportService exportService;
private final SideloadService sideloadService;
private final BackupService backupService;
private final TransferService transferService;
private final ProcessingService processingService;
@Inject
public ExecutorGrpcService(ActorApi actorApi,
ExportService exportService,
SideloadService sideloadService,
BackupService backupService,
TransferService transferService,
ProcessingService processingService)
{
this.actorApi = actorApi;
this.exportService = exportService;
this.sideloadService = sideloadService;
this.backupService = backupService;
this.transferService = transferService;
this.processingService = processingService;
}
@Override
public void startFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
try {
actorApi.startActor(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void stopFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
try {
actorApi.stopActor(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void stopProcess(RpcProcessId request, StreamObserver<Empty> responseObserver) {
try {
actorApi.stopProcess(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerCrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
processingService.startCrawl(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerRecrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
processingService.startRecrawl(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerConvert(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
processingService.startConversion(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void triggerConvertAndLoad(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
processingService.startConvertLoad(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void loadProcessedData(RpcFileStorageIds request, StreamObserver<Empty> responseObserver) {
try {
processingService.startLoad(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void calculateAdjacencies(Empty request, StreamObserver<Empty> responseObserver) {
try {
processingService.startAdjacencyCalculation();
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadEncyclopedia(RpcSideloadEncyclopedia request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadEncyclopedia(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadDirtree(RpcSideloadDirtree request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadDirtree(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadReddit(RpcSideloadReddit request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadReddit(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadWarc(RpcSideloadWarc request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadWarc(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadStackexchange(RpcSideloadStackexchange request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadStackexchange(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request, StreamObserver<Empty> responseObserver) {
try {
processingService.createCrawlSpecFromDownload(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportAtags(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
exportService.exportAtags(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportSampleData(RpcExportSampleData request, StreamObserver<Empty> responseObserver) {
try {
exportService.exportSampleData(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportRssFeeds(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
exportService.exportFeeds(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportTermFrequencies(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
exportService.exportTermFrequencies(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void downloadSampleData(RpcDownloadSampleData request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.downloadSampleData(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void exportData(Empty request, StreamObserver<Empty> responseObserver) {
try {
exportService.exportData();
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void restoreBackup(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
try {
backupService.restore(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void getActorStates(Empty request, StreamObserver<RpcActorRunStates> responseObserver) {
responseObserver.onNext(actorApi.getActorStates());
responseObserver.onCompleted();
}
@Override
public void listSideloadDir(Empty request, StreamObserver<RpcUploadDirContents> responseObserver) {
try {
responseObserver.onNext(sideloadService.listUploadDir());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void listFileStorage(RpcFileStorageId request, StreamObserver<RpcFileStorageContent> responseObserver) {
try {
responseObserver.onNext(transferService.listFiles(request));
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
}

View File

@@ -1,50 +1,75 @@
package nu.marginalia.executor;
import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.executor.svc.TransferService;
import nu.marginalia.execution.*;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
import spark.Response;
import spark.Spark;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
public class ExecutorSvc extends Service {
private final ExecutorActorControlService actorControlService;
private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class);
private final ExecutionInit executionInit;
private final FileStorageService fileStorageService;
@Inject
public ExecutorSvc(BaseServiceParams params,
ExecutorActorControlService actorControlService,
ExecutorGrpcService executorGrpcService,
TransferService transferService)
ExecutorCrawlGrpcService executorCrawlGrpcService,
ExecutorSideloadGrpcService executorSideloadGrpcService,
ExecutorExportGrpcService executorExportGrpcService,
ExecutionInit executionInit,
FileStorageService fileStorageService)
{
super(params,
ServicePartition.partition(params.configuration.node()),
List.of(executorGrpcService));
this.actorControlService = actorControlService;
List.of(executorGrpcService,
executorCrawlGrpcService,
executorSideloadGrpcService,
executorExportGrpcService)
);
Spark.get("/transfer/file/:fid", transferService::transferFile);
this.executionInit = executionInit;
this.fileStorageService = fileStorageService;
Spark.get("/transfer/file/:fid", this::transferFile);
}
@MqRequest(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
actorControlService.start(ExecutorActor.MONITOR_PROCESS_LIVENESS);
actorControlService.start(ExecutorActor.MONITOR_FILE_STORAGE);
actorControlService.start(ExecutorActor.PROC_CONVERTER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_CRAWLER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER);
actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER);
executionInit.initDefaultActors();
}
/** Allows transfer of files from each partition */
private Object transferFile(Request request, Response response) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
var fileStorage = fileStorageService.getStorage(fileStorageId);
Path basePath = fileStorage.asPath();
// This is not a public API so injection isn't a concern
Path filePath = basePath.resolve(request.queryParams("path"));
response.type("application/octet-stream");
FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream());
return "";
}
}

View File

@@ -1,22 +0,0 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.RestoreBackupActor;
import nu.marginalia.executor.api.RpcFileStorageId;
import nu.marginalia.storage.model.FileStorageId;
public class BackupService {
private final ExecutorActorControlService actorControlService;
@Inject
public BackupService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public void restore(RpcFileStorageId request) throws Exception {
var fid = FileStorageId.of(request.getFileStorageId());
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, new RestoreBackupActor.Restore(fid));
}
}

View File

@@ -1,51 +0,0 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.*;
import nu.marginalia.executor.api.RpcExportSampleData;
import nu.marginalia.executor.api.RpcFileStorageId;
import nu.marginalia.storage.model.FileStorageId;
public class ExportService {
private final ExecutorActorControlService actorControlService;
@Inject
public ExportService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public void exportData() throws Exception {
actorControlService.startFrom(ExecutorActor.EXPORT_DATA, new ExportDataActor.Export());
}
public void exportSampleData(RpcExportSampleData request) throws Exception {
actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA,
new ExportSampleDataActor.Export(
FileStorageId.of(request.getFileStorageId()),
request.getSize(),
request.getName()
)
);
}
public void exportAtags(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS,
new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId()))
);
}
public void exportFeeds(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS,
new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId()))
);
}
public void exportTermFrequencies(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES,
new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId()))
);
}
}

View File

@@ -1,62 +0,0 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.*;
import nu.marginalia.executor.api.RpcCrawlSpecFromDownload;
import nu.marginalia.executor.api.RpcFileStorageId;
import nu.marginalia.executor.api.RpcFileStorageIds;
import nu.marginalia.storage.model.FileStorageId;
import java.util.stream.Collectors;
public class ProcessingService {
private final ExecutorActorControlService actorControlService;
@Inject
public ProcessingService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public void startRecrawl(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.RECRAWL,
new RecrawlActor.Initial(FileStorageId.of(request.getFileStorageId()), false));
}
public void startCrawl(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.CRAWL,
new CrawlActor.Initial(FileStorageId.of(request.getFileStorageId())));
}
public void startConversion(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.Convert(FileStorageId.of(request.getFileStorageId())));
}
public void startConvertLoad(RpcFileStorageId request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
new ConvertAndLoadActor.Initial(FileStorageId.of(request.getFileStorageId())));
}
public void startLoad(RpcFileStorageIds request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
new ConvertAndLoadActor.Load(request.getFileStorageIdsList()
.stream()
.map(FileStorageId::of)
.collect(Collectors.toList()))
);
}
public void startAdjacencyCalculation() throws Exception {
actorControlService.startFrom(ExecutorActor.ADJACENCY_CALCULATION, new TriggerAdjacencyCalculationActor.Run());
}
public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request) throws Exception {
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR,
new CrawlJobExtractorActor.CreateFromUrl(
request.getDescription(),
request.getUrl())
);
}
}

View File

@@ -1,96 +0,0 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.WmsaHome;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.ConvertActor;
import nu.marginalia.actor.task.DownloadSampleActor;
import nu.marginalia.executor.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
public class SideloadService {
private final ExecutorActorControlService actorControlService;
private static final Logger logger = LoggerFactory.getLogger(SideloadService.class);
@Inject
public SideloadService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public void sideloadDirtree(RpcSideloadDirtree request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertDirtree(request.getSourcePath())
);
}
public void sideloadReddit(RpcSideloadReddit request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertReddit(request.getSourcePath())
);
}
public void sideloadWarc(RpcSideloadWarc request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertWarc(request.getSourcePath())
);
}
public void sideloadEncyclopedia(RpcSideloadEncyclopedia request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertEncyclopedia(
request.getSourcePath(),
request.getBaseUrl()
));
}
public void sideloadStackexchange(RpcSideloadStackexchange request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertStackexchange(request.getSourcePath())
);
}
public RpcUploadDirContents listUploadDir() throws IOException {
Path uploadDir = WmsaHome.getUploadDir();
try (var items = Files.list(uploadDir).sorted(
Comparator.comparing((Path d) -> Files.isDirectory(d)).reversed()
.thenComparing(path -> path.getFileName().toString())
)) {
var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString());
var iter = items.iterator();
while (iter.hasNext()) {
var path = iter.next();
boolean isDir = Files.isDirectory(path);
long size = isDir ? 0 : Files.size(path);
var mtime = Files.getLastModifiedTime(path);
builder.addEntriesBuilder()
.setName(path.toString())
.setIsDirectory(isDir)
.setLastModifiedTime(
LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME))
.setSize(size);
}
return builder.build();
}
}
public void downloadSampleData(RpcDownloadSampleData request) throws Exception {
String sampleSet = request.getSampleSet();
actorControlService.startFrom(ExecutorActor.DOWNLOAD_SAMPLE, new DownloadSampleActor.Run(sampleSet));
}
}

View File

@@ -1,97 +0,0 @@
package nu.marginalia.executor.svc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.executor.api.RpcFileStorageContent;
import nu.marginalia.executor.api.RpcFileStorageEntry;
import nu.marginalia.executor.api.RpcFileStorageId;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
import spark.Response;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Comparator;
public class TransferService {
private final Gson gson;
private final FileStorageService fileStorageService;
private final HikariDataSource dataSource;
private final ExecutorClient executorClient;
private final MqPersistence persistence;
private final String executorServiceName;
private final int nodeId;
private static final Logger logger = LoggerFactory.getLogger(TransferService.class);
@Inject
public TransferService(
Gson gson,
FileStorageService fileStorageService,
HikariDataSource dataSource,
ExecutorClient executorClient, MqPersistence persistence, ServiceConfiguration config)
{
this.gson = gson;
this.fileStorageService = fileStorageService;
this.dataSource = dataSource;
this.executorClient = executorClient;
this.persistence = persistence;
this.nodeId = config.node();
this.executorServiceName = config.serviceName();
}
public Object transferFile(Request request, Response response) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
var fileStorage = fileStorageService.getStorage(fileStorageId);
Path basePath = fileStorage.asPath();
// This is not a public API so injection isn't a concern
Path filePath = basePath.resolve(request.queryParams("path"));
response.type("application/octet-stream");
FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream());
return "";
}
public RpcFileStorageContent listFiles(RpcFileStorageId request) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.of(request.getFileStorageId());
var storage = fileStorageService.getStorage(fileStorageId);
var builder = RpcFileStorageContent.newBuilder();
try (var fs = Files.list(storage.asPath())) {
fs.filter(Files::isRegularFile)
.map(this::createFileModel)
.sorted(Comparator.comparing(RpcFileStorageEntry::getName))
.forEach(builder::addEntries);
}
return builder.build();
}
@SneakyThrows
private RpcFileStorageEntry createFileModel(Path path) {
return RpcFileStorageEntry.newBuilder()
.setName(path.toFile().getName())
.setSize(Files.size(path))
.setLastModifiedTime(Files.getLastModifiedTime(path).toInstant().toString())
.build();
}
public record TransferReq(int sourceNode, int count) { }
}

View File

@@ -24,6 +24,9 @@ include 'code:functions:domain-links:api'
include 'code:functions:search-query'
include 'code:functions:search-query:api'
include 'code:execution'
include 'code:execution:api'
include 'code:index'
include 'code:index:api'
include 'code:index:index-journal'
@@ -66,7 +69,6 @@ include 'code:features-crawl:link-parser'
include 'code:features-crawl:content-type'
include 'code:process-mqapi'
include 'code:api:executor-api'
include 'code:common:service-discovery'
include 'code:common:db'