1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

4 Commits

23 changed files with 196 additions and 38 deletions

View File

@@ -22,6 +22,7 @@ dependencies {
implementation project(':code:libraries:array')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:skiplist')
implementation project(':code:libraries:coded-sequence')
implementation project(':code:libraries:language-processing')

View File

@@ -6,6 +6,7 @@ import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.index.forward.spans.DocumentSpans;
import nu.marginalia.index.forward.spans.IndexSpansReader;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,6 +15,7 @@ import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeoutException;
import static nu.marginalia.index.forward.ForwardIndexParameters.*;
@@ -138,7 +140,7 @@ public class ForwardIndexReader {
return (int) offset;
}
public DocumentSpans[] getDocumentSpans(Arena arena, long[] docIds) {
public DocumentSpans[] getDocumentSpans(Arena arena, IndexSearchBudget budget, long[] docIds) throws TimeoutException {
long[] offsets = new long[docIds.length];
for (int i = 0; i < docIds.length; i++) {
long offset = idxForDoc(docIds[i]);
@@ -151,7 +153,7 @@ public class ForwardIndexReader {
}
try {
return spansReader.readSpans(arena, offsets);
return spansReader.readSpans(arena, budget, offsets);
}
catch (IOException ex) {
logger.error("Failed to read spans for docIds", ex);

View File

@@ -1,12 +1,17 @@
package nu.marginalia.index.forward.spans;
import nu.marginalia.index.query.IndexSearchBudget;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.file.Path;
import java.util.concurrent.TimeoutException;
public interface IndexSpansReader extends AutoCloseable {
@Deprecated
DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException;
DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException;
DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws TimeoutException, IOException;
static IndexSpansReader open(Path fileName) throws IOException {
int version = SpansCodec.parseSpanFilesFooter(fileName);

View File

@@ -1,5 +1,6 @@
package nu.marginalia.index.forward.spans;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.sequence.VarintCodedSequence;
import java.io.IOException;
@@ -52,7 +53,7 @@ public class IndexSpansReaderCompressed implements AutoCloseable, IndexSpansRead
}
@Override
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException {
public DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws IOException {
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
for (int i = 0; i < encodedOffsets.length; i++) {
if (encodedOffsets[i] >= 0) {

View File

@@ -1,6 +1,7 @@
package nu.marginalia.index.forward.spans;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.uring.UringFileReader;
import java.io.IOException;
@@ -10,6 +11,7 @@ import java.lang.foreign.ValueLayout;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class IndexSpansReaderPlain implements IndexSpansReader {
private final UringFileReader uringReader;
@@ -32,7 +34,11 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
@Override
public DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException {
// for testing, slow
return readSpans(arena, new long[] { encodedOffset})[0];
try {
return readSpans(arena, new IndexSearchBudget(1000), new long[] { encodedOffset})[0];
} catch (TimeoutException e) {
throw new IOException(e);
}
}
public DocumentSpans decode(MemorySegment ms) {
@@ -59,7 +65,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
}
@Override
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) {
public DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws TimeoutException {
int readCnt = 0;
for (long offset : encodedOffsets) {
@@ -85,7 +91,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
j++;
}
List<MemorySegment> buffers = uringReader.readUnaligned(arena, offsets, sizes, 4096);
List<MemorySegment> buffers = uringReader.readUnaligned(arena, budget.timeLeft(), offsets, sizes, 4096);
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];

View File

@@ -17,6 +17,7 @@ dependencies {
implementation project(':code:libraries:array')
implementation project(':code:libraries:native')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:skiplist')
implementation project(':code:libraries:coded-sequence')
implementation project(':code:libraries:random-write-funnel')
implementation project(':code:index:query')

View File

@@ -21,6 +21,7 @@ import java.lang.foreign.Arena;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
public class FullReverseIndexReader {
@@ -165,8 +166,10 @@ public class FullReverseIndexReader {
}
public TermData[] getTermData(Arena arena,
IndexSearchBudget budget,
long[] termIds,
long[] docIds)
throws TimeoutException
{
long[] offsetsAll = new long[termIds.length * docIds.length];
@@ -188,7 +191,7 @@ public class FullReverseIndexReader {
System.arraycopy(offsetsForTerm, 0, offsetsAll, i * docIds.length, docIds.length);
}
return positionsFileReader.getTermData(arena, offsetsAll);
return positionsFileReader.getTermData(arena, budget, offsetsAll);
}
public TermData[] getTermData(Arena arena,
@@ -210,7 +213,13 @@ public class FullReverseIndexReader {
// Read the size and offset of the position data
var offsets = reader.getValueOffsets(docIds);
return positionsFileReader.getTermData(arena, offsets);
// FIXME this entire method chain only exists for a single unit test
// remove me!
try {
return positionsFileReader.getTermData(arena, new IndexSearchBudget(1000), offsets);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
public void close() {

View File

@@ -1,5 +1,6 @@
package nu.marginalia.index.positions;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.uring.UringFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -10,6 +11,7 @@ import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeoutException;
/** Reads positions data from the positions file */
public class PositionsFileReader implements AutoCloseable {
@@ -37,7 +39,7 @@ public class PositionsFileReader implements AutoCloseable {
/** Get the positions for keywords in the index, as pointed out by the encoded offsets;
* intermediate buffers are allocated from the provided arena allocator. */
public TermData[] getTermData(Arena arena, long[] offsets) {
public TermData[] getTermData(Arena arena, IndexSearchBudget budget, long[] offsets) throws TimeoutException {
int cnt = 0;
@@ -63,7 +65,7 @@ public class PositionsFileReader implements AutoCloseable {
j++;
}
List<MemorySegment> buffers = uringFileReader.readUnaligned(arena, readOffsets, readSizes, 4096);
List<MemorySegment> buffers = uringFileReader.readUnaligned(arena, budget.timeLeft(), readOffsets, readSizes, 4096);
TermData[] ret = new TermData[offsets.length];
for (int i = 0, j=0; i < offsets.length; i++) {

View File

@@ -4,6 +4,7 @@ import it.unimi.dsi.fastutil.ints.IntList;
import nu.marginalia.index.construction.PositionsFileConstructor;
import nu.marginalia.index.positions.PositionsFileReader;
import nu.marginalia.index.positions.TermData;
import nu.marginalia.index.query.IndexSearchBudget;
import nu.marginalia.sequence.VarintCodedSequence;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -13,6 +14,7 @@ import java.io.IOException;
import java.lang.foreign.Arena;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,7 +32,7 @@ class PositionsFileReaderTest {
}
@Test
void getTermData() throws IOException {
void getTermData() throws IOException, TimeoutException {
long key1, key2, key3;
try (PositionsFileConstructor constructor = new PositionsFileConstructor(file)) {
var block = constructor.getBlock();
@@ -47,7 +49,7 @@ class PositionsFileReaderTest {
try (Arena arena = Arena.ofShared();
PositionsFileReader reader = new PositionsFileReader(file))
{
TermData[] data = reader.getTermData(arena, new long[] { key1, key2, key3 });
TermData[] data = reader.getTermData(arena, new IndexSearchBudget(10000), new long[] { key1, key2, key3 });
assertEquals(43, data[0].flags());
assertEquals(IntList.of( 1, 2, 3), data[0].positions().values());

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class IndexQueryExecution {
private static final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 16);
private static final int indexPreparationThreads = Integer.getInteger("index.preparationThreads", 4);
private static final int indexPreparationThreads = Integer.getInteger("index.preparationThreads", 2);
// Since most NVMe drives have a maximum read size of 128 KB, and most small reads are 512B
// this should probably be 128*1024 / 512 = 256 to reduce queue depth and optimize tail latency

View File

@@ -32,6 +32,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
/** A reader for the combined forward and reverse indexes.
@@ -187,10 +188,12 @@ public class CombinedIndexReader {
/** Retrieves the term metadata for the specified word for the provided documents */
public TermMetadataList[] getTermMetadata(Arena arena,
long[] wordIds,
CombinedDocIdList docIds)
IndexSearchBudget budget,
long[] wordIds,
CombinedDocIdList docIds)
throws TimeoutException
{
TermData[] combinedTermData = reverseIndexFullReader.getTermData(arena, wordIds, docIds.array());
TermData[] combinedTermData = reverseIndexFullReader.getTermData(arena, budget, wordIds, docIds.array());
TermMetadataList[] ret = new TermMetadataList[wordIds.length];
for (int i = 0; i < wordIds.length; i++) {
ret[i] = new TermMetadataList(Arrays.copyOfRange(combinedTermData, i*docIds.size(), (i+1)*docIds.size()));
@@ -226,13 +229,13 @@ public class CombinedIndexReader {
}
/** Retrieves the document spans for the specified documents */
public DocumentSpans[] getDocumentSpans(Arena arena, CombinedDocIdList docIds) {
public DocumentSpans[] getDocumentSpans(Arena arena, IndexSearchBudget budget, CombinedDocIdList docIds) throws TimeoutException {
long[] decodedIDs = docIds.array();
for (int i = 0; i < decodedIDs.length; i++) {
decodedIDs[i] = UrlIdCodec.removeRank(decodedIDs[i]);
}
return forwardIndexReader.getDocumentSpans(arena, decodedIDs);
return forwardIndexReader.getDocumentSpans(arena, budget, decodedIDs);
}
/** Close the indexes (this is not done immediately)

View File

@@ -92,10 +92,14 @@ public class IndexResultRankingService {
// Perform expensive I/O operations
this.termsForDocs = currentIndex.getTermMetadata(arena, searchTerms.termIdsAll.array, resultIds);
if (!budget.hasTimeLeft())
throw new TimeoutException();
this.documentSpans = currentIndex.getDocumentSpans(arena, resultIds);
try {
this.termsForDocs = currentIndex.getTermMetadata(arena, budget, searchTerms.termIdsAll.array, resultIds);
this.documentSpans = currentIndex.getDocumentSpans(arena, budget, resultIds);
}
catch (TimeoutException ex) {
arena.close();
throw ex;
}
}
public CodedSequence[] positions() {

View File

@@ -4,18 +4,31 @@ import nu.marginalia.array.DirectFileReader;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.ffi.LinuxSystemCalls;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.file.Files;
import java.nio.file.Path;
public class NativeAlgosTest {
Path testFile;
/** Creates a fresh temporary file before each test and stores its path in {@code testFile}. */
@BeforeEach
public void setUp() throws IOException {
    final Path createdFile = Files.createTempFile("NativeAlgosTest", ".dat");
    this.testFile = createdFile;
}
/** Removes the file referenced by {@code testFile} after each test, if it still exists. */
@AfterEach
public void tearDown() throws IOException {
// deleteIfExists does not fail when the file is already gone
Files.deleteIfExists(testFile);
}
@Test
public void test() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
LongArray array = LongArrayFactory.mmapForWritingShared(testFile, 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
@@ -23,7 +36,7 @@ public class NativeAlgosTest {
var ms = Arena.global().allocate(512, 8);
int fd = LinuxSystemCalls.openDirect(Path.of("/tmp/test"));
int fd = LinuxSystemCalls.openDirect(testFile);
int ret = LinuxSystemCalls.readAt(fd, ms, 512);
System.out.println(ret);
System.out.println(ms.byteSize());
@@ -38,14 +51,14 @@ public class NativeAlgosTest {
@Test
void testDirectFileReader() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
LongArray array = LongArrayFactory.mmapForWritingShared(testFile, 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
array.close();
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
try (var dfr = new DirectFileReader(testFile)) {
LongArray array2 = LongArrayFactory.onHeapConfined(64);
dfr.readAligned(array2, 0);
for (int i = 0; i < array2.size(); i++) {
@@ -54,7 +67,7 @@ public class NativeAlgosTest {
}
var alignedBuffer = Arena.ofAuto().allocate(4096, 4096);
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
try (var dfr = new DirectFileReader(testFile)) {
MemorySegment dest = Arena.ofAuto().allocate(504, 1);
dfr.readUnaligned(dest, alignedBuffer, 8);

View File

@@ -12,6 +12,7 @@ import java.lang.foreign.SegmentAllocator;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class UringFileReader implements AutoCloseable {
@@ -77,15 +78,54 @@ public class UringFileReader implements AutoCloseable {
}
}
public List<MemorySegment> readUnaligned(Arena arena, long[] offsets, int[] sizes, int blockSize) {
if (direct) {
return readUnalignedInDirectMode(arena, offsets, sizes, blockSize);
} else {
return readUnalignedInBufferedMode(arena, offsets, sizes);
/** Reads each destination segment from the file at the corresponding offset.
 * <p>
 * Requests with fewer than 5 destinations are issued one at a time through
 * LinuxSystemCalls.readAt; timeoutMs is not consulted on that path.  Larger requests
 * are handed to one of the rings (chosen round-robin via ringIdx) in batches of at
 * most QUEUE_SIZE entries.
 *
 * @param destinations segments to fill; every segment is expected to be read in full
 * @param offsets      file offsets, one per destination, in the same order
 * @param timeoutMs    time allowance in milliseconds passed on to the ring; when the
 *                     request is split into several batches it covers all of them together
 * @throws TimeoutException if the allowance is used up before a batch is started, or
 *                          if the ring itself reports a timeout
 * @throws RuntimeException if a read returns something other than the expected result
 */
public void read(List<MemorySegment> destinations, List<Long> offsets, long timeoutMs) throws TimeoutException {
    if (destinations.size() < 5) {
        for (int i = 0; i < destinations.size(); i++) {
            var ms = destinations.get(i);
            long offset = offsets.get(i);
            int ret = LinuxSystemCalls.readAt(fd, ms, offset);
            if (ms.byteSize() != ret) {
                throw new RuntimeException("Read failed, rv=" + ret + " at " + offset + " : " + ms.byteSize());
            }
        }
        return;
    }

    var ring = rings[(int) (ringIdx.getAndIncrement() % rings.length)];

    if (destinations.size() <= QUEUE_SIZE) {
        int ret = ring.readBatch(destinations, offsets, timeoutMs, direct);
        if (ret != offsets.size()) {
            throw new RuntimeException("Read failed, rv=" + ret);
        }
        return;
    }

    // Track the deadline on the monotonic clock: System.currentTimeMillis() follows the
    // wall clock and may jump (e.g. on NTP adjustments), which would distort the budget.
    final long deadlineNanos = System.nanoTime()
            + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);

    for (int i = 0; i < destinations.size(); i += QUEUE_SIZE) {
        long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0)
            throw new TimeoutException();

        // Never hand a sub-millisecond remainder to the ring as 0 ms
        long timeRemainingMs = Math.max(1L,
                java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(remainingNanos));

        var destSlice = destinations.subList(i, Math.min(destinations.size(), i + QUEUE_SIZE));
        var offSlice = offsets.subList(i, Math.min(offsets.size(), i + QUEUE_SIZE));

        int ret = ring.readBatch(destSlice, offSlice, timeRemainingMs, direct);
        if (ret != offSlice.size()) {
            throw new RuntimeException("Read failed, rv=" + ret);
        }
    }
}
private List<MemorySegment> readUnalignedInBufferedMode(Arena arena, long[] offsets, int[] sizes) {
public List<MemorySegment> readUnaligned(Arena arena, long timeoutMs, long[] offsets, int[] sizes, int blockSize) throws TimeoutException {
if (direct) {
return readUnalignedInDirectMode(arena, timeoutMs, offsets, sizes, blockSize);
} else {
return readUnalignedInBufferedMode(arena, timeoutMs, offsets, sizes);
}
}
private List<MemorySegment> readUnalignedInBufferedMode(Arena arena, long timeoutMs, long[] offsets, int[] sizes) throws TimeoutException {
int totalSize = 0;
for (int size : sizes) {
totalSize += size;
@@ -101,7 +141,7 @@ public class UringFileReader implements AutoCloseable {
offsetsList.add(offsets[i]);
}
read(segmentsList, offsetsList);
read(segmentsList, offsetsList, timeoutMs);
return segmentsList;
}
@@ -113,7 +153,7 @@ public class UringFileReader implements AutoCloseable {
*
* @return MemorySegment slices that contain only the requested data.
*/
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long[] offsets, int[] sizes, int blockSize) {
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long timeoutMs, long[] offsets, int[] sizes, int blockSize) throws TimeoutException {
if (offsets.length < 1)
return List.of();
@@ -171,7 +211,7 @@ public class UringFileReader implements AutoCloseable {
}
// Perform the read
read(buffers, bufferOffsets);
read(buffers, bufferOffsets, timeoutMs);
// Slice the big memory chunk into the requested slices
List<MemorySegment> ret = new ArrayList<>(sizes.length);

View File

@@ -6,6 +6,7 @@ import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -23,6 +24,24 @@ public final class UringQueue {
return IoUring.uringOpen(fd, size);
}
/** Performs a batch read on this ring while holding the ring's lock.
 * <p>
 * The timeout bounds how long this call waits to acquire the lock; the read itself
 * is not passed the timeout in this method.
 *
 * @param dest    segments to read into
 * @param offsets file offsets, one per destination segment
 * @param timeout maximum time to wait for the lock, in milliseconds
 * @param direct  forwarded unchanged to IoUring.uringReadBatch
 * @return the value returned by IoUring.uringReadBatch
 *         (presumably the number of completed reads -- confirm against IoUring)
 * @throws TimeoutException if the lock could not be acquired within the timeout
 * @throws RuntimeException wrapping an InterruptedException if the thread is interrupted;
 *                          the thread's interrupt flag is restored before throwing
 */
public int readBatch(List<MemorySegment> dest, List<Long> offsets, long timeout, boolean direct)
        throws TimeoutException {
    try {
        if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS))
            throw new TimeoutException();

        try {
            return IoUring.uringReadBatch(fd, this, dest, offsets, direct);
        }
        finally {
            lock.unlock();
        }
    }
    catch (InterruptedException ex) {
        // Catching InterruptedException clears the flag; set it again so callers
        // further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    }
}
public int readBatch(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
try {
if (!lock.tryLock(10, TimeUnit.MILLISECONDS))

View File

@@ -14,6 +14,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class UringFileReaderTest {
Path testFile;
@@ -63,11 +64,12 @@ public class UringFileReaderTest {
}
@Test
void testUringFileReaderUnaligned() throws IOException {
void testUringFileReaderUnaligned() throws IOException, TimeoutException {
createTestFileWithLongs(65536);
try (var dfr = new UringFileReader(testFile, true)) {
var ret = dfr.readUnalignedInDirectMode(Arena.ofAuto(),
1000,
new long[] { 10*8, 20*8, 5000*8, 5100*8},
new int[] { 32*8, 10*8, 100*8, 100*8},
4096);

View File

@@ -0,0 +1,47 @@
// Gradle build script for a Java module that also applies the JMH benchmark plugin.
plugins {
id 'java'
id "me.champeau.jmh" version "0.6.6"
}
// Build with the JDK version configured on the root project (rootProject.ext.jvmVersion)
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
// Apply the shared srcsets.gradle script from the root project directory
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
// Third-party libraries, resolved through the 'libs' catalog
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.commons.lang3
implementation libs.fastutil
implementation libs.lz4
implementation libs.guava
// Sibling modules from this build
implementation project(':code:libraries:native')
implementation project(':code:libraries:array')
// Test-only dependencies
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation project(':code:libraries:test-helpers')
}
// JVM arguments for JMH benchmark runs
jmh {
jvmArgs = [ "--enable-preview" ]
}
// Launch JMH tasks that accept a toolchain with the same JDK version as configured above
tasks.withType(me.champeau.jmh.WithJavaToolchain).configureEach {
javaLauncher.set(javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
})
}
// The JMH bytecode generator task gets --enable-preview as well
tasks.withType(me.champeau.jmh.JmhBytecodeGeneratorTask).configureEach {
jvmArgs = ["--enable-preview"]
}
// Run tests on the JUnit Platform
test {
useJUnitPlatform()
}

View File

@@ -47,6 +47,7 @@ include 'code:libraries:native'
include 'code:libraries:coded-sequence'
include 'code:libraries:geo-ip'
include 'code:libraries:btree'
include 'code:libraries:skiplist'
include 'code:libraries:easy-lsh'
include 'code:libraries:guarded-regex'
include 'code:libraries:random-write-funnel'