mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-06 17:32:39 +02:00
Compare commits
1 Commits
deploy-026
...
deploy-026
Author | SHA1 | Date | |
---|---|---|---|
|
6f1659ecb2 |
@@ -9,6 +9,7 @@ import nu.marginalia.executor.storage.FileStorageFile;
|
|||||||
import nu.marginalia.executor.upload.UploadDirContents;
|
import nu.marginalia.executor.upload.UploadDirContents;
|
||||||
import nu.marginalia.executor.upload.UploadDirItem;
|
import nu.marginalia.executor.upload.UploadDirItem;
|
||||||
import nu.marginalia.functions.execution.api.*;
|
import nu.marginalia.functions.execution.api.*;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.service.ServiceId;
|
import nu.marginalia.service.ServiceId;
|
||||||
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
||||||
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
|
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
|
||||||
@@ -25,27 +26,37 @@ import java.net.URISyntaxException;
|
|||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
|
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class ExecutorClient {
|
public class ExecutorClient {
|
||||||
|
private final MqPersistence persistence;
|
||||||
private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
|
private final GrpcMultiNodeChannelPool<ExecutorApiBlockingStub> channelPool;
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
|
||||||
private final ServiceRegistryIf registry;
|
private final ServiceRegistryIf registry;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ExecutorClient(ServiceRegistryIf registry,
|
public ExecutorClient(ServiceRegistryIf registry,
|
||||||
|
MqPersistence persistence,
|
||||||
GrpcChannelPoolFactory grpcChannelPoolFactory)
|
GrpcChannelPoolFactory grpcChannelPoolFactory)
|
||||||
{
|
{
|
||||||
this.registry = registry;
|
this.registry = registry;
|
||||||
|
this.persistence = persistence;
|
||||||
this.channelPool = grpcChannelPoolFactory
|
this.channelPool = grpcChannelPoolFactory
|
||||||
.createMulti(
|
.createMulti(
|
||||||
ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
|
ServiceKey.forGrpcApi(ExecutorApiGrpc.class, ServicePartition.multi()),
|
||||||
ExecutorApiGrpc::newBlockingStub);
|
ExecutorApiGrpc::newBlockingStub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
|
||||||
|
return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public void startFsm(int node, String actorName) {
|
public void startFsm(int node, String actorName) {
|
||||||
channelPool.call(ExecutorApiBlockingStub::startFsm)
|
channelPool.call(ExecutorApiBlockingStub::startFsm)
|
||||||
.forNode(node)
|
.forNode(node)
|
||||||
@@ -96,6 +107,16 @@ public class ExecutorClient {
|
|||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long updateNsfwFilters() throws Exception {
|
||||||
|
long msgId = createTrackingTokenMsg("nsfw-filters", 1, Duration.ofHours(6));
|
||||||
|
|
||||||
|
channelPool.call(ExecutorApiBlockingStub::updateNsfwFilters)
|
||||||
|
.forNode(1)
|
||||||
|
.run(RpcUpdateNsfwFilters.newBuilder().setMsgId(msgId).build());
|
||||||
|
|
||||||
|
return msgId;
|
||||||
|
}
|
||||||
|
|
||||||
public ActorRunStates getActorStates(int node) {
|
public ActorRunStates getActorStates(int node) {
|
||||||
try {
|
try {
|
||||||
var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates)
|
var rs = channelPool.call(ExecutorApiBlockingStub::getActorStates)
|
||||||
|
@@ -18,6 +18,8 @@ service ExecutorApi {
|
|||||||
rpc calculateAdjacencies(Empty) returns (Empty) {}
|
rpc calculateAdjacencies(Empty) returns (Empty) {}
|
||||||
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
|
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
|
||||||
|
|
||||||
|
rpc updateNsfwFilters(RpcUpdateNsfwFilters) returns (Empty) {}
|
||||||
|
|
||||||
rpc restartExecutorService(Empty) returns (Empty) {}
|
rpc restartExecutorService(Empty) returns (Empty) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,6 +68,9 @@ message RpcExportRequest {
|
|||||||
int64 fileStorageId = 1;
|
int64 fileStorageId = 1;
|
||||||
int64 msgId = 2;
|
int64 msgId = 2;
|
||||||
}
|
}
|
||||||
|
message RpcUpdateNsfwFilters {
|
||||||
|
int64 msgId = 1;
|
||||||
|
}
|
||||||
message RpcFileStorageIdWithDomainName {
|
message RpcFileStorageIdWithDomainName {
|
||||||
int64 fileStorageId = 1;
|
int64 fileStorageId = 1;
|
||||||
string targetDomainName = 2;
|
string targetDomainName = 2;
|
||||||
|
@@ -6,7 +6,7 @@ import java.util.Set;
|
|||||||
|
|
||||||
public enum ExecutorActor {
|
public enum ExecutorActor {
|
||||||
PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||||
SYNC_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
UPDATE_NSFW_LISTS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD, NodeProfile.REALTIME),
|
||||||
|
|
||||||
CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||||
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||||
|
@@ -113,7 +113,7 @@ public class ExecutorActorControlService {
|
|||||||
register(ExecutorActor.UPDATE_RSS, updateRssActor);
|
register(ExecutorActor.UPDATE_RSS, updateRssActor);
|
||||||
|
|
||||||
register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
|
register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
|
||||||
register(ExecutorActor.SYNC_NSFW_LISTS, updateNsfwFiltersActor);
|
register(ExecutorActor.UPDATE_NSFW_LISTS, updateNsfwFiltersActor);
|
||||||
|
|
||||||
if (serviceConfiguration.node() == 1) {
|
if (serviceConfiguration.node() == 1) {
|
||||||
register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
|
register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
|
||||||
|
@@ -25,6 +25,10 @@ import java.util.concurrent.Executors;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
|
||||||
|
// Unlike other monitor actors, the ping monitor will not merely wait for a request
|
||||||
|
// to be sent, but send one itself, hence we can't extend AbstractProcessSpawnerActor
|
||||||
|
// but have to reimplement a lot of the same logic ourselves.
|
||||||
@Singleton
|
@Singleton
|
||||||
public class PingMonitorActor extends RecordActorPrototype {
|
public class PingMonitorActor extends RecordActorPrototype {
|
||||||
|
|
||||||
@@ -53,7 +57,6 @@ public class PingMonitorActor extends RecordActorPrototype {
|
|||||||
return switch (self) {
|
return switch (self) {
|
||||||
case Initial i -> {
|
case Initial i -> {
|
||||||
PingRequest request = new PingRequest();
|
PingRequest request = new PingRequest();
|
||||||
|
|
||||||
persistence.sendNewMessage(inboxName, null, null,
|
persistence.sendNewMessage(inboxName, null, null,
|
||||||
"PingRequest",
|
"PingRequest",
|
||||||
gson.toJson(request),
|
gson.toJson(request),
|
||||||
|
@@ -44,7 +44,6 @@ public class LiveCrawlActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
logger.info("{}", self);
|
|
||||||
return switch (self) {
|
return switch (self) {
|
||||||
case Initial() -> {
|
case Initial() -> {
|
||||||
yield new Monitor("-");
|
yield new Monitor("-");
|
||||||
|
@@ -5,6 +5,8 @@ import com.google.inject.Inject;
|
|||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
||||||
import nu.marginalia.actor.state.ActorStep;
|
import nu.marginalia.actor.state.ActorStep;
|
||||||
|
import nu.marginalia.mq.MqMessageState;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.nsfw.NsfwDomainFilter;
|
import nu.marginalia.nsfw.NsfwDomainFilter;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
@@ -12,23 +14,26 @@ import nu.marginalia.service.module.ServiceConfiguration;
|
|||||||
public class UpdateNsfwFiltersActor extends RecordActorPrototype {
|
public class UpdateNsfwFiltersActor extends RecordActorPrototype {
|
||||||
private final ServiceConfiguration serviceConfiguration;
|
private final ServiceConfiguration serviceConfiguration;
|
||||||
private final NsfwDomainFilter nsfwDomainFilter;
|
private final NsfwDomainFilter nsfwDomainFilter;
|
||||||
|
private final MqPersistence persistence;
|
||||||
|
|
||||||
public record Initial() implements ActorStep {}
|
public record Initial(long respondMsgId) implements ActorStep {}
|
||||||
public record Run() implements ActorStep {}
|
public record Run(long respondMsgId) implements ActorStep {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
return switch(self) {
|
return switch(self) {
|
||||||
case Initial() -> {
|
case Initial(long respondMsgId) -> {
|
||||||
if (serviceConfiguration.node() != 1) {
|
if (serviceConfiguration.node() != 1) {
|
||||||
|
persistence.updateMessageState(respondMsgId, MqMessageState.ERR);
|
||||||
yield new Error("This actor can only run on node 1");
|
yield new Error("This actor can only run on node 1");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
yield new Run();
|
yield new Run(respondMsgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case Run() -> {
|
case Run(long respondMsgId) -> {
|
||||||
nsfwDomainFilter.fetchLists();
|
nsfwDomainFilter.fetchLists();
|
||||||
|
persistence.updateMessageState(respondMsgId, MqMessageState.OK);
|
||||||
yield new End();
|
yield new End();
|
||||||
}
|
}
|
||||||
default -> new Error();
|
default -> new Error();
|
||||||
@@ -43,11 +48,13 @@ public class UpdateNsfwFiltersActor extends RecordActorPrototype {
|
|||||||
@Inject
|
@Inject
|
||||||
public UpdateNsfwFiltersActor(Gson gson,
|
public UpdateNsfwFiltersActor(Gson gson,
|
||||||
ServiceConfiguration serviceConfiguration,
|
ServiceConfiguration serviceConfiguration,
|
||||||
NsfwDomainFilter nsfwDomainFilter)
|
NsfwDomainFilter nsfwDomainFilter,
|
||||||
|
MqPersistence persistence)
|
||||||
{
|
{
|
||||||
super(gson);
|
super(gson);
|
||||||
this.serviceConfiguration = serviceConfiguration;
|
this.serviceConfiguration = serviceConfiguration;
|
||||||
this.nsfwDomainFilter = nsfwDomainFilter;
|
this.nsfwDomainFilter = nsfwDomainFilter;
|
||||||
|
this.persistence = persistence;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -10,6 +10,7 @@ import nu.marginalia.actor.state.ActorStateInstance;
|
|||||||
import nu.marginalia.actor.task.DownloadSampleActor;
|
import nu.marginalia.actor.task.DownloadSampleActor;
|
||||||
import nu.marginalia.actor.task.RestoreBackupActor;
|
import nu.marginalia.actor.task.RestoreBackupActor;
|
||||||
import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
|
import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
|
||||||
|
import nu.marginalia.actor.task.UpdateNsfwFiltersActor;
|
||||||
import nu.marginalia.functions.execution.api.*;
|
import nu.marginalia.functions.execution.api.*;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
import nu.marginalia.service.server.DiscoverableService;
|
import nu.marginalia.service.server.DiscoverableService;
|
||||||
@@ -263,4 +264,19 @@ public class ExecutorGrpcService
|
|||||||
System.exit(0);
|
System.exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateNsfwFilters(RpcUpdateNsfwFilters request, StreamObserver<Empty> responseObserver) {
|
||||||
|
logger.info("Got request {}", request);
|
||||||
|
try {
|
||||||
|
actorControlService.startFrom(ExecutorActor.UPDATE_NSFW_LISTS,
|
||||||
|
new UpdateNsfwFiltersActor.Initial(request.getMsgId()));
|
||||||
|
|
||||||
|
responseObserver.onNext(Empty.getDefaultInstance());
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
logger.error("Failed to update nsfw filters", e);
|
||||||
|
responseObserver.onError(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -74,6 +74,8 @@ public class ControlSysActionsService {
|
|||||||
Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
|
Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
|
||||||
Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
|
Spark.post("/actions/flush-api-caches", this::flushApiCaches, Redirects.redirectToOverview);
|
||||||
Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);
|
Spark.post("/actions/reload-blogs-list", this::reloadBlogsList, Redirects.redirectToOverview);
|
||||||
|
|
||||||
|
Spark.post("/actions/update-nsfw-filters", this::updateNsfwFilters, Redirects.redirectToOverview);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
@@ -132,6 +134,14 @@ public class ControlSysActionsService {
|
|||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Object updateNsfwFilters(Request request, Response response) throws Exception {
|
||||||
|
eventLog.logEvent("USER-ACTION", "UPDATE-NSFW-FILTERS");
|
||||||
|
|
||||||
|
executorClient.updateNsfwFilters();
|
||||||
|
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
public Object flushApiCaches(Request request, Response response) throws Exception {
|
public Object flushApiCaches(Request request, Response response) throws Exception {
|
||||||
eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
|
eventLog.logEvent("USER-ACTION", "FLUSH-API-CACHES");
|
||||||
apiOutbox.sendNotice("FLUSH_CACHES", "");
|
apiOutbox.sendNotice("FLUSH_CACHES", "");
|
||||||
|
@@ -53,6 +53,31 @@
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
<div class="accordion-item">
|
||||||
|
<h2 class="accordion-header">
|
||||||
|
<button class="accordion-button collapsed"
|
||||||
|
type="button"
|
||||||
|
data-bs-toggle="collapse"
|
||||||
|
data-bs-target="#collapseNsfwFilters"
|
||||||
|
aria-expanded="false"
|
||||||
|
aria-controls="collapseNsfwFilters">
|
||||||
|
Update NSFW Filters Definitions
|
||||||
|
</button>
|
||||||
|
</h2>
|
||||||
|
<div id="collapseNsfwFilters" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
|
||||||
|
<div class="mb-3">
|
||||||
|
This will fetch NSFW filter definitions.
|
||||||
|
</div>
|
||||||
|
<form method="post" action="actions/update-nsfw-filters">
|
||||||
|
<button
|
||||||
|
class="btn btn-primary me-md-2"
|
||||||
|
onclick="return confirm('Confirm update NSFW filters');"
|
||||||
|
type="submit">
|
||||||
|
Update NSFW Filter</button>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
<div class="accordion-item">
|
<div class="accordion-item">
|
||||||
<h2 class="accordion-header">
|
<h2 class="accordion-header">
|
||||||
<button class="accordion-button collapsed"
|
<button class="accordion-button collapsed"
|
||||||
|
Reference in New Issue
Block a user