1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-06 07:32:38 +02:00

Compare commits

...

58 Commits

Author SHA1 Message Date
Viktor Lofgren
233f0acfb1 (index) Further reduce query buffer size 2025-07-27 17:13:08 +02:00
Viktor Lofgren
e3a4ff02e9 (index) Abandon ongoing evaluation tasks if time is up 2025-07-27 17:04:01 +02:00
Viktor Lofgren
c786283ae1 (index) Reduce quer buffer size 2025-07-27 16:57:55 +02:00
Viktor Lofgren
a3f65ac0e0 (deploy) Trigger index deployment 2025-07-27 16:50:23 +02:00
Viktor
aba1a32af0 Merge pull request #217 from MarginaliaSearch/uncompressed-spans-file
Index optimizations
2025-07-27 16:49:27 +02:00
Viktor Lofgren
c9c442345b (perf) Change execution test to use processing rate instead of count 2025-07-27 16:39:51 +02:00
Viktor Lofgren
2e126ba30e (perf) Change execution test to use processing rate instead of count 2025-07-27 16:37:20 +02:00
Viktor Lofgren
2087985f49 (index) Implement work stealing in IndexQueryExecution as a better approach to backpressure 2025-07-27 16:29:57 +02:00
Viktor Lofgren
2b13ebd18b (index) Tweak evaluation backlog handling 2025-07-27 16:08:16 +02:00
Viktor Lofgren
6d92c125fe (perf) Fix perf test 2025-07-27 15:50:28 +02:00
Viktor Lofgren
f638cfa39a (index) Avoid possibility of negative timeout 2025-07-27 15:39:12 +02:00
Viktor Lofgren
89447c12af (index) Avoid possibility of negative timeout 2025-07-27 15:24:47 +02:00
Viktor Lofgren
c71fc46f04 (perf) Update perf test with execution scenario 2025-07-27 15:22:07 +02:00
Viktor Lofgren
f96874d828 (sequence) Implement a largestValue abort condition for minDistance()
This is something like 3500% faster in certain common scenarios
2025-07-27 15:05:50 +02:00
Viktor Lofgren
583a84d5a0 (index) Clean up of the index query execution logic 2025-07-27 15:05:50 +02:00
Viktor Lofgren
f65b946448 (index) Clean up code 2025-07-27 15:05:50 +02:00
Viktor Lofgren
3682815855 (index) Optimize sequence intersection for the n=1 case 2025-07-26 19:14:32 +02:00
Viktor Lofgren
3a94357660 (index) Perf test tool (WIP!) 2025-07-26 11:49:33 +02:00
Viktor Lofgren
673b0d3de1 (index) Perf test tool (WIP!) 2025-07-26 11:49:31 +02:00
Viktor Lofgren
ea942bc664 (spans) Add signature to the footer of the spans file, including a version byte so we can detect whether ot use the old or new decoding logic 2025-07-25 12:07:18 +02:00
Viktor Lofgren
7ed5083c54 (index) Don't split results into chunks 2025-07-25 11:45:07 +02:00
Viktor Lofgren
08bb2c097b (refac) Clean up the data model used in the index service 2025-07-25 10:54:07 +02:00
Viktor Lofgren
495fb325be (sequence) Correct sequence intersection bug introduced in optimizations 2025-07-25 10:48:33 +02:00
Viktor Lofgren
05c25bbaec (chore) Clean up 2025-07-24 23:43:27 +02:00
Viktor Lofgren
2a028b84f3 (chore) Clean up 2025-07-24 20:12:56 +02:00
Viktor Lofgren
a091a23623 (ranking) Remove unnecessary metadata retrievals 2025-07-24 20:08:09 +02:00
Viktor Lofgren
e8897acb45 (ranking) Remove unnecessary metadata retrievals 2025-07-24 20:05:39 +02:00
Viktor Lofgren
b89ffcf2be (index) Evaluate hash based idx mapping in ForwardIndexReader 2025-07-24 19:47:27 +02:00
Viktor Lofgren
dbcc9055b0 (index) Evaluate using MinMaxPriorityQueue as guts of ResultPriorityQueue 2025-07-24 19:31:51 +02:00
Viktor Lofgren
d9740557f4 (sequence) Optimize intersection logic with a fast abort condition 2025-07-24 19:04:10 +02:00
Viktor Lofgren
0d6cd015fd (index) Evaluate reading all spans at once 2025-07-24 18:34:11 +02:00
Viktor Lofgren
c6034efcc8 (index) Cache value of bitset cardinality for speed 2025-07-24 17:24:55 +02:00
Viktor Lofgren
76068014ad (index) More spans optimizations 2025-07-24 15:03:43 +02:00
Viktor Lofgren
1c3ed67127 (index) Byte align document spans 2025-07-24 14:06:14 +02:00
Viktor Lofgren
fc0cb6bd9a (index) Reserve a larger size for IntArrayList in SeqenceOperations.findIntersections 2025-07-24 14:03:44 +02:00
Viktor Lofgren
c2601bac78 (converter) Remove unnecessary allocation of a 16 KB byte buffer 2025-07-24 13:25:37 +02:00
Viktor Lofgren
f5641b72e9 (index) Fix broken test 2025-07-24 13:21:05 +02:00
Viktor Lofgren
36efe2e219 (index) Optimize PositionsFileReader for concurrent reads
In benchmarks this is roughly twice as fast as the previous approach.  Main caveat being we need multiple file descriptors to avoid read instruction serialization by the kernel.  This is undesirable since the reads are complete scattershot and can't be reordered by the kernel in a way that optimizes anything.
2025-07-24 13:20:54 +02:00
Viktor Lofgren
983fe3829e (spans) Evaluate uncompressed spans files
Span decompression appears to be somewhat of a performance bottleneck.  This change removes compression of the spans file.  The spans are still compressed in transit between the converter and index constructor at this stage.  The change is intentionally kept small to just evaluate the performance implications, change in file sizes, etc.
2025-07-23 18:10:41 +02:00
Viktor Lofgren
668c87aa86 (ssr) Drop Executor from SSR as it no longer exists 2025-07-23 13:55:41 +02:00
Viktor Lofgren
9d3f9adb05 Force redeploy of everything 2025-07-23 13:36:02 +02:00
Viktor
a43a1773f1 Merge pull request #216 from MarginaliaSearch/deprecate-executor
Architecture: Remove the separate executor service and roll it into the index service.
2025-07-23 13:32:42 +02:00
Viktor Lofgren
1e7a3a3c4f (docs) Update docs to reflect the change 2025-07-23 13:18:23 +02:00
Viktor Lofgren
62b696b1c3 (architecture) Remove the separate executor service and merge it into the index service
The primary motivation for this is that in production, the large number of partitioned services has lead to an intermittent exhaustion of available database connections, as each service has a connection pool.

The decision to have a separate executor service dates back from when the index service was very slow to start, and the executor didn't always spin off its memory-hungry tasks into separate processes, which meant the executor would sometimes OOM and crash, and it was undesirable to bring the index down with it.
2025-07-23 12:57:13 +02:00
Viktor Lofgren
f1a900f383 (search) Clean up front page mobile design a bit 2025-07-23 12:20:40 +02:00
Viktor Lofgren
700364b86d (sample) Remove debug logging
The problem sat in the desk chair all along
2025-07-21 15:08:20 +02:00
Viktor Lofgren
7e725ddaed (sample) Remove debug logging
The problem sat in the desk chair all along
2025-07-21 14:41:59 +02:00
Viktor Lofgren
120209e138 (sample) Diagnosing compression errors 2025-07-21 14:34:08 +02:00
Viktor Lofgren
a771a5b6ce (sample) Test different approach to decoding 2025-07-21 14:19:01 +02:00
Viktor Lofgren
dac5b54128 (sample) Better logging for sample errors 2025-07-21 14:03:58 +02:00
Viktor Lofgren
6cfb143c15 (sample) Compress sample HTML data and introduce new API for only getting requests 2025-07-21 13:55:25 +02:00
Viktor Lofgren
23c818281b (converter) Reduce DomSample logging for NOT_FOUND 2025-07-21 13:37:55 +02:00
Viktor Lofgren
8aad253cf6 (converter) Add more logging around dom sample data retrieval errors 2025-07-21 13:26:38 +02:00
Viktor Lofgren
556d7af9dc Reapply "(grpc) Use grpc-netty instead of grpc-netty-shaded"
This reverts commit b7a5219ed3.
2025-07-21 13:23:32 +02:00
Viktor Lofgren
b7a5219ed3 Revert "(grpc) Use grpc-netty instead of grpc-netty-shaded"
Reverting this change to see if it's the cause of some instability issues observed.
2025-07-21 13:10:41 +02:00
Viktor Lofgren
a23ec521fe (converter) Ensure features is mutable on DetailsWithWords as this is assumed later 2025-07-21 12:50:04 +02:00
Viktor Lofgren
fff3babc6d (classier) Add rule for */pixel.gif as likely tracking pixels 2025-07-21 12:35:57 +02:00
Viktor Lofgren
b2bfb8217c (special) Trigger CD run 2025-07-21 12:28:24 +02:00
88 changed files with 1438 additions and 972 deletions

View File

@@ -105,8 +105,6 @@ public enum HtmlFeature {
}
public int getFeatureBit() {
if (getClass().desiredAssertionStatus() && ordinal() >= 32)
throw new IllegalStateException("Attempting to extract feature bit of " + name() + ", with ordinal " + ordinal());
return (1<< ordinal());
}
}

View File

@@ -7,7 +7,6 @@ public enum ServiceId {
Search("search-service"),
Index("index-service"),
Query("query-service"),
Executor("executor-service"),
Control("control-service"),

View File

@@ -189,7 +189,7 @@ public class ExecutorClient {
String uriPath = "/transfer/file/" + fileStorage.id();
String uriQuery = "path=" + URLEncoder.encode(path, StandardCharsets.UTF_8);
var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()));
var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Index, fileStorage.node()));
if (endpoints.isEmpty()) {
throw new RuntimeException("No endpoints for node " + fileStorage.node());
}

View File

@@ -1,4 +1,4 @@
package nu.marginalia.executor;
package nu.marginalia.svc;
import com.google.inject.Inject;
import nu.marginalia.storage.FileStorageService;

View File

@@ -1,5 +1,5 @@
The execution subsystem is responsible for the execution of long running tasks on each
index node. It lives in the [executor-service](../services-core/executor-service) module.
index node. It lives in the [index-service](../services-core/index-service) module.
It accomplishes this using the [message queue and actor library](../libraries/message-queue/),
which permits program state to survive crashes and reboots.

View File

@@ -1,4 +1,4 @@
package nu.marginalia.executor;
package nu.marginalia.svc;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;

View File

@@ -41,7 +41,22 @@ public class DomSampleClient {
}
catch (StatusRuntimeException sre) {
if (sre.getStatus() != Status.NOT_FOUND) {
logger.error("Failed to fetch DOM sample");
logger.error("Failed to fetch DOM sample", sre);
}
return Optional.empty();
}
}
public Optional<RpcDomainSampleRequests> getSampleRequests(String domainName) {
try {
var val = channelPool.call(DomSampleApiGrpc.DomSampleApiBlockingStub::getSampleRequests)
.run(RpcDomainName.newBuilder().setDomainName(domainName).build());
return Optional.of(val);
}
catch (StatusRuntimeException sre) {
if (sre.getStatus() != Status.NOT_FOUND) {
logger.error("Failed to fetch DOM sample", sre);
}
return Optional.empty();
}

View File

@@ -7,6 +7,7 @@ option java_multiple_files=true;
service DomSampleApi {
rpc getSample(RpcDomainName) returns (RpcDomainSample) {}
rpc getSampleRequests(RpcDomainName) returns (RpcDomainSampleRequests) {}
rpc hasSample(RpcDomainName) returns (RpcBooleanRsp) {}
rpc getAllSamples(RpcDomainName) returns (stream RpcDomainSample) {}
}
@@ -19,10 +20,16 @@ message RpcBooleanRsp {
bool answer = 1;
}
message RpcDomainSampleRequests {
string domainName = 1;
string url = 2;
repeated RpcOutgoingRequest outgoingRequests = 5;
}
message RpcDomainSample {
string domainName = 1;
string url = 2;
string htmlSample = 3;
bytes htmlSampleZstd = 3;
bool accepted_popover = 4;
repeated RpcOutgoingRequest outgoingRequests = 5;
}

View File

@@ -31,6 +31,7 @@ dependencies {
implementation libs.jsoup
implementation libs.opencsv
implementation libs.slop
implementation libs.zstd
implementation libs.sqlite
implementation libs.bundles.slf4j
implementation libs.commons.lang3

View File

@@ -1,6 +1,8 @@
package nu.marginalia.domsample;
import com.github.luben.zstd.Zstd;
import com.google.inject.Inject;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import nu.marginalia.api.domsample.*;
@@ -9,6 +11,7 @@ import nu.marginalia.service.server.DiscoverableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class DomSampleGrpcService
@@ -42,7 +45,36 @@ public class DomSampleGrpcService
}
// Grab the first sample
RpcDomainSample.Builder response = convert(dbRecords.getFirst());
RpcDomainSample.Builder response = convertFullSample(dbRecords.getFirst());
responseObserver.onNext(response.build());
responseObserver.onCompleted();
}
catch (Exception e) {
logger.error("Error in getSample()", e);
responseObserver.onError(Status.INTERNAL.withCause(e).asRuntimeException());
}
}
@Override
public void getSampleRequests(RpcDomainName request, StreamObserver<RpcDomainSampleRequests> responseObserver) {
String domainName = request.getDomainName();
if (domainName.isBlank()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Invalid domain name")
.asRuntimeException());
return;
}
try {
List<DomSampleDb.Sample> dbRecords = domSampleDb.getSamples(domainName);
if (dbRecords.isEmpty()) {
responseObserver.onError(Status.NOT_FOUND.withDescription("No sample found").asRuntimeException());
return;
}
// Grab the first sample
RpcDomainSampleRequests.Builder response = convertRequestData(dbRecords.getFirst());
responseObserver.onNext(response.build());
responseObserver.onCompleted();
@@ -87,7 +119,7 @@ public class DomSampleGrpcService
List<DomSampleDb.Sample> dbRecords = domSampleDb.getSamples(domainName);
for (var record : dbRecords) {
responseObserver.onNext(convert(record).build());
responseObserver.onNext(convertFullSample(record).build());
}
responseObserver.onCompleted();
@@ -98,12 +130,14 @@ public class DomSampleGrpcService
}
}
private RpcDomainSample.Builder convert(DomSampleDb.Sample dbSample) {
private RpcDomainSample.Builder convertFullSample(DomSampleDb.Sample dbSample) {
ByteString htmlZstd = ByteString.copyFrom(Zstd.compress(dbSample.sample().getBytes(StandardCharsets.UTF_8)));
var sampleBuilder = RpcDomainSample.newBuilder()
.setDomainName(dbSample.domain())
.setAcceptedPopover(dbSample.acceptedPopover())
.setHtmlSample(dbSample.sample());
.setHtmlSampleZstd(htmlZstd);
for (var req : dbSample.parseRequests()) {
sampleBuilder.addOutgoingRequestsBuilder()
@@ -120,4 +154,23 @@ public class DomSampleGrpcService
return sampleBuilder;
}
private RpcDomainSampleRequests.Builder convertRequestData(DomSampleDb.Sample dbSample) {
var sampleBuilder = RpcDomainSampleRequests.newBuilder()
.setDomainName(dbSample.domain());
for (var req : dbSample.parseRequests()) {
sampleBuilder.addOutgoingRequestsBuilder()
.setUrl(req.uri().toString())
.setMethod(switch (req.method().toUpperCase())
{
case "GET" -> RpcOutgoingRequest.RequestMethod.GET;
case "POST" -> RpcOutgoingRequest.RequestMethod.POST;
default -> RpcOutgoingRequest.RequestMethod.OTHER;
})
.setTimestamp(req.timestamp());
}
return sampleBuilder;
}
}

View File

@@ -87,7 +87,7 @@ class FeedFetcherServiceTest extends AbstractModule {
bind(DomainCoordinator.class).to(LocalDomainCoordinator.class);
bind(HikariDataSource.class).toInstance(dataSource);
bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Index, 1, "", "", 0, UUID.randomUUID()));
bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(1);
}

