mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
(index) Add performance metrics
This commit is contained in:
@@ -223,7 +223,7 @@ public class PerfTestMain {
|
||||
int iter;
|
||||
for (iter = 0;; iter++) {
|
||||
SearchParameters searchParameters = new SearchParameters(parsedQuery, new SearchSetAny());
|
||||
var execution = new IndexQueryExecution(searchParameters, rankingService, indexReader);
|
||||
var execution = new IndexQueryExecution(searchParameters, 1, rankingService, indexReader);
|
||||
long start = System.nanoTime();
|
||||
execution.run();
|
||||
long end = System.nanoTime();
|
||||
|
@@ -71,9 +71,8 @@ public class IndexGrpcService
|
||||
private final SearchSetsService searchSetsService;
|
||||
|
||||
private final IndexResultRankingService rankingService;
|
||||
|
||||
private final String nodeName;
|
||||
|
||||
private final int nodeId;
|
||||
|
||||
@Inject
|
||||
public IndexGrpcService(ServiceConfiguration serviceConfiguration,
|
||||
@@ -81,7 +80,7 @@ public class IndexGrpcService
|
||||
SearchSetsService searchSetsService,
|
||||
IndexResultRankingService rankingService)
|
||||
{
|
||||
var nodeId = serviceConfiguration.node();
|
||||
this.nodeId = serviceConfiguration.node();
|
||||
this.nodeName = Integer.toString(nodeId);
|
||||
this.statefulIndex = statefulIndex;
|
||||
this.searchSetsService = searchSetsService;
|
||||
@@ -108,7 +107,7 @@ public class IndexGrpcService
|
||||
return List.of();
|
||||
}
|
||||
|
||||
return new IndexQueryExecution(params, rankingService, statefulIndex.get()).run();
|
||||
return new IndexQueryExecution(params, nodeId, rankingService, statefulIndex.get()).run();
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Error in handling request", ex);
|
||||
@@ -149,7 +148,7 @@ public class IndexGrpcService
|
||||
return List.of();
|
||||
}
|
||||
|
||||
return new IndexQueryExecution(new SearchParameters(specsSet, getSearchSet(specsSet)), rankingService, statefulIndex.get()).run();
|
||||
return new IndexQueryExecution(new SearchParameters(specsSet, getSearchSet(specsSet)), 1, rankingService, statefulIndex.get()).run();
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Error in handling request", ex);
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package nu.marginalia.index;
|
||||
|
||||
import io.prometheus.client.Gauge;
|
||||
import nu.marginalia.api.searchquery.RpcDecoratedResultItem;
|
||||
import nu.marginalia.array.page.LongQueryBuffer;
|
||||
import nu.marginalia.index.index.CombinedIndexReader;
|
||||
@@ -19,7 +20,6 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/** Performs an index query */
|
||||
public class IndexQueryExecution {
|
||||
@@ -35,49 +35,60 @@ public class IndexQueryExecution {
|
||||
// operations per lookup and again optimize tail latency
|
||||
private static final int lookupBatchSize = SkipListConstants.BLOCK_SIZE / 16;
|
||||
|
||||
private static final AtomicLong lookupTime = new AtomicLong();
|
||||
private static final AtomicLong prepTime = new AtomicLong();
|
||||
private static final AtomicLong valuationTime = new AtomicLong();
|
||||
|
||||
private static final ExecutorService threadPool = new ThreadPoolExecutor(indexValuationThreads, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
|
||||
private static final Logger log = LoggerFactory.getLogger(IndexQueryExecution.class);
|
||||
|
||||
private final String nodeName;
|
||||
private final IndexResultRankingService rankingService;
|
||||
|
||||
private final ResultRankingContext rankingContext;
|
||||
private final List<IndexQuery> queries;
|
||||
private final IndexSearchBudget budget;
|
||||
private final ResultPriorityQueue resultHeap;
|
||||
|
||||
private final CountDownLatch lookupCountdown;
|
||||
private final CountDownLatch preparationCountdown;
|
||||
private final CountDownLatch rankingCountdown;
|
||||
|
||||
private final ArrayBlockingQueue<CombinedDocIdList> fullPreparationQueue = new ArrayBlockingQueue<>(8, true);
|
||||
private final ArrayBlockingQueue<CombinedDocIdList> priorityPreparationQueue = new ArrayBlockingQueue<>(8, true);
|
||||
private final ArrayBlockingQueue<IndexResultRankingService.RankingData> fullEvaluationQueue = new ArrayBlockingQueue<>(8, true);
|
||||
private final ArrayBlockingQueue<IndexResultRankingService.RankingData> priorityEvaluationQueue = new ArrayBlockingQueue<>(8, true);
|
||||
private final ArrayBlockingQueue<CombinedDocIdList> fullPreparationQueue = new ArrayBlockingQueue<>(1);
|
||||
private final ArrayBlockingQueue<CombinedDocIdList> priorityPreparationQueue = new ArrayBlockingQueue<>(1);
|
||||
private final ArrayBlockingQueue<IndexResultRankingService.RankingData> fullEvaluationQueue = new ArrayBlockingQueue<>(4);
|
||||
private final ArrayBlockingQueue<IndexResultRankingService.RankingData> priorityEvaluationQueue = new ArrayBlockingQueue<>(4);
|
||||
|
||||
private final int limitTotal;
|
||||
private final int limitByDomain;
|
||||
|
||||
static {
|
||||
Thread.ofPlatform().daemon().start(() -> {
|
||||
for (;;) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
break;
|
||||
}
|
||||
log.info("Lookup: {}, Valuation: {}, Prep Time: {}", lookupTime.get() / 1_000_000_000., valuationTime.get() / 1_000_000_000., prepTime.get() / 1_000_000_000.);
|
||||
}
|
||||
});
|
||||
}
|
||||
private static final Gauge metric_index_lookup_time_s = Gauge.build()
|
||||
.labelNames("node")
|
||||
.name("index_exec_lookup_time_s")
|
||||
.help("Time in query spent on lookups")
|
||||
.register();
|
||||
|
||||
private static final Gauge metric_index_prep_time_s = Gauge.build()
|
||||
.labelNames("node")
|
||||
.name("index_exec_prep_time_s")
|
||||
.help("Time in query spent retrieving positions and spans")
|
||||
.register();
|
||||
|
||||
private static final Gauge metric_index_rank_time_s = Gauge.build()
|
||||
.labelNames("node")
|
||||
.name("index_exec_ranking_time_s")
|
||||
.help("Time in query spent on ranking")
|
||||
.register();
|
||||
|
||||
private static final Gauge metric_index_documents_ranked = Gauge.build()
|
||||
.labelNames("node")
|
||||
.name("index_exec_documents_ranked")
|
||||
.help("Number of documents ranked")
|
||||
.register();
|
||||
|
||||
|
||||
|
||||
public IndexQueryExecution(SearchParameters params,
|
||||
int serviceNode,
|
||||
IndexResultRankingService rankingService,
|
||||
CombinedIndexReader currentIndex) {
|
||||
this.nodeName = Integer.toString(serviceNode);
|
||||
this.rankingService = rankingService;
|
||||
|
||||
resultHeap = new ResultPriorityQueue(params.fetchSize);
|
||||
@@ -123,6 +134,10 @@ public class IndexQueryExecution {
|
||||
data.close();
|
||||
}
|
||||
|
||||
metric_index_documents_ranked
|
||||
.labels(nodeName)
|
||||
.inc(resultHeap.getItemsProcessed());
|
||||
|
||||
// Final result selection
|
||||
return rankingService.selectBestResults(limitByDomain, limitTotal, rankingContext, resultHeap.toList());
|
||||
}
|
||||
@@ -138,7 +153,9 @@ public class IndexQueryExecution {
|
||||
long st = System.nanoTime();
|
||||
query.getMoreResults(buffer);
|
||||
long et = System.nanoTime();
|
||||
lookupTime.addAndGet(et - st);
|
||||
metric_index_lookup_time_s
|
||||
.labels(nodeName)
|
||||
.inc((et - st)/1_000_000_000.);
|
||||
|
||||
if (buffer.isEmpty())
|
||||
continue;
|
||||
@@ -181,10 +198,14 @@ public class IndexQueryExecution {
|
||||
while (budget.hasTimeLeft() && (lookupCountdown.getCount() > 0 || !inputQueue.isEmpty())) {
|
||||
var docIds = inputQueue.poll(Math.clamp(budget.timeLeft(), 1, 5), TimeUnit.MILLISECONDS);
|
||||
if (docIds == null) continue;
|
||||
|
||||
long st = System.nanoTime();
|
||||
var preparedData = rankingService.prepareRankingData(rankingContext, docIds, budget);
|
||||
long et = System.nanoTime();
|
||||
prepTime.addAndGet(et - st);
|
||||
metric_index_prep_time_s
|
||||
.labels(nodeName)
|
||||
.inc((et - st)/1_000_000_000.);
|
||||
|
||||
if (!outputQueue.offer(preparedData, Math.max(1, budget.timeLeft()), TimeUnit.MILLISECONDS))
|
||||
preparedData.close();
|
||||
}
|
||||
@@ -209,7 +230,10 @@ public class IndexQueryExecution {
|
||||
long st = System.nanoTime();
|
||||
resultHeap.addAll(rankingService.rankResults(budget, rankingContext, rankingData, false));
|
||||
long et = System.nanoTime();
|
||||
valuationTime.addAndGet(et - st);
|
||||
|
||||
metric_index_rank_time_s
|
||||
.labels(nodeName)
|
||||
.inc((et - st)/1_000_000_000.);
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
|
@@ -80,8 +80,8 @@ class CrawlerRetreiverTest {
|
||||
var specs = CrawlerMain.CrawlSpecRecord
|
||||
.builder()
|
||||
.crawlDepth(5)
|
||||
.domain("www.marginalia.nu")
|
||||
.urls(List.of("https://www.marginalia.nu/misc/debian-laptop-install-log/"))
|
||||
.domain("thenewleafjournal.com")
|
||||
.urls(List.of("https://thenewleafjournal.com/"))
|
||||
.build();
|
||||
Path tempFile = null;
|
||||
try {
|
||||
|
@@ -224,7 +224,7 @@ public class IntegrationTest {
|
||||
|
||||
System.out.println(indexRequest);
|
||||
|
||||
var rs = new IndexQueryExecution(new SearchParameters(indexRequest, new SearchSetAny()), rankingService, statefulIndex.get());
|
||||
var rs = new IndexQueryExecution(new SearchParameters(indexRequest, new SearchSetAny()), 1, rankingService, statefulIndex.get());
|
||||
|
||||
System.out.println(rs);
|
||||
}
|
||||
|
Reference in New Issue
Block a user