View File

@@ -304,7 +304,6 @@ public class QueryProtobufCodec {
IndexProtobufCodec.convertRpcQuery(specs.getQuery()),
specs.getDomainsList(),
specs.getSearchSetIdentifier(),
specs.getHumanQuery(),
IndexProtobufCodec.convertSpecLimit(specs.getQuality()),
IndexProtobufCodec.convertSpecLimit(specs.getYear()),
IndexProtobufCodec.convertSpecLimit(specs.getSize()),

View File

@@ -18,8 +18,6 @@ public class SearchSpecification {
public String searchSetIdentifier;
public final String humanQuery;
public SpecificationLimit quality;
public SpecificationLimit year;
public SpecificationLimit size;
@@ -35,7 +33,6 @@ public class SearchSpecification {
public SearchSpecification(SearchQuery query,
List<Integer> domains,
String searchSetIdentifier,
String humanQuery,
SpecificationLimit quality,
SpecificationLimit year,
SpecificationLimit size,
@@ -47,7 +44,6 @@ public class SearchSpecification {
this.query = query;
this.domains = domains;
this.searchSetIdentifier = searchSetIdentifier;
this.humanQuery = humanQuery;
this.quality = quality;
this.year = year;
this.size = size;
@@ -73,10 +69,6 @@ public class SearchSpecification {
return this.searchSetIdentifier;
}
public String getHumanQuery() {
return this.humanQuery;
}
public SpecificationLimit getQuality() {
return this.quality;
}
@@ -106,14 +98,13 @@ public class SearchSpecification {
}
public String toString() {
return "SearchSpecification(query=" + this.getQuery() + ", domains=" + this.getDomains() + ", searchSetIdentifier=" + this.getSearchSetIdentifier() + ", humanQuery=" + this.getHumanQuery() + ", quality=" + this.getQuality() + ", year=" + this.getYear() + ", size=" + this.getSize() + ", rank=" + this.getRank() + ", queryLimits=" + this.getQueryLimits() + ", queryStrategy=" + this.getQueryStrategy() + ", rankingParams=" + this.getRankingParams() + ")";
return "SearchSpecification(query=" + this.getQuery() + ", domains=" + this.getDomains() + ", searchSetIdentifier=" + this.getSearchSetIdentifier() + ", quality=" + this.getQuality() + ", year=" + this.getYear() + ", size=" + this.getSize() + ", rank=" + this.getRank() + ", queryLimits=" + this.getQueryLimits() + ", queryStrategy=" + this.getQueryStrategy() + ", rankingParams=" + this.getRankingParams() + ")";
}
public static class SearchSpecificationBuilder {
private SearchQuery query;
private List<Integer> domains;
private String searchSetIdentifier;
private String humanQuery;
private SpecificationLimit quality$value;
private boolean quality$set;
private SpecificationLimit year$value;
@@ -144,11 +135,6 @@ public class SearchSpecification {
return this;
}
public SearchSpecificationBuilder humanQuery(String humanQuery) {
this.humanQuery = humanQuery;
return this;
}
public SearchSpecificationBuilder quality(SpecificationLimit quality) {
this.quality$value = quality;
this.quality$set = true;
@@ -205,11 +191,7 @@ public class SearchSpecification {
if (!this.rank$set) {
rank$value = SpecificationLimit.none();
}
return new SearchSpecification(this.query, this.domains, this.searchSetIdentifier, this.humanQuery, quality$value, year$value, size$value, rank$value, this.queryLimits, this.queryStrategy, this.rankingParams);
}
public String toString() {
return "SearchSpecification.SearchSpecificationBuilder(query=" + this.query + ", domains=" + this.domains + ", searchSetIdentifier=" + this.searchSetIdentifier + ", humanQuery=" + this.humanQuery + ", quality$value=" + this.quality$value + ", year$value=" + this.year$value + ", size$value=" + this.size$value + ", rank$value=" + this.rank$value + ", queryLimits=" + this.queryLimits + ", queryStrategy=" + this.queryStrategy + ", rankingParams=" + this.rankingParams + ")";
return new SearchSpecification(this.query, this.domains, this.searchSetIdentifier, quality$value, year$value, size$value, rank$value, this.queryLimits, this.queryStrategy, this.rankingParams);
}
}
}

View File

@@ -1,56 +0,0 @@
package nu.marginalia.api.searchquery.model.results;
import nu.marginalia.api.searchquery.RpcResultRankingParameters;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import java.util.BitSet;
public class ResultRankingContext {
private final int docCount;
public final RpcResultRankingParameters params;
public final BitSet regularMask;
public final BitSet ngramsMask;
/** CqDataInt associated with frequency information of the terms in the query
* in the full index. The dataset is indexed by the compiled query. */
public final CqDataInt fullCounts;
/** CqDataInt associated with frequency information of the terms in the query
* in the full index. The dataset is indexed by the compiled query. */
public final CqDataInt priorityCounts;
public ResultRankingContext(int docCount,
RpcResultRankingParameters params,
BitSet ngramsMask,
BitSet regularMask,
CqDataInt fullCounts,
CqDataInt prioCounts)
{
this.docCount = docCount;
this.params = params;
this.ngramsMask = ngramsMask;
this.regularMask = regularMask;
this.fullCounts = fullCounts;
this.priorityCounts = prioCounts;
}
public int termFreqDocCount() {
return docCount;
}
@Override
public String toString() {
return "ResultRankingContext{" +
"docCount=" + docCount +
", params=" + params +
", regularMask=" + regularMask +
", ngramsMask=" + ngramsMask +
", fullCounts=" + fullCounts +
", priorityCounts=" + priorityCounts +
'}';
}
}

View File

@@ -34,8 +34,6 @@ public class QueryFactory {
this.queryExpansion = queryExpansion;
}
public ProcessedQuery createQuery(QueryParams params,
@Nullable RpcResultRankingParameters rankingParams) {
final var query = params.humanQuery();
@@ -153,7 +151,6 @@ public class QueryFactory {
var specsBuilder = SearchSpecification.builder()
.query(queryBuilder.build())
.humanQuery(query)
.quality(qualityLimit)
.year(year)
.size(size)

View File

@@ -241,7 +241,6 @@ public class QueryFactoryTest {
Assertions.assertTrue(subquery.query.compiledQuery.contains(" bob "));
Assertions.assertFalse(subquery.query.compiledQuery.contains(" bob's "));
Assertions.assertEquals("\"bob's cars\"", subquery.humanQuery);
}
@Test

View File

@@ -1,9 +1,10 @@
package nu.marginalia.index.forward;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.index.forward.spans.DocumentSpans;
import nu.marginalia.index.forward.spans.ForwardIndexSpansReader;
import nu.marginalia.index.forward.spans.IndexSpansReader;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,16 +23,15 @@ import static nu.marginalia.index.forward.ForwardIndexParameters.*;
* and a mapping between document identifiers to the index into the
* data array.
* <p/>
* Since the total data is relatively small, this is kept in memory to
* reduce the amount of disk thrashing.
* <p/>
* The metadata is a binary encoding of {@see nu.marginalia.idx.DocumentMetadata}
*/
public class ForwardIndexReader {
private final LongArray ids;
private final LongArray data;
private final ForwardIndexSpansReader spansReader;
private volatile Long2IntOpenHashMap idsMap;
private final IndexSpansReader spansReader;
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -64,7 +64,18 @@ public class ForwardIndexReader {
ids = loadIds(idsFile);
data = loadData(dataFile);
spansReader = new ForwardIndexSpansReader(spansFile);
spansReader = IndexSpansReader.open(spansFile);
Thread.ofPlatform().start(this::createIdsMap);
}
private void createIdsMap() {
Long2IntOpenHashMap idsMap = new Long2IntOpenHashMap((int) ids.size());
for (int i = 0; i < ids.size(); i++) {
idsMap.put(ids.get(i), i);
}
this.idsMap = idsMap;
}
private static LongArray loadIds(Path idsFile) throws IOException {
@@ -106,6 +117,10 @@ public class ForwardIndexReader {
private int idxForDoc(long docId) {
assert UrlIdCodec.getRank(docId) == 0 : "Forward Index Reader fed dirty reverse index id";
if (idsMap != null) {
return idsMap.getOrDefault(docId, -1);
}
long offset = ids.binarySearch(docId, 0, ids.size());
if (offset >= ids.size() || offset < 0 || ids.get(offset) != docId) {
@@ -134,6 +149,27 @@ public class ForwardIndexReader {
}
public DocumentSpans[] getDocumentSpans(Arena arena, long[] docIds) {
long[] offsets = new long[docIds.length];
for (int i = 0; i < docIds.length; i++) {
long offset = idxForDoc(docIds[i]);
if (offset >= 0) {
offsets[i] = data.get(ENTRY_SIZE * offset + SPANS_OFFSET);
}
else {
offsets[i] = -1;
}
}
try {
return spansReader.readSpans(arena, offsets);
}
catch (IOException ex) {
logger.error("Failed to read spans for docIds", ex);
return new DocumentSpans[docIds.length];
}
}
public int totalDocCount() {
return (int) ids.size();
}
@@ -141,6 +177,8 @@ public class ForwardIndexReader {
public void close() {
if (data != null)
data.close();
if (ids != null)
ids.close();
}
public boolean isLoaded() {

View File

@@ -5,7 +5,7 @@ import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.index.domainrankings.DomainRankings;
import nu.marginalia.index.forward.ForwardIndexParameters;
import nu.marginalia.index.forward.spans.ForwardIndexSpansWriter;
import nu.marginalia.index.forward.spans.IndexSpansWriter;
import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.idx.DocumentMetadata;
@@ -65,7 +65,7 @@ public class ForwardIndexConverter {
logger.info("Domain Rankings size = {}", domainRankings.size());
try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "forwardIndexConverter");
var spansWriter = new ForwardIndexSpansWriter(outputFileSpansData)
var spansWriter = new IndexSpansWriter(outputFileSpansData)
) {
progress.progress(TaskSteps.GET_DOC_IDS);

View File

@@ -11,6 +11,9 @@ public class DocumentSpan {
/** A list of the interlaced start and end positions of each span in the document of this type */
private final IntList startsEnds;
public DocumentSpan(IntList startsEnds) {
this.startsEnds = startsEnds;
}
public DocumentSpan(CodedSequence startsEnds) {
this.startsEnds = startsEnds.values();
}

View File

@@ -1,5 +1,6 @@
package nu.marginalia.index.forward.spans;
import it.unimi.dsi.fastutil.ints.IntList;
import nu.marginalia.language.sentence.tag.HtmlTag;
import nu.marginalia.sequence.CodedSequence;
@@ -39,6 +40,23 @@ public class DocumentSpans {
return EMPTY_SPAN;
}
void accept(byte code, IntList positions) {
if (code == HtmlTag.HEADING.code)
this.heading = new DocumentSpan(positions);
else if (code == HtmlTag.TITLE.code)
this.title = new DocumentSpan(positions);
else if (code == HtmlTag.NAV.code)
this.nav = new DocumentSpan(positions);
else if (code == HtmlTag.CODE.code)
this.code = new DocumentSpan(positions);
else if (code == HtmlTag.ANCHOR.code)
this.anchor = new DocumentSpan(positions);
else if (code == HtmlTag.EXTERNAL_LINKTEXT.code)
this.externalLinkText = new DocumentSpan(positions);
else if (code == HtmlTag.BODY.code)
this.body = new DocumentSpan(positions);
}
void accept(byte code, CodedSequence positions) {
if (code == HtmlTag.HEADING.code)
this.heading = new DocumentSpan(positions);

View File

@@ -0,0 +1,25 @@
package nu.marginalia.index.forward.spans;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.file.Path;
public interface IndexSpansReader extends AutoCloseable {
DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException;
DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException;
static IndexSpansReader open(Path fileName) throws IOException {
int version = SpansCodec.parseSpanFilesFooter(fileName);
if (version == SpansCodec.SpansCodecVersion.COMPRESSED.ordinal()) {
return new IndexSpansReaderCompressed(fileName);
}
else if (version == SpansCodec.SpansCodecVersion.PLAIN.ordinal()) {
return new IndexSpansReaderPlain(fileName);
}
else {
throw new IllegalArgumentException("Unsupported spans file version: " + version);
}
}
void close() throws IOException;
}

View File

@@ -10,11 +10,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@SuppressWarnings("preview")
public class ForwardIndexSpansReader implements AutoCloseable {
@Deprecated
public class IndexSpansReaderCompressed implements AutoCloseable, IndexSpansReader {
private final FileChannel spansFileChannel;
public ForwardIndexSpansReader(Path spansFile) throws IOException {
public IndexSpansReaderCompressed(Path spansFile) throws IOException {
this.spansFileChannel = (FileChannel) Files.newByteChannel(spansFile, StandardOpenOption.READ);
}
@@ -51,6 +51,17 @@ public class ForwardIndexSpansReader implements AutoCloseable {
return ret;
}
@Override
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException {
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
for (int i = 0; i < encodedOffsets.length; i++) {
if (encodedOffsets[i] >= 0) {
ret[i] = readSpans(arena, encodedOffsets[i]);
}
}
return ret;
}
@Override
public void close() throws IOException {
spansFileChannel.close();

View File

@@ -0,0 +1,122 @@
package nu.marginalia.index.forward.spans;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
public class IndexSpansReaderPlain implements IndexSpansReader {
private final FileChannel[] spansFileChannels;
private final ForkJoinPool forkJoinPool;
public IndexSpansReaderPlain(Path spansFile) throws IOException {
this.spansFileChannels = new FileChannel[8];
for (int i = 0; i < spansFileChannels.length; i++) {
spansFileChannels[i] = (FileChannel) Files.newByteChannel(spansFile, StandardOpenOption.READ);
}
forkJoinPool = new ForkJoinPool(spansFileChannels.length);
}
@Override
public DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException {
// Decode the size and offset from the encoded offset
long size = SpansCodec.decodeSize(encodedOffset);
long offset = SpansCodec.decodeStartOffset(encodedOffset);
var ms = arena.allocate(size, 4);
// Allocate a buffer from the arena
var buffer = ms.asByteBuffer();
while (buffer.hasRemaining()) {
spansFileChannels[0].read(buffer, offset + buffer.position());
}
return decode(ms);
}
public DocumentSpans decode(MemorySegment ms) {
int count = ms.get(ValueLayout.JAVA_INT, 0);
int pos = 4;
DocumentSpans ret = new DocumentSpans();
// Decode each span
for (int spanIdx = 0; spanIdx < count; spanIdx++) {
byte code = ms.get(ValueLayout.JAVA_BYTE, pos);
short len = ms.get(ValueLayout.JAVA_SHORT, pos+2);
IntArrayList values = new IntArrayList(len);
pos += 4;
for (int i = 0; i < len; i++) {
values.add(ms.get(ValueLayout.JAVA_INT, pos + 4*i));
}
ret.accept(code, values);
pos += 4*len;
}
return ret;
}
@Override
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException {
long totalSize = 0;
int numJobs = 0;
for (long offset : encodedOffsets) {
if (offset < 0)
continue;
totalSize += SpansCodec.decodeSize(offset);
numJobs++;
}
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
if (numJobs == 0) return ret;
CountDownLatch latch = new CountDownLatch(numJobs);
MemorySegment segment = arena.allocate(totalSize, 8);
long bufferOffset = 0;
for (int idx = 0; idx < encodedOffsets.length; idx++) {
long size = SpansCodec.decodeSize(encodedOffsets[idx]);
long start = SpansCodec.decodeStartOffset(encodedOffsets[idx]);
MemorySegment slice = segment.asSlice(bufferOffset, size);
bufferOffset += size;
int i = idx;
forkJoinPool.execute(() -> {
var buffer = slice.asByteBuffer();
try {
spansFileChannels[i% spansFileChannels.length].read(buffer, start);
ret[i] = decode(slice);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
finally {
latch.countDown();
}
});
}
try {
latch.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return ret;
}
@Override
public void close() throws IOException {
for (var spansFileChannel : spansFileChannels) {
spansFileChannel.close();
}
}
}

View File

@@ -1,20 +1,23 @@
package nu.marginalia.index.forward.spans;
import nu.marginalia.sequence.VarintCodedSequence;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class ForwardIndexSpansWriter implements AutoCloseable {
public class IndexSpansWriter implements AutoCloseable {
private final FileChannel outputChannel;
private final ByteBuffer work = ByteBuffer.allocate(32);
private final ByteBuffer work = ByteBuffer.allocate(65536).order(ByteOrder.nativeOrder());
private long stateStartOffset = -1;
private int stateLength = -1;
public ForwardIndexSpansWriter(Path outputFileSpansData) throws IOException {
public IndexSpansWriter(Path outputFileSpansData) throws IOException {
this.outputChannel = (FileChannel) Files.newByteChannel(outputFileSpansData, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
@@ -23,7 +26,7 @@ public class ForwardIndexSpansWriter implements AutoCloseable {
stateLength = 0;
work.clear();
work.put((byte) count);
work.putInt(count);
work.flip();
while (work.hasRemaining())
@@ -33,12 +36,17 @@ public class ForwardIndexSpansWriter implements AutoCloseable {
public void writeSpan(byte spanCode, ByteBuffer sequenceData) throws IOException {
work.clear();
work.put(spanCode);
work.putShort((short) sequenceData.remaining());
work.put((byte) 0); // Ensure we're byte aligned
var sequence = new VarintCodedSequence(sequenceData);
work.putShort((short) sequence.valueCount());
var iter = sequence.iterator();
while (iter.hasNext()) {
work.putInt(iter.nextInt());
}
work.flip();
while (work.hasRemaining() || sequenceData.hasRemaining()) {
stateLength += (int) outputChannel.write(new ByteBuffer[]{work, sequenceData});
}
stateLength += outputChannel.write(work);
}
public long endRecord() {
@@ -47,6 +55,11 @@ public class ForwardIndexSpansWriter implements AutoCloseable {
@Override
public void close() throws IOException {
ByteBuffer footer = SpansCodec.createSpanFilesFooter(SpansCodec.SpansCodecVersion.PLAIN);
outputChannel.position(outputChannel.size());
while (footer.hasRemaining()) {
outputChannel.write(footer, outputChannel.size());
}
outputChannel.close();
}
}

View File

@@ -1,6 +1,21 @@
package nu.marginalia.index.forward.spans;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class SpansCodec {
public static int MAGIC_INT = 0xF000F000;
public static int FOOTER_SIZE = 8;
enum SpansCodecVersion {
@Deprecated
COMPRESSED,
PLAIN
}
public static long encode(long startOffset, long size) {
assert size < 0x1000_0000L : "Size must be less than 2^28";
@@ -14,4 +29,31 @@ public class SpansCodec {
public static long decodeSize(long encoded) {
return encoded & 0x0FFF_FFFFL;
}
public static ByteBuffer createSpanFilesFooter(SpansCodecVersion version) {
ByteBuffer footer = ByteBuffer.allocate(FOOTER_SIZE);
footer.putInt(SpansCodec.MAGIC_INT);
footer.put((byte) version.ordinal());
footer.put((byte) 0);
footer.put((byte) 0);
footer.put((byte) 0);
footer.flip();
return footer;
}
public static int parseSpanFilesFooter(Path spansFile) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(FOOTER_SIZE);
try (var fc = FileChannel.open(spansFile, StandardOpenOption.READ)) {
if (fc.size() < FOOTER_SIZE) return 0;
fc.read(buffer, fc.size() - buffer.capacity());
buffer.flip();
int magic = buffer.getInt();
if (magic != MAGIC_INT) {
return 0;
}
return buffer.get();
}
}
}

View File

@@ -1,8 +1,9 @@
package nu.marginalia.index.forward;
import it.unimi.dsi.fastutil.ints.IntList;
import nu.marginalia.index.forward.spans.ForwardIndexSpansReader;
import nu.marginalia.index.forward.spans.ForwardIndexSpansWriter;
import nu.marginalia.index.forward.spans.IndexSpansReader;
import nu.marginalia.index.forward.spans.IndexSpansReaderPlain;
import nu.marginalia.index.forward.spans.IndexSpansWriter;
import nu.marginalia.language.sentence.tag.HtmlTag;
import nu.marginalia.sequence.VarintCodedSequence;
import org.junit.jupiter.api.AfterEach;
@@ -17,10 +18,10 @@ import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.*;
class ForwardIndexSpansReaderTest {
class IndexSpansReaderTest {
Path testFile = Files.createTempFile("test", ".idx");
ForwardIndexSpansReaderTest() throws IOException {
IndexSpansReaderTest() throws IOException {
}
@AfterEach
@@ -34,7 +35,7 @@ class ForwardIndexSpansReaderTest {
long offset1;
long offset2;
try (var writer = new ForwardIndexSpansWriter(testFile)) {
try (var writer = new IndexSpansWriter(testFile)) {
writer.beginRecord(1);
writer.writeSpan(HtmlTag.HEADING.code, VarintCodedSequence.generate(1, 3, 5, 8).buffer());
offset1 = writer.endRecord();
@@ -46,7 +47,7 @@ class ForwardIndexSpansReaderTest {
offset2 = writer.endRecord();
}
try (var reader = new ForwardIndexSpansReader(testFile);
try (var reader = IndexSpansReader.open(testFile);
var arena = Arena.ofConfined()
) {
var spans1 = reader.readSpans(arena, offset1);
@@ -77,13 +78,13 @@ class ForwardIndexSpansReaderTest {
@Test
void testContainsRange() throws IOException {
long offset1;
try (var writer = new ForwardIndexSpansWriter(testFile)) {
try (var writer = new IndexSpansWriter(testFile)) {
writer.beginRecord(1);
writer.writeSpan(HtmlTag.HEADING.code, VarintCodedSequence.generate( 1, 2, 10, 15, 20, 25).buffer());
offset1 = writer.endRecord();
}
try (var reader = new ForwardIndexSpansReader(testFile);
try (var reader = new IndexSpansReaderPlain(testFile);
var arena = Arena.ofConfined()
) {
var spans1 = reader.readSpans(arena, offset1);
@@ -104,13 +105,13 @@ class ForwardIndexSpansReaderTest {
@Test
void testContainsRangeExact() throws IOException {
long offset1;
try (var writer = new ForwardIndexSpansWriter(testFile)) {
try (var writer = new IndexSpansWriter(testFile)) {
writer.beginRecord(1);
writer.writeSpan(HtmlTag.HEADING.code, VarintCodedSequence.generate( 1, 2, 10, 15, 20, 25).buffer());
offset1 = writer.endRecord();
}
try (var reader = new ForwardIndexSpansReader(testFile);
try (var reader = new IndexSpansReaderPlain(testFile);
var arena = Arena.ofConfined()
) {
var spans1 = reader.readSpans(arena, offset1);
@@ -131,13 +132,13 @@ class ForwardIndexSpansReaderTest {
@Test
void testCountRangeMatches() throws IOException {
long offset1;
try (var writer = new ForwardIndexSpansWriter(testFile)) {
try (var writer = new IndexSpansWriter(testFile)) {
writer.beginRecord(1);
writer.writeSpan(HtmlTag.HEADING.code, VarintCodedSequence.generate( 1, 2, 10, 15, 20, 25).buffer());
offset1 = writer.endRecord();
}
try (var reader = new ForwardIndexSpansReader(testFile);
try (var reader = new IndexSpansReaderPlain(testFile);
var arena = Arena.ofConfined()
) {
var spans1 = reader.readSpans(arena, offset1);

View File

@@ -0,0 +1,53 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.index.perftest.PerfTestMain'
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:db')
implementation project(':code:libraries:array')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:common:linkdb')
implementation project(':code:index')
implementation project(':code:index:query')
implementation project(':code:index:index-forward')
implementation project(':code:index:index-reverse')
implementation project(':third-party:commons-codec')
implementation project(':code:functions:search-query')
implementation project(':code:functions:search-query:api')
implementation libs.slop
implementation libs.roaringbitmap
implementation libs.bundles.slf4j
implementation libs.guava
libs.bundles.grpc.get().each {
implementation dependencies.create(it) {
exclude group: 'com.google.guava'
}
}
implementation libs.notnull
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.gson
implementation libs.bundles.mariadb
}

View File

@@ -0,0 +1,333 @@
package nu.marginalia.index.perftest;
import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.api.searchquery.RpcQueryLimits;
import nu.marginalia.api.searchquery.model.query.NsfwFilterTier;
import nu.marginalia.api.searchquery.model.query.QueryParams;
import nu.marginalia.api.searchquery.model.query.SearchSpecification;
import nu.marginalia.api.searchquery.model.results.PrototypeRankingParameters;
import nu.marginalia.array.page.LongQueryBuffer;
import nu.marginalia.functions.searchquery.QueryFactory;
import nu.marginalia.functions.searchquery.query_parser.QueryExpansion;
import nu.marginalia.index.FullReverseIndexReader;
import nu.marginalia.index.IndexQueryExecution;
import nu.marginalia.index.PrioReverseIndexReader;
import nu.marginalia.index.forward.ForwardIndexReader;
import nu.marginalia.index.index.CombinedIndexReader;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.model.ResultRankingContext;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.model.SearchTerms;
import nu.marginalia.index.positions.PositionsFileReader;
import nu.marginalia.index.query.IndexQuery;
import nu.marginalia.index.results.DomainRankingOverrides;
import nu.marginalia.index.results.IndexResultRankingService;
import nu.marginalia.index.results.model.ids.CombinedDocIdList;
import nu.marginalia.index.searchset.SearchSetAny;
import nu.marginalia.linkdb.docs.DocumentDbReader;
import nu.marginalia.segmentation.NgramLexicon;
import nu.marginalia.term_frequency_dict.TermFrequencyDict;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class PerfTestMain {
static Duration warmupTime = Duration.ofMinutes(1);
static Duration runTime = Duration.ofMinutes(10);
public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Arguments: home-dir index-dir query");
System.exit(255);
}
try {
Path indexDir = Paths.get(args[0]);
if (!Files.isDirectory(indexDir)) {
System.err.println("Index directory is not a directory");
System.exit(255);
}
Path homeDir = Paths.get(args[1]);
String scenario = args[2];
String query = args[3];
switch (scenario) {
case "valuation" -> runValuation(indexDir, homeDir, query);
case "lookup" -> runLookup(indexDir, homeDir, query);
case "execution" -> runExecution(indexDir, homeDir, query);
}
}
catch (NumberFormatException e) {
System.err.println("Arguments: data-dir index-dir query");
System.exit(255);
}
catch (Exception ex) {
System.err.println("Error during testing");
ex.printStackTrace();
System.exit(255);
}
System.out.println(Arrays.toString(args));
}
private static CombinedIndexReader createCombinedIndexReader(Path indexDir) throws IOException {
return new CombinedIndexReader(
new ForwardIndexReader(
indexDir.resolve("ir/fwd-doc-id.dat"),
indexDir.resolve("ir/fwd-doc-data.dat"),
indexDir.resolve("ir/fwd-spans.dat")
),
new FullReverseIndexReader(
"full",
indexDir.resolve("ir/rev-words.dat"),
indexDir.resolve("ir/rev-docs.dat"),
new PositionsFileReader(indexDir.resolve("ir/rev-positions.dat"))
),
new PrioReverseIndexReader(
"prio",
indexDir.resolve("ir/rev-prio-words.dat"),
indexDir.resolve("ir/rev-prio-docs.dat")
)
);
}
private static IndexResultRankingService createIndexResultRankingService(Path indexDir, CombinedIndexReader combinedIndexReader) throws IOException, SQLException {
return new IndexResultRankingService(
new DocumentDbReader(indexDir.resolve("ldbr/documents.db")),
new StatefulIndex(combinedIndexReader),
new DomainRankingOverrides(null, Path.of("xxxx"))
);
}
static QueryFactory createQueryFactory(Path homeDir) throws IOException {
return new QueryFactory(
new QueryExpansion(
new TermFrequencyDict(homeDir.resolve("model/tfreq-new-algo3.bin")),
new NgramLexicon()
)
);
}
public static void runValuation(Path homeDir,
Path indexDir,
String rawQuery) throws IOException, SQLException
{
CombinedIndexReader indexReader = createCombinedIndexReader(indexDir);
QueryFactory queryFactory = createQueryFactory(homeDir);
IndexResultRankingService rankingService = createIndexResultRankingService(indexDir, indexReader);
var queryLimits = RpcQueryLimits.newBuilder()
.setTimeoutMs(10_000)
.setResultsTotal(1000)
.setResultsByDomain(10)
.setFetchSize(4096)
.build();
SearchSpecification parsedQuery = queryFactory.createQuery(new QueryParams(rawQuery, queryLimits, "NONE", NsfwFilterTier.OFF), PrototypeRankingParameters.sensibleDefaults()).specs;
System.out.println("Query compiled to: " + parsedQuery.query.compiledQuery);
SearchParameters searchParameters = new SearchParameters(parsedQuery, new SearchSetAny());
List<IndexQuery> queries = indexReader.createQueries(new SearchTerms(searchParameters.query, searchParameters.compiledQueryIds), searchParameters.queryParams);
TLongArrayList allResults = new TLongArrayList();
LongQueryBuffer buffer = new LongQueryBuffer(4096);
for (var query : queries) {
while (query.hasMore() && allResults.size() < 4096 ) {
query.getMoreResults(buffer);
allResults.addAll(buffer.copyData());
}
if (allResults.size() >= 4096)
break;
}
allResults.sort();
if (allResults.size() > 4096) {
allResults.subList(4096, allResults.size()).clear();
}
var docIds = new CombinedDocIdList(allResults.toArray());
var rankingContext = ResultRankingContext.create(indexReader, searchParameters);
System.out.println("Running warmup loop!");
int sum = 0;
Instant runEndTime = Instant.now().plus(warmupTime);
int iter;
for (iter = 0;; iter++) {
sum += rankingService.rankResults(rankingContext, docIds, false).size();
if ((iter % 100) == 0 && Instant.now().isAfter(runEndTime)) {
break;
}
}
System.out.println("Warmup complete after " + iter + " iters!");
runEndTime = Instant.now().plus(runTime);
Instant runStartTime = Instant.now();
int sum2 = 0;
List<Double> times = new ArrayList<>();
for (iter = 0;; iter++) {
long start = System.nanoTime();
sum2 += rankingService.rankResults(rankingContext, docIds, false).size();
long end = System.nanoTime();
times.add((end - start)/1_000_000.);
if ((iter % 100) == 0) {
if (Instant.now().isAfter(runEndTime)) {
break;
}
System.out.println(Duration.between(runStartTime, Instant.now()).toMillis() / 1000. + " best times: " + (allResults.size() / 4096.) * times.stream().mapToDouble(Double::doubleValue).sorted().limit(3).average().orElse(-1));
}
}
System.out.println("Benchmark complete after " + iter + " iters!");
System.out.println("Best times: " + (allResults.size() / 4096.) * times.stream().mapToDouble(Double::doubleValue).sorted().limit(3).average().orElse(-1));
System.out.println("Warmup sum: " + sum);
System.out.println("Main sum: " + sum2);
System.out.println(docIds.size());
}
public static void runExecution(Path homeDir,
Path indexDir,
String rawQuery) throws IOException, SQLException, InterruptedException {
CombinedIndexReader indexReader = createCombinedIndexReader(indexDir);
QueryFactory queryFactory = createQueryFactory(homeDir);
IndexResultRankingService rankingService = createIndexResultRankingService(indexDir, indexReader);
var queryLimits = RpcQueryLimits.newBuilder()
.setTimeoutMs(50)
.setResultsTotal(1000)
.setResultsByDomain(10)
.setFetchSize(4096)
.build();
SearchSpecification parsedQuery = queryFactory.createQuery(new QueryParams(rawQuery, queryLimits, "NONE", NsfwFilterTier.OFF), PrototypeRankingParameters.sensibleDefaults()).specs;
System.out.println("Query compiled to: " + parsedQuery.query.compiledQuery);
System.out.println("Running warmup loop!");
int sum = 0;
Instant runEndTime = Instant.now().plus(warmupTime);
int iter;
for (iter = 0;; iter++) {
SearchParameters searchParameters = new SearchParameters(parsedQuery, new SearchSetAny());
var execution = new IndexQueryExecution(searchParameters, rankingService, indexReader);
execution.run();
sum += execution.itemsProcessed();
if ((iter % 100) == 0 && Instant.now().isAfter(runEndTime)) {
break;
}
}
System.out.println("Warmup complete after " + iter + " iters!");
runEndTime = Instant.now().plus(runTime);
Instant runStartTime = Instant.now();
int sum2 = 0;
List<Double> rates = new ArrayList<>();
for (iter = 0;; iter++) {
SearchParameters searchParameters = new SearchParameters(parsedQuery, new SearchSetAny());
var execution = new IndexQueryExecution(searchParameters, rankingService, indexReader);
long start = System.nanoTime();
execution.run();
long end = System.nanoTime();
sum2 += execution.itemsProcessed();
rates.add(execution.itemsProcessed() / ((end - start)/1_000_000_000.));
if ((iter % 100) == 0) {
if (Instant.now().isAfter(runEndTime)) {
break;
}
System.out.println(Duration.between(runStartTime, Instant.now()).toMillis() / 1000. + " best rates: " + rates.stream().mapToDouble(Double::doubleValue).map(i -> -i).sorted().map(i -> -i).limit(3).average().orElse(-1));
}
}
System.out.println("Benchmark complete after " + iter + " iters!");
System.out.println("Best counts: " + rates.stream().mapToDouble(Double::doubleValue).map(i -> -i).sorted().map(i -> -i).limit(3).average().orElse(-1));
System.out.println("Warmup sum: " + sum);
System.out.println("Main sum: " + sum2);
}
public static void runLookup(Path homeDir,
Path indexDir,
String rawQuery) throws IOException, SQLException
{
CombinedIndexReader indexReader = createCombinedIndexReader(indexDir);
QueryFactory queryFactory = createQueryFactory(homeDir);
var queryLimits = RpcQueryLimits.newBuilder()
.setTimeoutMs(10_000)
.setResultsTotal(1000)
.setResultsByDomain(10)
.setFetchSize(4096)
.build();
SearchSpecification parsedQuery = queryFactory.createQuery(new QueryParams(rawQuery, queryLimits, "NONE", NsfwFilterTier.OFF), PrototypeRankingParameters.sensibleDefaults()).specs;
System.out.println("Query compiled to: " + parsedQuery.query.compiledQuery);
SearchParameters searchParameters = new SearchParameters(parsedQuery, new SearchSetAny());
Instant runEndTime = Instant.now().plus(warmupTime);
LongQueryBuffer buffer = new LongQueryBuffer(4096);
int sum1 = 0;
int iter;
for (iter = 0;; iter++) {
List<IndexQuery> queries = indexReader.createQueries(new SearchTerms(searchParameters.query, searchParameters.compiledQueryIds), searchParameters.queryParams);
for (var query : queries) {
while (query.hasMore()) {
query.getMoreResults(buffer);
sum1 += buffer.end;
buffer.reset();
}
}
if ((iter % 100) == 0 && Instant.now().isAfter(runEndTime)) {
break;
}
}
System.out.println("Warmup complete after " + iter + " iters with sum1 = " + sum1);
runEndTime = Instant.now().plus(runTime);
Instant runStartTime = Instant.now();
int sum2 = 0;
List<Double> times = new ArrayList<>();
for (iter = 0;; iter++) {
List<IndexQuery> queries = indexReader.createQueries(new SearchTerms(searchParameters.query, searchParameters.compiledQueryIds), searchParameters.queryParams);
long start = System.nanoTime();
for (var query : queries) {
while (query.hasMore()) {
query.getMoreResults(buffer);
sum1 += buffer.end;
buffer.reset();
}
}
long end = System.nanoTime();
times.add((end - start)/1_000_000.);
if ((iter % 100) == 0) {
if (Instant.now().isAfter(runEndTime)) {
break;
}
System.out.println(Duration.between(runStartTime, Instant.now()).toMillis() / 1000. + " best times: " + times.stream().mapToDouble(Double::doubleValue).sorted().limit(3).average().orElse(-1));
}
}
System.out.println("Benchmark complete after " + iter + " iters!");
System.out.println("Best times: " + times.stream().mapToDouble(Double::doubleValue).sorted().limit(3).average().orElse(-1));
System.out.println("Warmup sum: " + sum1);
System.out.println("Main sum: " + sum2);
}
}

View File

@@ -3,8 +3,8 @@ package nu.marginalia.index;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.btree.BTreeReader;
import nu.marginalia.index.positions.TermData;
import nu.marginalia.index.positions.PositionsFileReader;
import nu.marginalia.index.positions.TermData;
import nu.marginalia.index.query.EmptyEntrySource;
import nu.marginalia.index.query.EntrySource;
import nu.marginalia.index.query.ReverseIndexRejectFilter;
@@ -161,12 +161,7 @@ public class FullReverseIndexReader {
// Read the size and offset of the position data
var offsets = reader.queryData(docIds, 1);
for (int i = 0; i < docIds.length; i++) {
if (offsets[i] == 0)
continue;
ret[i] = positionsFileReader.getTermData(arena, offsets[i]);
}
return ret;
return positionsFileReader.getTermData(arena, offsets);
}
public void close() {

View File

@@ -5,39 +5,84 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
/** Reads positions data from the positions file */
public class PositionsFileReader implements AutoCloseable {
private final FileChannel positions;
// We use multiple file channels to avoid reads becoming serialized by the kernel.
// If we don't do this, multi-threaded reads become strictly slower than single-threaded reads
// (which is why AsynchronousFileChannel sucks).
// This is likely the best option apart from O_DIRECT or FFI:ing in libaio or io_uring.
private final FileChannel[] positions;
private final ForkJoinPool forkJoinPool;
private static final Logger logger = LoggerFactory.getLogger(PositionsFileReader.class);
public PositionsFileReader(Path positionsFile) throws IOException {
this.positions = FileChannel.open(positionsFile, StandardOpenOption.READ);
this(positionsFile, 8);
}
/** Get the positions for a term in the index, as pointed out by the encoded offset;
* intermediate buffers are allocated from the provided arena allocator. */
public TermData getTermData(Arena arena, long sizeEncodedOffset) {
int length = PositionCodec.decodeSize(sizeEncodedOffset);
long offset = PositionCodec.decodeOffset(sizeEncodedOffset);
var segment = arena.allocate(length);
var buffer = segment.asByteBuffer();
try {
positions.read(buffer, offset);
} catch (IOException e) {
throw new RuntimeException(e);
public PositionsFileReader(Path positionsFile, int nreaders) throws IOException {
positions = new FileChannel[nreaders];
for (int i = 0; i < positions.length; i++) {
positions[i] = FileChannel.open(positionsFile, StandardOpenOption.READ);
}
return new TermData(buffer);
forkJoinPool = new ForkJoinPool(nreaders);
}
@Override
public void close() throws IOException {
positions.close();
for (FileChannel fc : positions) {
fc.close();
}
forkJoinPool.close();
}
/** Get the positions for a keywords in the index, as pointed out by the encoded offsets;
* intermediate buffers are allocated from the provided arena allocator. */
public TermData[] getTermData(Arena arena, long[] offsets) {
TermData[] ret = new TermData[offsets.length];
int tasks = 0;
for (long l : offsets) if (l != 0) tasks++;
CountDownLatch cl = new CountDownLatch(tasks);
for (int i = 0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;
int idx = i;
int length = PositionCodec.decodeSize(encodedOffset);
long offset = PositionCodec.decodeOffset(encodedOffset);
ByteBuffer buffer = arena.allocate(length).asByteBuffer();
forkJoinPool.execute(() -> {
try {
positions[idx % positions.length].read(buffer, offset);
ret[idx] = new TermData(buffer);
cl.countDown();
}
catch (IOException ex) {
logger.error("Failed to read positions file", ex);
}
});
}
try {
cl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return ret;
}
}

View File

@@ -11,7 +11,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -32,7 +31,6 @@ class PositionsFileReaderTest {
@Test
void getTermData() throws IOException {
ByteBuffer workArea = ByteBuffer.allocate(8192);
long key1, key2, key3;
try (PositionsFileConstructor constructor = new PositionsFileConstructor(file)) {
key1 = constructor.add((byte) 43, VarintCodedSequence.generate(1, 2, 3).buffer());
@@ -44,20 +42,19 @@ class PositionsFileReaderTest {
System.out.println("key2: " + Long.toHexString(key2));
System.out.println("key3: " + Long.toHexString(key3));
try (Arena arena = Arena.ofConfined();
try (Arena arena = Arena.ofShared();
PositionsFileReader reader = new PositionsFileReader(file))
{
TermData data1 = reader.getTermData(arena, key1);
assertEquals(43, data1.flags());
assertEquals(IntList.of( 1, 2, 3), data1.positions().values());
TermData[] data = reader.getTermData(arena, new long[] { key1, key2, key3 });
TermData data2 = reader.getTermData(arena, key2);
assertEquals(51, data2.flags());
assertEquals(IntList.of(2, 3, 5, 1000, 5000, 20241), data2.positions().values());
assertEquals(43, data[0].flags());
assertEquals(IntList.of( 1, 2, 3), data[0].positions().values());
TermData data3 = reader.getTermData(arena, key3);
assertEquals(61, data3.flags());
assertEquals(IntList.of(3, 5, 7), data3.positions().values());
assertEquals(51, data[1].flags());
assertEquals(IntList.of(2, 3, 5, 1000, 5000, 20241), data[1].positions().values());
assertEquals(61, data[2].flags());
assertEquals(IntList.of(3, 5, 7), data[2].positions().values());
}
}
}

View File

@@ -7,24 +7,13 @@ import io.grpc.stub.StreamObserver;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import nu.marginalia.api.searchquery.IndexApiGrpc;
import nu.marginalia.api.searchquery.RpcDecoratedResultItem;
import nu.marginalia.api.searchquery.RpcIndexQuery;
import nu.marginalia.api.searchquery.RpcResultRankingParameters;
import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.compiled.CompiledQueryLong;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import nu.marginalia.api.searchquery.model.query.SearchSpecification;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.array.page.LongQueryBuffer;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.model.SearchTerms;
import nu.marginalia.index.query.IndexQuery;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.index.results.IndexResultRankingService;
import nu.marginalia.index.results.model.ids.CombinedDocIdList;
import nu.marginalia.index.searchset.SearchSet;
import nu.marginalia.index.searchset.SearchSetsService;
import nu.marginalia.index.searchset.SmallSearchSet;
@@ -35,14 +24,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Singleton
public class IndexGrpcService
@@ -88,23 +70,22 @@ public class IndexGrpcService
private final StatefulIndex statefulIndex;
private final SearchSetsService searchSetsService;
private final IndexResultRankingService resultValuator;
private final IndexResultRankingService rankingService;
private final String nodeName;
private static final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 16);
@Inject
public IndexGrpcService(ServiceConfiguration serviceConfiguration,
StatefulIndex statefulIndex,
SearchSetsService searchSetsService,
IndexResultRankingService resultValuator)
IndexResultRankingService rankingService)
{
var nodeId = serviceConfiguration.node();
this.nodeName = Integer.toString(nodeId);
this.statefulIndex = statefulIndex;
this.searchSetsService = searchSetsService;
this.resultValuator = resultValuator;
this.rankingService = rankingService;
}
// GRPC endpoint
@@ -121,7 +102,13 @@ public class IndexGrpcService
.time(() -> {
// Perform the search
try {
return executeSearch(params);
if (!statefulIndex.isLoaded()) {
// Short-circuit if the index is not loaded, as we trivially know that there can be no results
return List.of();
}
return new IndexQueryExecution(params, rankingService, statefulIndex.get()).run();
}
catch (Exception ex) {
logger.error("Error in handling request", ex);
@@ -157,7 +144,12 @@ public class IndexGrpcService
// exists for test access
public List<RpcDecoratedResultItem> justQuery(SearchSpecification specsSet) {
try {
return executeSearch(new SearchParameters(specsSet, getSearchSet(specsSet)));
if (!statefulIndex.isLoaded()) {
// Short-circuit if the index is not loaded, as we trivially know that there can be no results
return List.of();
}
return new IndexQueryExecution(new SearchParameters(specsSet, getSearchSet(specsSet)), rankingService, statefulIndex.get()).run();
}
catch (Exception ex) {
logger.error("Error in handling request", ex);
@@ -183,262 +175,6 @@ public class IndexGrpcService
return searchSetsService.getSearchSetByName(request.getSearchSetIdentifier());
}
// accessible for tests
public List<RpcDecoratedResultItem> executeSearch(SearchParameters params) throws Exception {
if (!statefulIndex.isLoaded()) {
// Short-circuit if the index is not loaded, as we trivially know that there can be no results
return List.of();
}
ResultRankingContext rankingContext = createRankingContext(params.rankingParams,
params.compiledQuery,
params.compiledQueryIds);
var queryExecution = new QueryExecution(rankingContext, params.fetchSize);
List<RpcDecoratedResultItem> ret = queryExecution.run(params);
wmsa_index_query_exec_block_time
.labels(nodeName)
.set(queryExecution.getBlockTime() / 1000.);
wmsa_index_query_exec_stall_time
.labels(nodeName)
.set(queryExecution.getStallTime() / 1000.);
return ret;
}
/** This class is responsible for ranking the results and adding the best results to the
* resultHeap, which depending on the state of the indexLookup threads may or may not block
*/
private ResultRankingContext createRankingContext(RpcResultRankingParameters rankingParams,
CompiledQuery<String> compiledQuery,
CompiledQueryLong compiledQueryIds)
{
int[] full = new int[compiledQueryIds.size()];
int[] prio = new int[compiledQueryIds.size()];
BitSet ngramsMask = new BitSet(compiledQuery.size());
BitSet regularMask = new BitSet(compiledQuery.size());
var currentIndex = statefulIndex.get();
for (int idx = 0; idx < compiledQueryIds.size(); idx++) {
long id = compiledQueryIds.at(idx);
full[idx] = currentIndex.numHits(id);
prio[idx] = currentIndex.numHitsPrio(id);
if (compiledQuery.at(idx).contains("_")) {
ngramsMask.set(idx);
}
else {
regularMask.set(idx);
}
}
return new ResultRankingContext(currentIndex.totalDocCount(),
rankingParams,
ngramsMask,
regularMask,
new CqDataInt(full),
new CqDataInt(prio));
}
/** This class is responsible for executing a search query. It uses a thread pool to
* execute the subqueries and their valuation in parallel. The results are then combined
* into a bounded priority queue, and finally the best results are returned.
*/
private class QueryExecution {
private static final Executor workerPool = Executors.newCachedThreadPool();
/** The queue where the results from the index lookup threads are placed,
* pending ranking by the result ranker threads */
private final ArrayBlockingQueue<CombinedDocIdList> resultCandidateQueue
= new ArrayBlockingQueue<>(64);
private final ResultPriorityQueue resultHeap;
private final ResultRankingContext resultRankingContext;
private final AtomicInteger remainingIndexTasks = new AtomicInteger(0);
private final AtomicInteger remainingValuationTasks = new AtomicInteger(0);
private final AtomicLong blockTime = new AtomicLong(0);
private final AtomicLong stallTime = new AtomicLong(0);
public long getStallTime() {
return stallTime.get();
}
public long getBlockTime() {
return blockTime.get();
}
private QueryExecution(ResultRankingContext resultRankingContext, int maxResults) {
this.resultRankingContext = resultRankingContext;
this.resultHeap = new ResultPriorityQueue(maxResults);
}
/** Execute a search query */
public List<RpcDecoratedResultItem> run(SearchParameters parameters) throws Exception {
var terms = new SearchTerms(parameters.query, parameters.compiledQueryIds);
var currentIndex = statefulIndex.get();
for (var indexQuery : currentIndex.createQueries(terms, parameters.queryParams)) {
workerPool.execute(new IndexLookup(indexQuery, parameters.budget));
}
for (int i = 0; i < indexValuationThreads; i++) {
workerPool.execute(new ResultRanker(parameters, resultRankingContext));
}
// Wait for all tasks to complete
awaitCompletion();
// Return the best results
return resultValuator.selectBestResults(parameters, resultRankingContext, resultHeap);
}
/** Wait for all tasks to complete */
private void awaitCompletion() throws InterruptedException {
synchronized (remainingValuationTasks) {
while (remainingValuationTasks.get() > 0) {
remainingValuationTasks.wait(20);
}
}
}
/** This class is responsible for executing a subquery and adding the results to the
* resultCandidateQueue, which depending on the state of the valuator threads may
* or may not block */
class IndexLookup implements Runnable {
private final IndexQuery query;
private final IndexSearchBudget budget;
IndexLookup(IndexQuery query,
IndexSearchBudget budget) {
this.query = query;
this.budget = budget;
remainingIndexTasks.incrementAndGet();
}
public void run() {
try {
executeSearch();
}
catch (Exception ex) {
logger.error("Error in index lookup", ex);
}
finally {
synchronized (remainingIndexTasks) {
if (remainingIndexTasks.decrementAndGet() == 0) {
remainingIndexTasks.notifyAll();
}
}
}
}
private void executeSearch() {
final LongArrayList results = new LongArrayList(16);
// These queries are different indices for one subquery
final LongQueryBuffer buffer = new LongQueryBuffer(4096);
while (query.hasMore() && budget.hasTimeLeft())
{
buffer.reset();
query.getMoreResults(buffer);
for (int i = 0; i < buffer.end; i+=16) {
for (int j = 0; j < Math.min(buffer.end - i, 16); j++) {
results.add(buffer.data.get(i+j));
}
enqueueResults(new CombinedDocIdList(results));
results.clear();
}
}
buffer.dispose();
}
private void enqueueResults(CombinedDocIdList resultIds) {
long remainingTime = budget.timeLeft();
try {
if (!resultCandidateQueue.offer(resultIds)) {
long start = System.currentTimeMillis();
resultCandidateQueue.offer(resultIds, remainingTime, TimeUnit.MILLISECONDS);
blockTime.addAndGet(System.currentTimeMillis() - start);
}
}
catch (InterruptedException e) {
logger.warn("Interrupted while waiting to offer resultIds to queue", e);
}
}
}
class ResultRanker implements Runnable {
private final SearchParameters parameters;
private final ResultRankingContext rankingContext;
ResultRanker(SearchParameters parameters, ResultRankingContext rankingContext) {
this.parameters = parameters;
this.rankingContext = rankingContext;
remainingValuationTasks.incrementAndGet();
}
public void run() {
try {
while (parameters.budget.timeLeft() > 0 && execute());
}
catch (InterruptedException e) {
logger.warn("Interrupted while waiting to poll resultIds from queue", e);
}
catch (Exception e) {
logger.error("Exception while ranking results", e);
}
finally {
synchronized (remainingValuationTasks) {
if (remainingValuationTasks.decrementAndGet() == 0)
remainingValuationTasks.notifyAll();
}
}
}
private boolean execute() throws Exception {
long start = System.currentTimeMillis();
// Do a relatively short poll to ensure we terminate in a timely manner
// in the event all work is done
final long pollTime = Math.clamp(parameters.budget.timeLeft(), 1, 5);
CombinedDocIdList resultIds = resultCandidateQueue.poll(pollTime, TimeUnit.MILLISECONDS);
if (resultIds == null) {
// check if we are done and can terminate
if (remainingIndexTasks.get() == 0 && resultCandidateQueue.isEmpty()) {
return false;
}
}
else {
stallTime.addAndGet(System.currentTimeMillis() - start);
resultHeap.addAll(
resultValuator.rankResults(parameters, false, rankingContext, resultIds)
);
}
return true; // keep going
}
}
}
}

View File

@@ -0,0 +1,137 @@
package nu.marginalia.index;
import nu.marginalia.api.searchquery.RpcDecoratedResultItem;
import nu.marginalia.array.page.LongQueryBuffer;
import nu.marginalia.index.index.CombinedIndexReader;
import nu.marginalia.index.model.ResultRankingContext;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.model.SearchTerms;
import nu.marginalia.index.query.IndexQuery;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.index.results.IndexResultRankingService;
import nu.marginalia.index.results.model.ids.CombinedDocIdList;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
/** Performs an index query */
public class IndexQueryExecution {
private static final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 16);
private static final ForkJoinPool lookupPool = new ForkJoinPool(indexValuationThreads);
private static final ForkJoinPool evaluationPool = new ForkJoinPool(indexValuationThreads);
private final IndexResultRankingService rankingService;
private final ResultRankingContext rankingContext;
private final List<IndexQuery> queries;
private final IndexSearchBudget budget;
private final ResultPriorityQueue resultHeap;
private final CountDownLatch executionCountdown;
private final int limitTotal;
private final int limitByDomain;
private int evaluationJobCounter;
public IndexQueryExecution(SearchParameters params,
IndexResultRankingService rankingService,
CombinedIndexReader currentIndex) {
this.rankingService = rankingService;
resultHeap = new ResultPriorityQueue(params.fetchSize);
budget = params.budget;
limitByDomain = params.limitByDomain;
limitTotal = params.limitTotal;
rankingContext = ResultRankingContext.create(currentIndex, params);
queries = currentIndex.createQueries(new SearchTerms(params.query, params.compiledQueryIds), params.queryParams);
executionCountdown = new CountDownLatch(queries.size());
evaluationJobCounter = 0;
}
public List<RpcDecoratedResultItem> run() throws InterruptedException, SQLException {
// Spawn lookup tasks for each query
for (IndexQuery query : queries) {
lookupPool.execute(() -> lookup(query));
}
// Await lookup task termination (this guarantees we're no longer creating new evaluation tasks)
executionCountdown.await();
// Await evaluation task termination
synchronized (IndexQueryExecution.this) {
while (evaluationJobCounter > 0 && budget.hasTimeLeft()) {
IndexQueryExecution.this.wait(budget.timeLeft());
}
}
// Final result selection
return rankingService.selectBestResults(limitByDomain, limitTotal, rankingContext, resultHeap);
}
private void lookup(IndexQuery query) {
final LongQueryBuffer buffer = new LongQueryBuffer(512);
try {
while (query.hasMore() && budget.hasTimeLeft()) {
buffer.reset();
query.getMoreResults(buffer);
if (buffer.isEmpty())
continue;
CombinedDocIdList docIds = new CombinedDocIdList(buffer);
boolean stealWork = false;
synchronized (IndexQueryExecution.this) {
// Hold off on spawning new evaluation jobs if we have too many queued
// to avoid backpressure, instead steal work into the lookup thread
// in this scenario
if (evaluationJobCounter > indexValuationThreads * 8) {
stealWork = true;
}
else {
evaluationJobCounter++;
}
}
if (stealWork) {
resultHeap.addAll(rankingService.rankResults(rankingContext, docIds, false));
}
else {
// Spawn an evaluation task
evaluationPool.execute(() -> evaluate(docIds));
}
}
} finally {
buffer.dispose();
executionCountdown.countDown();
}
}
private void evaluate(CombinedDocIdList docIds) {
try {
if (!budget.hasTimeLeft())
return;
resultHeap.addAll(rankingService.rankResults(rankingContext, docIds, false));
} finally {
synchronized (IndexQueryExecution.this) {
if (--evaluationJobCounter == 0) {
IndexQueryExecution.this.notifyAll();
}
}
}
}
public int itemsProcessed() {
return resultHeap.getItemsProcessed();
}
}

View File

@@ -1,116 +1,59 @@
package nu.marginalia.index;
import com.google.common.collect.MinMaxPriorityQueue;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import nu.marginalia.api.searchquery.model.results.SearchResultItem;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
/** A priority queue for search results. This class is not thread-safe,
* in general, except for concurrent use of the addAll method.
* <p></p>
* The class implements a subset of the Collection interface, and
* is intended to be used as a priority queue for search results,
* with a maximum size.
* <p></p>
* Since the expected use case is to add a large number of items
* and then iterate over the items, the class is optimized for
* this scenario, and does not implement other mutating methods
* than addAll().
*/
public class ResultPriorityQueue implements Iterable<SearchResultItem>,
Collection<SearchResultItem> {
private final int limit;
private final ArrayList<SearchResultItem> backingList = new ArrayList<>();
public class ResultPriorityQueue implements Iterable<SearchResultItem> {
private final LongOpenHashSet idsInSet = new LongOpenHashSet();
private final MinMaxPriorityQueue<SearchResultItem> queue;
private int itemsProcessed = 0;
public ResultPriorityQueue(int limit) {
this.limit = limit;
this.queue = MinMaxPriorityQueue.<SearchResultItem>orderedBy(Comparator.naturalOrder()).maximumSize(limit).create();
}
public Iterator<SearchResultItem> iterator() {
return backingList.iterator();
}
@NotNull
@Override
public Object[] toArray() {
return backingList.toArray();
}
@NotNull
@Override
public <T> T[] toArray(@NotNull T[] a) {
return backingList.toArray(a);
}
@Override
public boolean add(SearchResultItem searchResultItem) {
throw new UnsupportedOperationException("Use addAll instead");
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(@NotNull Collection<?> c) {
return idsInSet.containsAll(c);
return queue.iterator();
}
/** Adds all items to the queue, and returns true if any items were added.
* This is a thread-safe operation.
*/
@Override
public synchronized boolean addAll(@NotNull Collection<? extends SearchResultItem> items) {
boolean itemsAdded = false;
for (var item: items) {
if (idsInSet.add(item.getDocumentId())) {
backingList.add(item);
itemsAdded = true;
}
}
if (!itemsAdded) {
return false;
}
itemsProcessed+=items.size();
backingList.sort(Comparator.naturalOrder());
if (backingList.size() > limit) {
backingList.subList(limit, backingList.size()).clear();
for (var item : items) {
if (idsInSet.add(item.getDocumentId())) {
queue.add(item);
}
}
return true;
}
@Override
public boolean removeAll(@NotNull Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(@NotNull Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
backingList.clear();
idsInSet.clear();
}
public int size() {
return backingList.size();
return queue.size();
}
public int getItemsProcessed() {
return itemsProcessed;
}
@Override
public boolean isEmpty() {
return backingList.isEmpty();
}
@Override
public boolean contains(Object o) {
return backingList.contains(o);
return queue.isEmpty();
}
}

View File

@@ -205,14 +205,19 @@ public class CombinedIndexReader {
return forwardIndexReader.getDocumentSize(docId);
}
/** Retrieves the document spans for the specified document */
public DocumentSpans getDocumentSpans(Arena arena, long docId) {
return forwardIndexReader.getDocumentSpans(arena, docId);
/** Retrieves the document spans for the specified documents */
public DocumentSpans[] getDocumentSpans(Arena arena, CombinedDocIdList docIds) {
long[] decodedIDs = docIds.array();
for (int i = 0; i < decodedIDs.length; i++) {
decodedIDs[i] = UrlIdCodec.removeRank(decodedIDs[i]);
}
return forwardIndexReader.getDocumentSpans(arena, decodedIDs);
}
/** Close the indexes (this is not done immediately)
* */
public void close() throws InterruptedException {
public void close() {
/* Delay the invocation of close method to allow for a clean shutdown of the service.
*
* This is especially important when using Unsafe-based LongArrays, since we have
@@ -227,7 +232,7 @@ public class CombinedIndexReader {
}
private void delayedCall(Runnable call, Duration delay) throws InterruptedException {
private void delayedCall(Runnable call, Duration delay) {
Thread.ofPlatform().start(() -> {
try {
TimeUnit.SECONDS.sleep(delay.toSeconds());
@@ -248,12 +253,13 @@ public class CombinedIndexReader {
class ParamMatchingQueryFilter implements QueryFilterStepIf {
private final QueryParams params;
private final ForwardIndexReader forwardIndexReader;
private final boolean imposesMetaConstraint;
public ParamMatchingQueryFilter(QueryParams params,
ForwardIndexReader forwardIndexReader)
{
this.params = params;
this.forwardIndexReader = forwardIndexReader;
this.imposesMetaConstraint = params.imposesDomainMetadataConstraint();
}
@Override
@@ -261,12 +267,16 @@ class ParamMatchingQueryFilter implements QueryFilterStepIf {
long docId = UrlIdCodec.removeRank(combinedId);
int domainId = UrlIdCodec.getDomainId(docId);
long meta = forwardIndexReader.getDocMeta(docId);
if (!validateDomain(domainId, meta)) {
if (!validateDomain(domainId)) {
return false;
}
if (!imposesMetaConstraint) {
return true;
}
long meta = forwardIndexReader.getDocMeta(docId);
if (!validateQuality(meta)) {
return false;
}
@@ -286,8 +296,8 @@ class ParamMatchingQueryFilter implements QueryFilterStepIf {
return true;
}
private boolean validateDomain(int domainId, long meta) {
return params.searchSet().contains(domainId, meta);
private boolean validateDomain(int domainId) {
return params.searchSet().contains(domainId);
}
private boolean validateQuality(long meta) {

View File

@@ -35,6 +35,13 @@ public class StatefulIndex {
this.eventLog = eventLog;
}
/** For use in testing only */
public StatefulIndex(CombinedIndexReader combinedIndexReader) {
this.combinedIndexReader = combinedIndexReader;
this.servicesFactory = null;
this.eventLog = null;
}
public void init() {
Lock lock = indexReplacementLock.writeLock();

View File

@@ -1,8 +1,9 @@
package nu.marginalia.index.model;
import nu.marginalia.index.searchset.SearchSet;
import nu.marginalia.index.query.limit.QueryStrategy;
import nu.marginalia.index.query.limit.SpecificationLimit;
import nu.marginalia.index.query.limit.SpecificationLimitType;
import nu.marginalia.index.searchset.SearchSet;
import java.util.Objects;
@@ -41,6 +42,13 @@ public final class QueryParams {
this.queryStrategy = queryStrategy;
}
public boolean imposesDomainMetadataConstraint() {
return qualityLimit.type() != SpecificationLimitType.NONE
|| year.type() != SpecificationLimitType.NONE
|| size.type() != SpecificationLimitType.NONE
|| rank.type() != SpecificationLimitType.NONE;
}
public SpecificationLimit qualityLimit() {
return qualityLimit;
}

View File

@@ -0,0 +1,106 @@
package nu.marginalia.index.model;
import nu.marginalia.api.searchquery.RpcResultRankingParameters;
import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.compiled.CompiledQueryLong;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import nu.marginalia.api.searchquery.model.query.SearchQuery;
import nu.marginalia.index.index.CombinedIndexReader;
import java.util.BitSet;
public class ResultRankingContext {
private final int docCount;
public final RpcResultRankingParameters params;
public final SearchQuery searchQuery;
public final QueryParams queryParams;
public final CompiledQuery<String> compiledQuery;
public final CompiledQueryLong compiledQueryIds;
public final BitSet regularMask;
public final BitSet ngramsMask;
/** CqDataInt associated with frequency information of the terms in the query
* in the full index. The dataset is indexed by the compiled query. */
public final CqDataInt fullCounts;
/** CqDataInt associated with frequency information of the terms in the query
* in the full index. The dataset is indexed by the compiled query. */
public final CqDataInt priorityCounts;
public static ResultRankingContext create(CombinedIndexReader currentIndex, SearchParameters searchParameters) {
var compiledQueryIds = searchParameters.compiledQueryIds;
var compiledQuery = searchParameters.compiledQuery;
int[] full = new int[compiledQueryIds.size()];
int[] prio = new int[compiledQueryIds.size()];
BitSet ngramsMask = new BitSet(compiledQuery.size());
BitSet regularMask = new BitSet(compiledQuery.size());
for (int idx = 0; idx < compiledQueryIds.size(); idx++) {
long id = compiledQueryIds.at(idx);
full[idx] = currentIndex.numHits(id);
prio[idx] = currentIndex.numHitsPrio(id);
if (compiledQuery.at(idx).contains("_")) {
ngramsMask.set(idx);
}
else {
regularMask.set(idx);
}
}
return new ResultRankingContext(currentIndex.totalDocCount(),
searchParameters,
compiledQuery,
compiledQueryIds,
ngramsMask,
regularMask,
new CqDataInt(full),
new CqDataInt(prio));
}
public ResultRankingContext(int docCount,
SearchParameters searchParameters,
CompiledQuery<String> compiledQuery,
CompiledQueryLong compiledQueryIds,
BitSet ngramsMask,
BitSet regularMask,
CqDataInt fullCounts,
CqDataInt prioCounts)
{
this.docCount = docCount;
this.searchQuery = searchParameters.query;
this.params = searchParameters.rankingParams;
this.queryParams = searchParameters.queryParams;
this.compiledQuery = compiledQuery;
this.compiledQueryIds = compiledQueryIds;
this.ngramsMask = ngramsMask;
this.regularMask = regularMask;
this.fullCounts = fullCounts;
this.priorityCounts = prioCounts;
}
public int termFreqDocCount() {
return docCount;
}
@Override
public String toString() {
return "ResultRankingContext{" +
"docCount=" + docCount +
", params=" + params +
", regularMask=" + regularMask +
", ngramsMask=" + ngramsMask +
", fullCounts=" + fullCounts +
", priorityCounts=" + priorityCounts +
'}';
}
}

View File

@@ -2,7 +2,7 @@ package nu.marginalia.index.results;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import nu.marginalia.api.searchquery.model.compiled.CqExpression;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.index.model.ResultRankingContext;
import java.util.BitSet;
import java.util.List;

View File

@@ -12,12 +12,13 @@ import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.compiled.CqDataLong;
import nu.marginalia.api.searchquery.model.query.SearchPhraseConstraint;
import nu.marginalia.api.searchquery.model.query.SearchQuery;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.api.searchquery.model.results.SearchResultItem;
import nu.marginalia.api.searchquery.model.results.debug.DebugRankingFactors;
import nu.marginalia.index.ResultPriorityQueue;
import nu.marginalia.index.forward.spans.DocumentSpans;
import nu.marginalia.index.index.CombinedIndexReader;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.model.ResultRankingContext;
import nu.marginalia.index.model.SearchTermsUtil;
import nu.marginalia.index.results.model.PhraseConstraintGroupList;
import nu.marginalia.index.results.model.QuerySearchTerms;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
import java.lang.foreign.Arena;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Singleton
public class IndexResultRankingService {
@@ -52,15 +56,15 @@ public class IndexResultRankingService {
this.domainRankingOverrides = domainRankingOverrides;
}
public List<SearchResultItem> rankResults(SearchParameters params,
boolean exportDebugData,
public List<SearchResultItem> rankResults(
ResultRankingContext rankingContext,
CombinedDocIdList resultIds)
CombinedDocIdList resultIds,
boolean exportDebugData)
{
if (resultIds.isEmpty())
return List.of();
IndexResultScoreCalculator resultRanker = new IndexResultScoreCalculator(statefulIndex, domainRankingOverrides, rankingContext, params);
IndexResultScoreCalculator resultRanker = new IndexResultScoreCalculator(statefulIndex, domainRankingOverrides, rankingContext);
List<SearchResultItem> results = new ArrayList<>(resultIds.size());
@@ -68,13 +72,11 @@ public class IndexResultRankingService {
// this may change during the calculation, but we don't want to switch over mid-calculation
final CombinedIndexReader currentIndex = statefulIndex.get();
final QuerySearchTerms searchTerms = getSearchTerms(params.compiledQuery, params.query);
final QuerySearchTerms searchTerms = getSearchTerms(rankingContext.compiledQuery, rankingContext.searchQuery);
final int termCount = searchTerms.termIdsAll.size();
// We use an arena for the position data to avoid gc pressure
// from the gamma coded sequences, which can be large and have a lifetime
// that matches the try block here
try (var arena = Arena.ofConfined()) {
// We use an arena for the position and spans data to limit gc pressure
try (var arena = Arena.ofShared()) {
TermMetadataList[] termsForDocs = new TermMetadataList[termCount];
for (int ti = 0; ti < termCount; ti++) {
@@ -87,6 +89,7 @@ public class IndexResultRankingService {
long[] flags = new long[termCount];
CodedSequence[] positions = new CodedSequence[termCount];
DocumentSpans[] documentSpans = currentIndex.getDocumentSpans(arena, resultIds);
// Iterate over documents by their index in the combinedDocIds, as we need the index for the
// term data arrays as well
@@ -109,14 +112,15 @@ public class IndexResultRankingService {
}
if (!exportDebugData) {
var score = resultRanker.calculateScore(arena, null, resultIds.at(i), searchTerms, flags, positions);
var score = resultRanker.calculateScore(null, resultIds.at(i), searchTerms, flags, positions, documentSpans[i]);
if (score != null) {
results.add(score);
}
}
else {
var rankingFactors = new DebugRankingFactors();
var score = resultRanker.calculateScore(arena, rankingFactors, resultIds.at(i), searchTerms, flags, positions);
var score = resultRanker.calculateScore( rankingFactors, resultIds.at(i), searchTerms, flags, positions, documentSpans[i]);
if (score != null) {
score.debugRankingFactors = rankingFactors;
results.add(score);
@@ -129,19 +133,20 @@ public class IndexResultRankingService {
}
public List<RpcDecoratedResultItem> selectBestResults(SearchParameters params,
public List<RpcDecoratedResultItem> selectBestResults(int limitByDomain,
int limitTotal,
ResultRankingContext resultRankingContext,
Collection<SearchResultItem> results) throws SQLException {
ResultPriorityQueue results) throws SQLException {
var domainCountFilter = new IndexResultDomainDeduplicator(params.limitByDomain);
var domainCountFilter = new IndexResultDomainDeduplicator(limitByDomain);
List<SearchResultItem> resultsList = new ArrayList<>(results.size());
TLongList idsList = new TLongArrayList(params.limitTotal);
TLongList idsList = new TLongArrayList(limitTotal);
for (var item : results) {
if (domainCountFilter.test(item)) {
if (resultsList.size() < params.limitTotal) {
if (resultsList.size() < limitTotal) {
resultsList.add(item);
idsList.add(item.getDocumentId());
}
@@ -159,7 +164,7 @@ public class IndexResultRankingService {
// for the selected results, as this would be comically expensive to do for all the results we
// discard along the way
if (params.rankingParams.getExportDebugData()) {
if (resultRankingContext.params.getExportDebugData()) {
var combinedIdsList = new LongArrayList(resultsList.size());
for (var item : resultsList) {
combinedIdsList.add(item.combinedId);
@@ -167,10 +172,9 @@ public class IndexResultRankingService {
resultsList.clear();
resultsList.addAll(this.rankResults(
params,
true,
resultRankingContext,
new CombinedDocIdList(combinedIdsList))
new CombinedDocIdList(combinedIdsList),
true)
);
}
@@ -247,7 +251,7 @@ public class IndexResultRankingService {
var termOutputs = RpcResultTermRankingOutputs.newBuilder();
CqDataLong termIds = params.compiledQueryIds.data;;
CqDataLong termIds = resultRankingContext.compiledQueryIds.data;
for (var entry : debugFactors.getTermFactors()) {
String term = "[ERROR IN LOOKUP]";
@@ -255,7 +259,7 @@ public class IndexResultRankingService {
// CURSED: This is a linear search, but the number of terms is small, and it's in a debug path
for (int i = 0; i < termIds.size(); i++) {
if (termIds.get(i) == entry.termId()) {
term = params.compiledQuery.at(i);
term = resultRankingContext.compiledQuery.at(i);
break;
}
}

View File

@@ -6,14 +6,13 @@ import nu.marginalia.api.searchquery.RpcResultRankingParameters;
import nu.marginalia.api.searchquery.RpcTemporalBias;
import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.compiled.CompiledQueryLong;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.api.searchquery.model.results.SearchResultItem;
import nu.marginalia.api.searchquery.model.results.debug.DebugRankingFactors;
import nu.marginalia.index.forward.spans.DocumentSpans;
import nu.marginalia.index.index.CombinedIndexReader;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.model.QueryParams;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.model.ResultRankingContext;
import nu.marginalia.index.query.limit.QueryStrategy;
import nu.marginalia.index.results.model.PhraseConstraintGroupList;
import nu.marginalia.index.results.model.QuerySearchTerms;
@@ -28,7 +27,6 @@ import nu.marginalia.sequence.CodedSequence;
import nu.marginalia.sequence.SequenceOperations;
import javax.annotation.Nullable;
import java.lang.foreign.Arena;
import java.util.BitSet;
import static nu.marginalia.api.searchquery.model.compiled.aggregate.CompiledQueryAggregates.booleanAggregate;
@@ -47,24 +45,23 @@ public class IndexResultScoreCalculator {
public IndexResultScoreCalculator(StatefulIndex statefulIndex,
DomainRankingOverrides domainRankingOverrides,
ResultRankingContext rankingContext,
SearchParameters params)
ResultRankingContext rankingContext)
{
this.index = statefulIndex.get();
this.domainRankingOverrides = domainRankingOverrides;
this.rankingContext = rankingContext;
this.queryParams = params.queryParams;
this.compiledQuery = params.compiledQuery;
this.queryParams = rankingContext.queryParams;
this.compiledQuery = rankingContext.compiledQuery;
}
@Nullable
public SearchResultItem calculateScore(Arena arena,
@Nullable DebugRankingFactors debugRankingFactors,
public SearchResultItem calculateScore(@Nullable DebugRankingFactors debugRankingFactors,
long combinedId,
QuerySearchTerms searchTerms,
long[] wordFlags,
CodedSequence[] positions)
CodedSequence[] positions,
DocumentSpans spans)
{
CompiledQuery<CodedSequence> positionsQuery = compiledQuery.root.newQuery(positions);
@@ -92,8 +89,6 @@ public class IndexResultScoreCalculator {
int docSize = index.getDocumentSize(docId);
if (docSize <= 0) docSize = 5000;
DocumentSpans spans = index.getDocumentSpans(arena, docId);
if (debugRankingFactors != null) {
debugRankingFactors.addDocumentFactor("doc.docId", Long.toString(combinedId));
debugRankingFactors.addDocumentFactor("doc.combinedId", Long.toString(docId));
@@ -235,7 +230,7 @@ public class IndexResultScoreCalculator {
long result = 0;
int bit = 0;
IntIterator intersection = phraseConstraints.getFullGroup().findIntersections(positions).intIterator();
IntIterator intersection = phraseConstraints.getFullGroup().findIntersections(positions, 64).intIterator();
while (intersection.hasNext() && bit < 64) {
bit = (int) (Math.sqrt(intersection.nextInt()));

View File

@@ -3,7 +3,7 @@ package nu.marginalia.index.results;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import nu.marginalia.api.searchquery.model.compiled.CqDataLong;
import nu.marginalia.api.searchquery.model.compiled.CqExpression;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.index.model.ResultRankingContext;
import nu.marginalia.model.idx.WordFlags;
import java.util.List;

View File

@@ -58,6 +58,7 @@ public class PhraseConstraintGroupList {
private final int[] offsets;
private final BitSet present;
private final BitSet termIdsMask;
private final int presentCardinality;
public final int size;
public PhraseConstraintGroup(List<String> terms, TermIdList termIdsAll) {
@@ -85,6 +86,8 @@ public class PhraseConstraintGroupList {
termIdsMask.set(idx);
}
}
presentCardinality = present.cardinality();
}
/** Returns true if the term with index termIdx in the query is in the group */
@@ -93,7 +96,7 @@ public class PhraseConstraintGroupList {
}
public boolean test(CodedSequence[] positions) {
IntIterator[] sequences = new IntIterator[present.cardinality()];
IntIterator[] sequences = new IntIterator[presentCardinality];
for (int oi = 0, si = 0; oi < offsets.length; oi++) {
if (!present.get(oi)) {
@@ -120,7 +123,7 @@ public class PhraseConstraintGroupList {
public IntList findIntersections(IntList[] positions) {
IntList[] sequences = new IntList[present.cardinality()];
IntList[] sequences = new IntList[presentCardinality];
int[] iterOffsets = new int[sequences.length];
for (int oi = 0, si = 0; oi < offsets.length; oi++) {
@@ -144,12 +147,41 @@ public class PhraseConstraintGroupList {
iterOffsets[si - 1] = -oi;
}
return SequenceOperations.findIntersections(sequences, iterOffsets);
return SequenceOperations.findIntersections(sequences, iterOffsets, Integer.MAX_VALUE);
}
public IntList findIntersections(IntList[] positions, int n) {
IntList[] sequences = new IntList[presentCardinality];
int[] iterOffsets = new int[sequences.length];
for (int oi = 0, si = 0; oi < offsets.length; oi++) {
if (!present.get(oi)) {
continue;
}
int offset = offsets[oi];
if (offset < 0)
return IntList.of();
// Create iterators that are offset by their relative position in the
// sequence. This is done by subtracting the index from the offset,
// so that when we intersect them, an overlap means that the terms are
// in the correct order. Note the offset is negative!
var posForTerm = positions[offset];
if (posForTerm == null) {
return IntList.of();
}
sequences[si++] = posForTerm;
iterOffsets[si - 1] = -oi;
}
return SequenceOperations.findIntersections(sequences, iterOffsets, n);
}
public int minDistance(IntList[] positions) {
List<IntList> sequences = new ArrayList<>(present.cardinality());
IntList iterOffsets = new IntArrayList(present.cardinality());
List<IntList> sequences = new ArrayList<>(presentCardinality);
IntList iterOffsets = new IntArrayList(presentCardinality);
for (int oi = 0; oi < offsets.length; oi++) {
if (!present.get(oi)) {

View File

@@ -1,6 +1,7 @@
package nu.marginalia.index.results.model.ids;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import nu.marginalia.array.page.LongQueryBuffer;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import java.util.Arrays;
@@ -17,7 +18,9 @@ public final class CombinedDocIdList {
public CombinedDocIdList(long... data) {
this.data = Arrays.copyOf(data, data.length);
}
public CombinedDocIdList(LongQueryBuffer buffer) {
this.data = buffer.copyData();
}
public CombinedDocIdList(LongArrayList data) {
this.data = data.toLongArray();
}

View File

@@ -59,7 +59,7 @@ public class RankingSearchSet implements SearchSet {
}
@Override
public boolean contains(int domainId, long documentMetadata) {
public boolean contains(int domainId) {
// This is the main check
if (set.contains(domainId) || set.isEmpty()) {

View File

@@ -7,6 +7,6 @@ public interface SearchSet {
* or if the documentMetadata vibes with the set
*
*/
boolean contains(int domainId, long documentMetadata);
boolean contains(int domainId);
}

View File

@@ -2,7 +2,7 @@ package nu.marginalia.index.searchset;
public class SearchSetAny implements SearchSet {
@Override
public boolean contains(int domainId, long meta) {
public boolean contains(int domainId) {
return true;
}

View File

@@ -14,7 +14,7 @@ public class SmallSearchSet implements SearchSet {
}
@Override
public boolean contains(int domainId, long meta) {
public boolean contains(int domainId) {
return entries.contains(domainId);
}

View File

@@ -10,5 +10,5 @@ public class IndexSearchBudget {
}
public boolean hasTimeLeft() { return System.currentTimeMillis() < timeout; }
public long timeLeft() { return timeout - System.currentTimeMillis(); }
public long timeLeft() { return Math.max(0, timeout - System.currentTimeMillis()); }
}

View File

@@ -25,10 +25,10 @@ class RankingSearchSetTest {
set.write();
RankingSearchSet set2 = new RankingSearchSet("ACADEMIA", p);
assertTrue(set2.contains(1, 0));
assertTrue(set2.contains(5, 0));
assertTrue(set2.contains(7, 0));
assertTrue(set2.contains(9, 0));
assertTrue(set2.contains(1));
assertTrue(set2.contains(5));
assertTrue(set2.contains(7));
assertTrue(set2.contains(9));
Files.delete(p);

View File

@@ -56,7 +56,7 @@ public class SequenceOperations {
* <p></p>
*/
public static IntList findIntersections(IntList... positions) {
return findIntersections(positions, new int[positions.length]);
return findIntersections(positions, new int[positions.length], Integer.MAX_VALUE);
}
/** Find any intersections between the given positions lists, and return the list of intersections.
@@ -67,53 +67,80 @@ public class SequenceOperations {
* @param positions the positions lists to compare - each list must be sorted in ascending order
* and contain unique values.
* @param offsets constant offsets to apply to each position
* @param n maximum number of intersections we're interested in. The algorithm does not guarantee
* the return value will have a smaller size than this if it is cheaper to return back e.g.
* an input list.
* */
public static IntList findIntersections(IntList[] positions, int[] offsets) {
public static IntList findIntersections(IntList[] positions, int[] offsets, int n) {
if (positions.length < 1)
// Trivial cases
if (positions.length < 1) { // n = 0
return IntList.of();
}
// else if (positions.length == 1) { // n = 1
// if (offsets[0] == 0) { // with zero offset, we'll just return the input back
// return positions[0];
// }
//
// // Calculate an offset input array
// IntList ret = new IntArrayList(positions[0].size());
// for (int i = 0; i < positions[0].size() && i < n; i++) {
// ret.add(positions[0].getInt(i) + offsets[0]);
// }
// return ret;
// }
int[] indexes = new int[positions.length];
// Initialize values and find the maximum value
int[] values = new int[positions.length];
int minLength = Integer.MAX_VALUE;
int largestValue = Integer.MAX_VALUE;
for (int i = 0; i < positions.length; i++) {
minLength = Math.min(minLength, positions[i].size());
if (indexes[i] < positions[i].size())
values[i] = positions[i].getInt(indexes[i]++) + offsets[i];
else
return IntList.of();
largestValue = Math.min(largestValue, positions[i].getInt(positions[i].size() - 1) + offsets[i]);
}
// Intersect the sequences by advancing all values smaller than the maximum seen so far
// until they are equal to the maximum value, or until the end of the sequence is reached
int max = Integer.MIN_VALUE;
int successes = 0;
int currentMax = Integer.MIN_VALUE;
IntList ret = new IntArrayList();
int listMatches = 0;
int foundIntersections = 0;
IntList ret = new IntArrayList(Math.min(n, Math.max(1, minLength)));
outer:
for (int i = 0;; i = (i + 1) % positions.length)
for (int i = 0; currentMax <= largestValue; i = (i + 1) % positions.length)
{
if (successes == positions.length) {
ret.add(max);
successes = 1;
if (listMatches == positions.length) {
ret.add(currentMax);
if (++foundIntersections > n) return ret;
listMatches = 1;
if (indexes[i] < positions[i].size()) {
values[i] = positions[i].getInt(indexes[i]++) + offsets[i];
// Update the maximum value, if necessary
max = Math.max(max, values[i]);
currentMax = Math.max(currentMax, values[i]);
} else {
break;
}
} else if (values[i] == max) {
successes++;
} else if (values[i] == currentMax) {
listMatches++;
} else {
successes = 1;
listMatches = 1;
// Discard values until we reach the maximum value seen so far,
// or until the end of the sequence is reached
while (values[i] < max) {
while (values[i] < currentMax) {
if (indexes[i] < positions[i].size()) {
values[i] = positions[i].getInt(indexes[i]++) + offsets[i];
} else {
@@ -122,14 +149,13 @@ public class SequenceOperations {
}
// Update the maximum value, if necessary
max = Math.max(max, values[i]);
currentMax = Math.max(currentMax, values[i]);
}
}
return ret;
}
/** Given each set of positions, one from each list, find the set with the smallest distance between them
* and return that distance. If any of the lists are empty, return 0.
* */
@@ -146,10 +172,14 @@ public class SequenceOperations {
public static int minDistance(IntList[] positions, int[] offsets) {
if (positions.length <= 1)
return 0;
if (positions.length == 1)
return 0;
int[] values = new int[positions.length];
int[] indexes = new int[positions.length];
int largestValue = 0;
for (int i = 0; i < positions.length; i++) {
// if any of the lists are empty, return MAX_VALUE
@@ -158,6 +188,7 @@ public class SequenceOperations {
}
values[i] = positions[i].getInt(indexes[i]++) + offsets[i];
largestValue = Math.min(largestValue, positions[i].getInt(positions[i].size() - 1) + offsets[i]);
}
int minDist = Integer.MAX_VALUE;
@@ -173,7 +204,7 @@ public class SequenceOperations {
}
}
for (;;) {
do {
// For all the other indexes except maxI, update values[] with the largest value smaller than maxVal
for (int idx = 0; idx < positions.length - 1; idx++) {
int i = (maxI + idx) % positions.length;
@@ -228,6 +259,8 @@ public class SequenceOperations {
else {
return minDist;
}
}
} while (maxVal <= largestValue);
return minDist;
}
}

View File

@@ -2,31 +2,38 @@ package nu.marginalia.bench;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import nu.marginalia.sequence.GammaCodedSequence;
import nu.marginalia.sequence.VarintCodedSequence;
import nu.marginalia.sequence.SequenceOperations;
import org.openjdk.jmh.annotations.*;
import java.nio.ByteBuffer;
import java.util.Random;
public class SequenceBenchmarks {
@State(Scope.Benchmark)
public static class SequenceState {
VarintCodedSequence vcs;
GammaCodedSequence gcs;
IntList list;
ByteBuffer workArea;
int[] arrayValues;
int[] valueBuffer;
public SequenceState()
{
valueBuffer = new int[128];
IntList a;
IntList b;
IntList c;
workArea = ByteBuffer.allocate(65536);
arrayValues = new int[] { 1, 3, 5, 16, 1024, 2048, 4096, 4098, 4100 };
list = new IntArrayList(arrayValues);
vcs = VarintCodedSequence.generate(16,21,24,28,66,71,76,83,87,98,101,106,113,115,119,122,143,148,159,164,167,177,182,211,223,242,245,250,273,275,280,289,292,300,307,322,330,338,345,371,397,402,411,420,427,430,433,437,440,448,451,481,490,513,522,555,571,573,585,597,606,613,634,638,640,644,656,660,666,683,689,692,696,709,712,718,727,731,735,738);
gcs = GammaCodedSequence.generate(workArea, 16,21,24,28,66,71,76,83,87,98,101,106,113,115,119,122,143,148,159,164,167,177,182,211,223,242,245,250,273,275,280,289,292,300,307,322,330,338,345,371,397,402,411,420,427,430,433,437,440,448,451,481,490,513,522,555,571,573,585,597,606,613,634,638,640,644,656,660,666,683,689,692,696,709,712,718,727,731,735,738);
public SequenceState() {
a = new IntArrayList();
b = new IntArrayList();
c = new IntArrayList();
var r = new Random(1000);
for (int i = 0; i < 10; i++) {
b.add(r.nextInt(0, 5000));
}
for (int i = 0; i < 100; i++) {
c.add(r.nextInt(0, 5000));
}
for (int i = 0; i < 1000; i++) {
a.add(r.nextInt(0, 5000));
}
}
}
@@ -34,57 +41,17 @@ public class SequenceBenchmarks {
@Warmup(iterations = 1)
@Benchmark
@BenchmarkMode(Mode.Throughput)
public int vcsDecode(SequenceState state) {
var iter = state.vcs.iterator();
int sum = 0;
while (iter.hasNext()) {
sum += iter.nextInt();
public IntList intersect(SequenceState state) {
return SequenceOperations.findIntersections(state.a, state.b, state.c);
}
return sum;
}
//
// @Fork(value = 5, warmups = 5)
// @Warmup(iterations = 5)
// @Benchmark
// @BenchmarkMode(Mode.Throughput)
// public int listDecode2(SequenceState state) {
// var list = state.arrayValues;
// int sum = 0;
// for (int i = 0; i < list.length; i++) {
// sum += list[i];
// }
// return sum;
// }
@Fork(value = 1, warmups = 1)
@Warmup(iterations = 1)
@Benchmark
@BenchmarkMode(Mode.Throughput)
public int gcsDecode(SequenceState state) {
var iter = state.gcs.iterator();
int sum = 0;
while (iter.hasNext()) {
sum += iter.nextInt();
public IntList intersect1(SequenceState state) {
return SequenceOperations.findIntersections(state.a);
}
return sum;
}
// @Fork(value = 1, warmups = 1)
// @Warmup(iterations = 1)
// @Benchmark
// @BenchmarkMode(Mode.Throughput)
// public VarintCodedSequence vcsEncode(SequenceState state) {
// return VarintCodedSequence.generate(1, 3, 5, 16, 1024, 2048, 4096, 4098, 4100);
// }
// @Fork(value = 1, warmups = 1)
// @Warmup(iterations = 1)
// @Benchmark
// @BenchmarkMode(Mode.Throughput)
// public GammaCodedSequence gcsEncode(SequenceState state) {
// return GammaCodedSequence.generate(state.workArea, 1, 3, 5, 16, 1024, 2048, 4096, 4098, 4100);
// }
}

View File

@@ -90,6 +90,7 @@ dependencies {
implementation libs.commons.lang3
implementation libs.commons.compress
implementation libs.sqlite
implementation libs.bundles.grpc
implementation libs.bundles.httpcomponents

View File

@@ -22,6 +22,7 @@ dependencies {
implementation libs.bundles.slf4j
implementation libs.guava
implementation libs.zstd
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}

View File

@@ -1,5 +1,6 @@
package nu.marginalia.domclassifier;
import com.github.luben.zstd.ZstdInputStream;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.api.domsample.RpcDomainSample;
@@ -19,6 +20,7 @@ import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -84,8 +86,9 @@ public class DomSampleClassifier {
EdgeDomain sampleDomain = new EdgeDomain(sample.getDomainName());
try {
var parsedDoc = Jsoup.parse(sample.getHtmlSample());
try (var compressedStream = new ZstdInputStream(sample.getHtmlSampleZstd().newInput())) {
String html = new String(compressedStream.readAllBytes(), StandardCharsets.UTF_8);
var parsedDoc = Jsoup.parse(html);
var fixedElements = parsedDoc.select("*[data-position=fixed]");
if (sample.getAcceptedPopover()) {
@@ -104,7 +107,7 @@ public class DomSampleClassifier {
}
}
catch (Exception ex) {
logger.warn("Error when parsing DOM HTML sample");
logger.warn("Error when parsing DOM HTML sample", ex);
}
// Classify outgoing requests

View File

@@ -15,6 +15,7 @@
<classifier target="url-regex" rule="tracking">/ccm/collect$</classifier>
<classifier target="url-regex" rule="tracking">^/[0-9]+\.js$</classifier>
<classifier target="url-regex" rule="tracking">^/[a-z0-9]\.gif$</classifier>
<classifier target="url-regex" rule="tracking">^/pixel\.gif$</classifier>
<classifier target="url-regex" rule="ads">/pagead/</classifier>
<classifier target="url-regex" rule="ads">/google-ads/</classifier>

View File

@@ -12,4 +12,5 @@ class DDGTrackerDataTest {
data.getDomainInfo("hotjar.com").ifPresent(System.out::println);
data.getAllClassifications().forEach(System.out::println);
}
}

View File

@@ -11,7 +11,6 @@ import nu.marginalia.sequence.VarintCodedSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
public class DocumentKeywordsBuilder {
@@ -36,7 +35,7 @@ public class DocumentKeywordsBuilder {
this(1600);
}
public DocumentKeywords build(ByteBuffer workArea) {
public DocumentKeywords build() {
final List<String> wordArray = new ArrayList<>(wordToMeta.size());
final TByteArrayList meta = new TByteArrayList(wordToMeta.size());
final List<VarintCodedSequence> positions = new ArrayList<>(wordToMeta.size());

View File

@@ -13,7 +13,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
@@ -56,7 +55,7 @@ class DocumentKeywordExtractorTest {
new LinkTexts(), new EdgeUrl("https://encyclopedia.marginalia.nu/article/Don't_Tell_Me_(Madonna_song)")
);
var keywordsBuilt = keywords.build(ByteBuffer.allocate(1024));
var keywordsBuilt = keywords.build();
Map<String, Byte> flags = new HashMap<>();
Map<String, CodedSequence> positions = new HashMap<>();

View File

@@ -1,6 +1,8 @@
package nu.marginalia.converting.processor;
import com.google.inject.Inject;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import nu.marginalia.api.domsample.DomSampleClient;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
@@ -98,10 +100,16 @@ public class DomainProcessor {
return domSampleClient
.getSampleAsync(domainName, domSampleExecutor)
.thenApply(domSampleClassifier::classifySample)
.handle((a,b) ->
Objects.requireNonNullElseGet(a,
() -> EnumSet.of(DomSampleClassification.UNCLASSIFIED)))
.get();
.handle((a,b) -> {
if (b != null) {
var cause = b.getCause();
if (!(cause instanceof StatusRuntimeException sre && sre.getStatus() != Status.NOT_FOUND)) {
logger.warn("Exception when fetching sample data", b);
}
return EnumSet.of(DomSampleClassification.UNCLASSIFIED);
}
return a;
}).get();
}
@Nullable

View File

@@ -161,7 +161,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
final Set<HtmlFeature> features = featureExtractor.getFeatures(url, doc, documentHeaders, dld);
if (!documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier())) {
features.add(HtmlFeature.SHORT_DOCUMENT);
}

View File

@@ -115,7 +115,9 @@ public class PdfDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
ret.quality = -5;
ret.features = Set.of(HtmlFeature.PDF);
ret.features = new HashSet<>(); // must be mutable!
ret.features.add(HtmlFeature.PDF);
ret.description = getDescription(doc);
ret.hashCode = dld.localitySensitiveHashCode();

View File

@@ -16,7 +16,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
@@ -94,8 +93,6 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
String domainName = domain.toString();
ByteBuffer workArea = ByteBuffer.allocate(16384);
while (documentIterator.hasNext()) {
var document = documentIterator.next();
@@ -103,7 +100,7 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
continue;
}
var wb = document.words.build(workArea);
var wb = document.words.build();
List<VarintCodedSequence> spanSequences = new ArrayList<>(wb.spans.size());
byte[] spanCodes = new byte[wb.spans.size()];

View File

@@ -13,13 +13,13 @@ A map of the most important components and how they relate can be found below.
![image](../doc/diagram/conceptual-overview.svg)
The core part of the search engine is the index service, which is responsible for storing and retrieving
the document data. The index serive is partitioned, along with the executor service, which is responsible for executing
processes. At least one instance of each service must be run, but more can be run
the document data. The index service is partitioned and is responsible for both index lookups and spawning
per-partition processing tasks. At least one instance of each service must be run, but more can be run
alongside. Multiple partitions is desirable in production to distribute load across multiple physical drives,
as well as reducing the impact of downtime.
Search queries are delegated via the query service, which is a proxy that fans out the query to all
eligible index services. The control service is responsible for distributing commands to the executor
eligible index services. The control service is responsible for distributing commands to the partitions
service, and for monitoring the health of the system. It also offers a web interface for operating the system.
### Services
@@ -32,7 +32,6 @@ service, and for monitoring the health of the system. It also offers a web inte
* [index](services-core/index-service)
* Exposes the [index](index) subsystem
* Exposes the [functions/link-graph](functions/link-graph) subsystem
* [executor](services-core/executor-service)
* Exposes the [execution](execution) subsystem
* [assistant](services-core/assistant-service)
* Exposes the [functions/math](functions/math) subsystem
@@ -57,7 +56,7 @@ Services that expose HTTP endpoints tend to have more code. They are marked wit
### Processes
Processes are batch jobs that deal with data retrieval, processing and loading. These are spawned and orchestrated by
the executor service, which is controlled by the control service.
the index service, which is controlled by the control service.
* [processes](processes/)
* [crawling-process](processes/crawling-process)

View File

@@ -11,7 +11,7 @@ import nu.marginalia.api.domains.DomainInfoClient;
import nu.marginalia.api.domains.model.DomainInformation;
import nu.marginalia.api.domains.model.SimilarDomain;
import nu.marginalia.api.domsample.DomSampleClient;
import nu.marginalia.api.domsample.RpcDomainSample;
import nu.marginalia.api.domsample.RpcDomainSampleRequests;
import nu.marginalia.api.domsample.RpcOutgoingRequest;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.api.feeds.RpcFeed;
@@ -399,7 +399,7 @@ public class SearchSiteInfoService {
return forServiceUnavailable(domainName);
}
Optional<RpcDomainSample> sample = domSampleClient.getSample(domainName.toLowerCase());
Optional<RpcDomainSampleRequests> sample = domSampleClient.getSampleRequests(domainName.toLowerCase());
if (sample.isEmpty()) {
return forNoData(domainName);
}

View File

@@ -38,8 +38,8 @@
<a href="https://old-search.marginalia.nu/" class="underline text-liteblue dark:text-blue-200">here</a>.
</div>
</div>
<div class="mx-auto flex flex-col sm:flex-row my-4 sm:space-x-2 space-y-2 sm:space-y-0 w-full md:w-auto px-2 items-center sm:items-stretch">
<div class="flex flex-col border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-96 sm:w-64">
<div class="mx-auto px-8 flex flex-col sm:flex-row my-4 sm:space-x-2 space-y-2 sm:space-y-0 w-full md:w-auto items-center sm:items-stretch">
<div class="flex flex-col items-center border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-[300px] sm:w-64">
<div><i class="fas fa-sailboat mx-2 text-margeblue dark:text-slate-200"></i>Explore the Web</div>
<ul class="list-disc ml-8 sm:ml-6 text-slate-700 dark:text-white text-xs leading-5">
<li>Prioritizes non-commercial content</li>
@@ -48,14 +48,14 @@
</ul>
</div>
<div class="flex flex-col border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-96 sm:w-64">
<div class="flex flex-col items-center border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-[300px] sm:w-64">
<div><i class="fas fa-hand-holding-hand mx-2 text-margeblue dark:text-slate-200"></i>Open Source</div>
<ul class="list-disc ml-8 sm:ml-6 text-slate-700 dark:text-white text-xs leading-5">
<li>Custom index and crawler software</li>
<li>Simple technology, no AI</li>
<li>AGPL license</li>
</ul>
<div class="flex pt-4 gap-2">
<div class="flex pt-4 gap-2 flex-col md:flex-row">
<div class="text-xs text-liteblue dark:text-blue-200">
<i class="fa-brands fa-github"></i>
<a href="https://git.marginalia.nu/" class="underline">Git Repository</a>
@@ -67,7 +67,7 @@
</div>
</div>
<div class="flex flex-col border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-96 sm:w-64">
<div class="flex flex-col items-center border border-gray-300 dark:border-gray-600 rounded overflow-hidden dark:bg-gray-800 bg-white p-8 sm:p-4 space-y-3 w-[300px] sm:w-64">
<div><i class="fas fa-lock mx-2 text-margeblue dark:text-slate-200"></i> Privacy by default</div>
<ul class="list-disc ml-8 sm:ml-6 text-slate-700 dark:text-white text-xs leading-5">
<li>Filter out tracking </li>

View File

@@ -2,7 +2,7 @@ package nu.marginalia.control.node.model;
import nu.marginalia.nodecfg.model.NodeConfiguration;
public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline, boolean executorServiceOnline) {
public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline) {
public int id() {
return configuration.node();
}

View File

@@ -338,7 +338,7 @@ public class ControlNodeService {
}
private List<EventLogEntry> getEvents(int nodeId) {
List<String> services = List.of(ServiceId.Index.serviceName +":"+nodeId, ServiceId.Executor.serviceName +":"+nodeId);
List<String> services = List.of(ServiceId.Index.serviceName +":"+nodeId);
List<EventLogEntry> events = new ArrayList<>(20);
for (var service :services) {
events.addAll(eventLogService.getLastEntriesForService(service, Long.MAX_VALUE, 10));
@@ -358,8 +358,7 @@ public class ControlNodeService {
public IndexNodeStatus getStatus(NodeConfiguration config) {
return new IndexNodeStatus(config,
monitors.isServiceUp(ServiceId.Index, config.node()),
monitors.isServiceUp(ServiceId.Executor, config.node())
monitors.isServiceUp(ServiceId.Index, config.node())
);
}

View File

@@ -2,7 +2,7 @@
<h2>Nodes</h2>
<table class="table">
<tr>
<th>Node</th><th>Profile</th><th>Queries</th><th>Enabled</th><th>Index</th><th>Executor</th>
<th>Node</th><th>Profile</th><th>Queries</th><th>Enabled</th><th>Index</th>
</tr>
{{#each .}}
<tr>
@@ -24,9 +24,6 @@
</td>
{{#if indexServiceOnline}}<td>Online</td>{{/if}}
{{#unless indexServiceOnline}}<td class="table-danger">Offline</td>{{/unless}}
{{#if executorServiceOnline}}<td>Online</td>{{/if}}
{{#unless executorServiceOnline}}<td class="table-warning">Offline</td>{{/unless}}
</tr>
{{/each}}
</table>

View File

@@ -1,100 +0,0 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
id 'com.google.cloud.tools.jib' version '3.4.5'
}
application {
mainClass = 'nu.marginalia.executor.ExecutorMain'
applicationName = 'executor-service'
}
tasks.distZip.enabled = false
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
apply from: "$rootProject.projectDir/docker.gradle"
dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:converting-process')
implementation project(':code:processes:index-constructor-process')
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:common:service')
implementation project(':third-party:commons-codec')
implementation project(':code:libraries:message-queue')
implementation project(':code:functions:link-graph:api')
implementation project(':code:functions:favicon')
implementation project(':code:functions:favicon:api')
implementation project(':code:functions:nsfw-domain-filter')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:execution')
implementation project(':code:execution:api')
implementation project(':third-party:encyclopedia-marginalia-nu')
implementation libs.bundles.slf4j
implementation dependencies.create(libs.spark.get()) {
exclude group: 'org.eclipse.jetty'
}
implementation libs.bundles.jetty
implementation libs.guava
libs.bundles.grpc.get().each {
implementation dependencies.create(it) {
exclude group: 'com.google.guava'
}
}
implementation libs.gson
implementation libs.prometheus
implementation libs.notnull
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.trove
implementation libs.zstd
implementation libs.jsoup
implementation libs.commons.io
implementation libs.commons.compress
implementation libs.commons.lang3
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@@ -1,45 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.nsfw.NsfwFilterModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
public class ExecutorMain extends MainClass {
private final ExecutorSvc service;
@Inject
public ExecutorMain(ExecutorSvc service) {
this.service = service;
}
public static void main(String... args) {
init(ServiceId.Executor, args);
Injector injector = Guice.createInjector(
new ExecutorModule(),
new DatabaseModule(false),
new NsfwFilterModule(),
new ServiceDiscoveryModule(),
new ServiceConfigurationModule(ServiceId.Executor)
);
// Orchestrate the boot order for the services
var registry = injector.getInstance(ServiceRegistryIf.class);
var configuration = injector.getInstance(ServiceConfiguration.class);
orchestrateBoot(registry, configuration);
injector.getInstance(NodeStatusWatcher.class);
injector.getInstance(ExecutorMain.class);
injector.getInstance(Initialization.class).setReady();
}
}

View File

@@ -1,8 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.AbstractModule;
public class ExecutorModule extends AbstractModule {
public void configure() {
}
}

View File

@@ -1,55 +0,0 @@
package nu.marginalia.executor;
import com.google.inject.Inject;
import nu.marginalia.execution.*;
import nu.marginalia.functions.favicon.FaviconGrpcService;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.service.server.SparkService;
import nu.marginalia.service.server.mq.MqRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
import java.util.List;
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
public class ExecutorSvc extends SparkService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class);
private final ExecutionInit executionInit;
@Inject
public ExecutorSvc(BaseServiceParams params,
ExecutorGrpcService executorGrpcService,
ExecutorCrawlGrpcService executorCrawlGrpcService,
ExecutorSideloadGrpcService executorSideloadGrpcService,
ExecutorExportGrpcService executorExportGrpcService,
FaviconGrpcService faviconGrpcService,
ExecutionInit executionInit,
ExecutorFileTransferService fileTransferService) throws Exception {
super(params,
ServicePartition.partition(params.configuration.node()),
List.of(executorGrpcService,
executorCrawlGrpcService,
executorSideloadGrpcService,
executorExportGrpcService,
faviconGrpcService)
);
this.executionInit = executionInit;
Spark.get("/transfer/file/:fid", fileTransferService::transferFile);
Spark.head("/transfer/file/:fid", fileTransferService::transferFile);
}
@MqRequest(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
executionInit.initDefaultActors();
}
}

View File

@@ -1,10 +0,0 @@
The executor service is a partitioned service responsible for executing and keeping
track of long-running maintenance and operational tasks, such as crawling or data
processing.
The executor service is closely linked to the [control-service](../control-service),
which provides a user interface for much of the executor's functionality.
The service it itself relatively bare of code, but imports and exposes the [execution subsystem](../../execution),
which is responsible for the actual execution of tasks.

View File

@@ -30,10 +30,16 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:execution')
implementation project(':code:execution:api')
implementation project(':code:functions:favicon')
implementation project(':code:functions:favicon:api')
implementation project(':code:index')
implementation project(':code:functions:link-graph:partition')
implementation project(':code:functions:link-graph:api')
implementation project(':code:functions:search-query:api')
implementation project(':code:functions:nsfw-domain-filter')
implementation project(':code:index:api')
testImplementation project(path: ':code:services-core:control-service')

View File

@@ -3,13 +3,14 @@ package nu.marginalia.index;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.nsfw.NsfwFilterModule;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
@@ -28,6 +29,7 @@ public class IndexMain extends MainClass {
new IndexModule(),
new DatabaseModule(false),
new ServiceDiscoveryModule(),
new NsfwFilterModule(),
new ServiceConfigurationModule(ServiceId.Index)
);

View File

@@ -2,6 +2,8 @@ package nu.marginalia.index;
import com.google.inject.Inject;
import nu.marginalia.IndexLocations;
import nu.marginalia.execution.*;
import nu.marginalia.functions.favicon.FaviconGrpcService;
import nu.marginalia.index.api.IndexMqEndpoints;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.linkdb.docs.DocumentDbReader;
@@ -14,9 +16,11 @@ import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.SparkService;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.svc.ExecutorFileTransferService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -38,6 +42,7 @@ public class IndexService extends SparkService {
private final DomainLinks domainLinks;
private final ServiceEventLog eventLog;
private final ExecutionInit executionInit;
@Inject
public IndexService(BaseServiceParams params,
@@ -48,13 +53,25 @@ public class IndexService extends SparkService {
DocumentDbReader documentDbReader,
DomainLinks domainLinks,
PartitionLinkGraphService partitionLinkGraphService,
ExecutorGrpcService executorGrpcService,
ExecutorCrawlGrpcService executorCrawlGrpcService,
ExecutorSideloadGrpcService executorSideloadGrpcService,
ExecutorExportGrpcService executorExportGrpcService,
FaviconGrpcService faviconGrpcService,
ExecutionInit executionInit,
ExecutorFileTransferService fileTransferService,
ServiceEventLog eventLog)
throws Exception
{
super(params,
ServicePartition.partition(params.configuration.node()),
List.of(indexQueryService,
partitionLinkGraphService)
partitionLinkGraphService,
executorGrpcService,
executorCrawlGrpcService,
executorSideloadGrpcService,
executorExportGrpcService,
faviconGrpcService)
);
this.opsService = opsService;
@@ -62,15 +79,26 @@ public class IndexService extends SparkService {
this.fileStorageService = fileStorageService;
this.documentDbReader = documentDbReader;
this.domainLinks = domainLinks;
this.executionInit = executionInit;
this.eventLog = eventLog;
this.init = params.initialization;
Spark.get("/transfer/file/:fid", fileTransferService::transferFile);
Spark.head("/transfer/file/:fid", fileTransferService::transferFile);
Thread.ofPlatform().name("initialize-index").start(this::initialize);
}
volatile boolean initialized = false;
@MqRequest(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
executionInit.initDefaultActors();
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_RERANK)
public String rerank(String message) {
if (!opsService.rerank()) {

View File

@@ -24,7 +24,6 @@ dependencies {
implementation project(':code:services-core:query-service')
implementation project(':code:services-core:index-service')
implementation project(':code:services-core:control-service')
implementation project(':code:services-core:executor-service')
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@@ -82,7 +82,6 @@ public class SingleService {
enum Service {
IndexService("index", "nu.marginalia.index.IndexMain"),
ControlService("control", "nu.marginalia.control.ControlMain"),
ExecutorService("executor", "nu.marginalia.executor.ExecutorMain"),
QueryService("query", "nu.marginalia.query.QueryMain"),
;

View File

@@ -14,7 +14,7 @@ import nu.marginalia.crawl.fetcher.DomainCookies;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.functions.searchquery.QueryFactory;
import nu.marginalia.index.IndexGrpcService;
import nu.marginalia.index.IndexQueryExecution;
import nu.marginalia.index.ReverseIndexFullFileNames;
import nu.marginalia.index.ReverseIndexPrioFileNames;
import nu.marginalia.index.construction.full.FullIndexConstructor;
@@ -25,6 +25,7 @@ import nu.marginalia.index.forward.construction.ForwardIndexConverter;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.index.model.SearchParameters;
import nu.marginalia.index.results.IndexResultRankingService;
import nu.marginalia.index.searchset.SearchSetAny;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.linkdb.docs.DocumentDbReader;
@@ -89,10 +90,11 @@ public class IntegrationTest {
@Inject
StatefulIndex statefulIndex;
@Inject
IndexGrpcService indexGrpcService;
@Inject
DocumentDbReader documentDbReader;
@Inject
IndexResultRankingService rankingService;
@Inject
QueryFactory queryFactory;
@@ -222,7 +224,7 @@ public class IntegrationTest {
System.out.println(indexRequest);
var rs = indexGrpcService.executeSearch(new SearchParameters(indexRequest, new SearchSetAny()));
var rs = new IndexQueryExecution(new SearchParameters(indexRequest, new SearchSetAny()), rankingService, statefulIndex.get());
System.out.println(rs);
}

View File

@@ -11,3 +11,8 @@
2025-05-17: Redeploy all.
2025-05-28: Deploy assistant and browserless.
2025-06-06: Deploy assistant and browserless.
2025-07-21: Deploy executor partition 1.
2025-07-21: Deploy search.
2025-07-23: Redeploy all.
2025-07-25: Redeploy index.

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 60 KiB

After

Width:  |  Height:  |  Size: 60 KiB

View File

@@ -40,11 +40,6 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -62,20 +62,10 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
index-service-2:
<<: *partition-2
image: "marginalia/index-service"
container_name: "index-service-2"
executor-service-2:
<<: *partition-2
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-2"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -62,20 +62,10 @@ services:
<<: *partition-1
image: "marginalia/index-service"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-1"
index-service-2:
<<: *partition-2
image: "marginalia/index-service"
container_name: "index-service-2"
executor-service-2:
<<: *partition-2
restart: always
image: "marginalia/executor-service"
container_name: "executor-service-2"
query-service:
<<: *service
image: "marginalia/query-service"

View File

@@ -31,13 +31,11 @@ A working setup needs at all the services
* control [ http port is the control GUI ]
* query [ http port is the query GUI ]
* index [ http port is internal ]
* executor [ http port is internal ]
Since you will need to manage ports yourself, you must assign distinct ports-pairs to each service.
* An index and executor services should exist on the same partition e.g. index:1 and executor:1. The partition
number is the last digit of the service name, and should be positive. You can have multiple pairs of index
and executor partitions, but the pair should run on the same physical machine with the same install directory.
* An index service should exist on the same partition e.g. index:1. The partition
number is the last digit of the service name, and should be positive.
* The query service can use any partition number.

View File

@@ -4,7 +4,6 @@ include 'code:services-core:index-service'
include 'code:services-core:assistant-service'
include 'code:services-core:control-service'
include 'code:services-core:query-service'
include 'code:services-core:executor-service'
include 'code:services-core:single-service-runner'
include 'code:services-application:search-service'
@@ -41,6 +40,7 @@ include 'code:index:index-journal'
include 'code:index:query'
include 'code:index:index-forward'
include 'code:index:index-reverse'
include 'code:index:index-perftest'
include 'code:libraries:array'
include 'code:libraries:array:cpp'

View File

@@ -76,13 +76,6 @@ SERVICE_CONFIG = {
deploy_tier=3,
groups={"all", "index"}
),
'executor': ServiceConfig(
gradle_target=':code:services-core:executor-service:docker',
docker_name='executor-service',
instances=10,
deploy_tier=3,
groups={"all", "executor"}
),
'control': ServiceConfig(
gradle_target=':code:services-core:control-service:docker',
docker_name='control-service',