mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
Compare commits
4 Commits
827aadafcd
...
4a98a3c711
Author | SHA1 | Date | |
---|---|---|---|
|
4a98a3c711 | ||
|
68f52ca350 | ||
|
2a2d951c2f | ||
|
379a1be074 |
@@ -22,6 +22,7 @@ dependencies {
|
||||
|
||||
implementation project(':code:libraries:array')
|
||||
implementation project(':code:libraries:btree')
|
||||
implementation project(':code:libraries:skiplist')
|
||||
implementation project(':code:libraries:coded-sequence')
|
||||
implementation project(':code:libraries:language-processing')
|
||||
|
||||
|
@@ -6,6 +6,7 @@ import nu.marginalia.array.LongArrayFactory;
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
import nu.marginalia.index.forward.spans.DocumentSpans;
|
||||
import nu.marginalia.index.forward.spans.IndexSpansReader;
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -14,6 +15,7 @@ import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static nu.marginalia.index.forward.ForwardIndexParameters.*;
|
||||
|
||||
@@ -138,7 +140,7 @@ public class ForwardIndexReader {
|
||||
return (int) offset;
|
||||
}
|
||||
|
||||
public DocumentSpans[] getDocumentSpans(Arena arena, long[] docIds) {
|
||||
public DocumentSpans[] getDocumentSpans(Arena arena, IndexSearchBudget budget, long[] docIds) throws TimeoutException {
|
||||
long[] offsets = new long[docIds.length];
|
||||
for (int i = 0; i < docIds.length; i++) {
|
||||
long offset = idxForDoc(docIds[i]);
|
||||
@@ -151,7 +153,7 @@ public class ForwardIndexReader {
|
||||
}
|
||||
|
||||
try {
|
||||
return spansReader.readSpans(arena, offsets);
|
||||
return spansReader.readSpans(arena, budget, offsets);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
logger.error("Failed to read spans for docIds", ex);
|
||||
|
@@ -1,12 +1,17 @@
|
||||
package nu.marginalia.index.forward.spans;
|
||||
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public interface IndexSpansReader extends AutoCloseable {
|
||||
@Deprecated
|
||||
DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException;
|
||||
DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException;
|
||||
|
||||
DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws TimeoutException, IOException;
|
||||
|
||||
static IndexSpansReader open(Path fileName) throws IOException {
|
||||
int version = SpansCodec.parseSpanFilesFooter(fileName);
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package nu.marginalia.index.forward.spans;
|
||||
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
import nu.marginalia.sequence.VarintCodedSequence;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -52,7 +53,7 @@ public class IndexSpansReaderCompressed implements AutoCloseable, IndexSpansRead
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) throws IOException {
|
||||
public DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws IOException {
|
||||
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
|
||||
for (int i = 0; i < encodedOffsets.length; i++) {
|
||||
if (encodedOffsets[i] >= 0) {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package nu.marginalia.index.forward.spans;
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
import nu.marginalia.uring.UringFileReader;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -10,6 +11,7 @@ import java.lang.foreign.ValueLayout;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
private final UringFileReader uringReader;
|
||||
@@ -32,7 +34,11 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
@Override
|
||||
public DocumentSpans readSpans(Arena arena, long encodedOffset) throws IOException {
|
||||
// for testing, slow
|
||||
return readSpans(arena, new long[] { encodedOffset})[0];
|
||||
try {
|
||||
return readSpans(arena, new IndexSearchBudget(1000), new long[] { encodedOffset})[0];
|
||||
} catch (TimeoutException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public DocumentSpans decode(MemorySegment ms) {
|
||||
@@ -59,7 +65,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) {
|
||||
public DocumentSpans[] readSpans(Arena arena, IndexSearchBudget budget, long[] encodedOffsets) throws TimeoutException {
|
||||
|
||||
int readCnt = 0;
|
||||
for (long offset : encodedOffsets) {
|
||||
@@ -85,7 +91,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
j++;
|
||||
}
|
||||
|
||||
List<MemorySegment> buffers = uringReader.readUnaligned(arena, offsets, sizes, 4096);
|
||||
List<MemorySegment> buffers = uringReader.readUnaligned(arena, budget.timeLeft(), offsets, sizes, 4096);
|
||||
|
||||
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
|
||||
|
||||
|
@@ -17,6 +17,7 @@ dependencies {
|
||||
implementation project(':code:libraries:array')
|
||||
implementation project(':code:libraries:native')
|
||||
implementation project(':code:libraries:btree')
|
||||
implementation project(':code:libraries:skiplist')
|
||||
implementation project(':code:libraries:coded-sequence')
|
||||
implementation project(':code:libraries:random-write-funnel')
|
||||
implementation project(':code:index:query')
|
||||
|
@@ -21,6 +21,7 @@ import java.lang.foreign.Arena;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class FullReverseIndexReader {
|
||||
@@ -165,8 +166,10 @@ public class FullReverseIndexReader {
|
||||
}
|
||||
|
||||
public TermData[] getTermData(Arena arena,
|
||||
IndexSearchBudget budget,
|
||||
long[] termIds,
|
||||
long[] docIds)
|
||||
throws TimeoutException
|
||||
{
|
||||
|
||||
long[] offsetsAll = new long[termIds.length * docIds.length];
|
||||
@@ -188,7 +191,7 @@ public class FullReverseIndexReader {
|
||||
System.arraycopy(offsetsForTerm, 0, offsetsAll, i * docIds.length, docIds.length);
|
||||
}
|
||||
|
||||
return positionsFileReader.getTermData(arena, offsetsAll);
|
||||
return positionsFileReader.getTermData(arena, budget, offsetsAll);
|
||||
}
|
||||
|
||||
public TermData[] getTermData(Arena arena,
|
||||
@@ -210,7 +213,13 @@ public class FullReverseIndexReader {
|
||||
// Read the size and offset of the position data
|
||||
var offsets = reader.getValueOffsets(docIds);
|
||||
|
||||
return positionsFileReader.getTermData(arena, offsets);
|
||||
// FIXME this entire method chain only exists for a single unit test
|
||||
// remove me!
|
||||
try {
|
||||
return positionsFileReader.getTermData(arena, new IndexSearchBudget(1000), offsets);
|
||||
} catch (TimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package nu.marginalia.index.positions;
|
||||
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
import nu.marginalia.uring.UringFileReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -10,6 +11,7 @@ import java.lang.foreign.MemorySegment;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/** Reads positions data from the positions file */
|
||||
public class PositionsFileReader implements AutoCloseable {
|
||||
@@ -37,7 +39,7 @@ public class PositionsFileReader implements AutoCloseable {
|
||||
|
||||
/** Get the positions for a keywords in the index, as pointed out by the encoded offsets;
|
||||
* intermediate buffers are allocated from the provided arena allocator. */
|
||||
public TermData[] getTermData(Arena arena, long[] offsets) {
|
||||
public TermData[] getTermData(Arena arena, IndexSearchBudget budget, long[] offsets) throws TimeoutException {
|
||||
|
||||
int cnt = 0;
|
||||
|
||||
@@ -63,7 +65,7 @@ public class PositionsFileReader implements AutoCloseable {
|
||||
j++;
|
||||
}
|
||||
|
||||
List<MemorySegment> buffers = uringFileReader.readUnaligned(arena, readOffsets, readSizes, 4096);
|
||||
List<MemorySegment> buffers = uringFileReader.readUnaligned(arena, budget.timeLeft(), readOffsets, readSizes, 4096);
|
||||
|
||||
TermData[] ret = new TermData[offsets.length];
|
||||
for (int i = 0, j=0; i < offsets.length; i++) {
|
||||
|
@@ -4,6 +4,7 @@ import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import nu.marginalia.index.construction.PositionsFileConstructor;
|
||||
import nu.marginalia.index.positions.PositionsFileReader;
|
||||
import nu.marginalia.index.positions.TermData;
|
||||
import nu.marginalia.index.query.IndexSearchBudget;
|
||||
import nu.marginalia.sequence.VarintCodedSequence;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -13,6 +14,7 @@ import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
@@ -30,7 +32,7 @@ class PositionsFileReaderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void getTermData() throws IOException {
|
||||
void getTermData() throws IOException, TimeoutException {
|
||||
long key1, key2, key3;
|
||||
try (PositionsFileConstructor constructor = new PositionsFileConstructor(file)) {
|
||||
var block = constructor.getBlock();
|
||||
@@ -47,7 +49,7 @@ class PositionsFileReaderTest {
|
||||
try (Arena arena = Arena.ofShared();
|
||||
PositionsFileReader reader = new PositionsFileReader(file))
|
||||
{
|
||||
TermData[] data = reader.getTermData(arena, new long[] { key1, key2, key3 });
|
||||
TermData[] data = reader.getTermData(arena, new IndexSearchBudget(10000), new long[] { key1, key2, key3 });
|
||||
|
||||
assertEquals(43, data[0].flags());
|
||||
assertEquals(IntList.of( 1, 2, 3), data[0].positions().values());
|
||||
|
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
public class IndexQueryExecution {
|
||||
|
||||
private static final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 16);
|
||||
private static final int indexPreparationThreads = Integer.getInteger("index.preparationThreads", 4);
|
||||
private static final int indexPreparationThreads = Integer.getInteger("index.preparationThreads", 2);
|
||||
|
||||
// Since most NVMe drives have a maximum read size of 128 KB, and most small reads are 512B
|
||||
// this should probably be 128*1024 / 512 = 256 to reduce queue depth and optimize tail latency
|
||||
|
@@ -32,6 +32,7 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/** A reader for the combined forward and reverse indexes.
|
||||
@@ -187,10 +188,12 @@ public class CombinedIndexReader {
|
||||
|
||||
/** Retrieves the term metadata for the specified word for the provided documents */
|
||||
public TermMetadataList[] getTermMetadata(Arena arena,
|
||||
long[] wordIds,
|
||||
CombinedDocIdList docIds)
|
||||
IndexSearchBudget budget,
|
||||
long[] wordIds,
|
||||
CombinedDocIdList docIds)
|
||||
throws TimeoutException
|
||||
{
|
||||
TermData[] combinedTermData = reverseIndexFullReader.getTermData(arena, wordIds, docIds.array());
|
||||
TermData[] combinedTermData = reverseIndexFullReader.getTermData(arena, budget, wordIds, docIds.array());
|
||||
TermMetadataList[] ret = new TermMetadataList[wordIds.length];
|
||||
for (int i = 0; i < wordIds.length; i++) {
|
||||
ret[i] = new TermMetadataList(Arrays.copyOfRange(combinedTermData, i*docIds.size(), (i+1)*docIds.size()));
|
||||
@@ -226,13 +229,13 @@ public class CombinedIndexReader {
|
||||
}
|
||||
|
||||
/** Retrieves the document spans for the specified documents */
|
||||
public DocumentSpans[] getDocumentSpans(Arena arena, CombinedDocIdList docIds) {
|
||||
public DocumentSpans[] getDocumentSpans(Arena arena, IndexSearchBudget budget, CombinedDocIdList docIds) throws TimeoutException {
|
||||
long[] decodedIDs = docIds.array();
|
||||
for (int i = 0; i < decodedIDs.length; i++) {
|
||||
decodedIDs[i] = UrlIdCodec.removeRank(decodedIDs[i]);
|
||||
}
|
||||
|
||||
return forwardIndexReader.getDocumentSpans(arena, decodedIDs);
|
||||
return forwardIndexReader.getDocumentSpans(arena, budget, decodedIDs);
|
||||
}
|
||||
|
||||
/** Close the indexes (this is not done immediately)
|
||||
|
@@ -92,10 +92,14 @@ public class IndexResultRankingService {
|
||||
|
||||
// Perform expensive I/O operations
|
||||
|
||||
this.termsForDocs = currentIndex.getTermMetadata(arena, searchTerms.termIdsAll.array, resultIds);
|
||||
if (!budget.hasTimeLeft())
|
||||
throw new TimeoutException();
|
||||
this.documentSpans = currentIndex.getDocumentSpans(arena, resultIds);
|
||||
try {
|
||||
this.termsForDocs = currentIndex.getTermMetadata(arena, budget, searchTerms.termIdsAll.array, resultIds);
|
||||
this.documentSpans = currentIndex.getDocumentSpans(arena, budget, resultIds);
|
||||
}
|
||||
catch (TimeoutException ex) {
|
||||
arena.close();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
public CodedSequence[] positions() {
|
||||
|
@@ -4,18 +4,31 @@ import nu.marginalia.array.DirectFileReader;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.LongArrayFactory;
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class NativeAlgosTest {
|
||||
Path testFile;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException {
|
||||
testFile = Files.createTempFile("NativeAlgosTest", ".dat");
|
||||
}
|
||||
@AfterEach
|
||||
public void tearDown() throws IOException {
|
||||
Files.deleteIfExists(testFile);
|
||||
}
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
|
||||
LongArray array = LongArrayFactory.mmapForWritingShared(testFile, 1024);
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
array.set(i, i);
|
||||
}
|
||||
@@ -23,7 +36,7 @@ public class NativeAlgosTest {
|
||||
|
||||
var ms = Arena.global().allocate(512, 8);
|
||||
|
||||
int fd = LinuxSystemCalls.openDirect(Path.of("/tmp/test"));
|
||||
int fd = LinuxSystemCalls.openDirect(testFile);
|
||||
int ret = LinuxSystemCalls.readAt(fd, ms, 512);
|
||||
System.out.println(ret);
|
||||
System.out.println(ms.byteSize());
|
||||
@@ -38,14 +51,14 @@ public class NativeAlgosTest {
|
||||
|
||||
@Test
|
||||
void testDirectFileReader() throws IOException {
|
||||
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
|
||||
LongArray array = LongArrayFactory.mmapForWritingShared(testFile, 1024);
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
array.set(i, i);
|
||||
}
|
||||
array.close();
|
||||
|
||||
|
||||
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
|
||||
try (var dfr = new DirectFileReader(testFile)) {
|
||||
LongArray array2 = LongArrayFactory.onHeapConfined(64);
|
||||
dfr.readAligned(array2, 0);
|
||||
for (int i = 0; i < array2.size(); i++) {
|
||||
@@ -54,7 +67,7 @@ public class NativeAlgosTest {
|
||||
}
|
||||
|
||||
var alignedBuffer = Arena.ofAuto().allocate(4096, 4096);
|
||||
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
|
||||
try (var dfr = new DirectFileReader(testFile)) {
|
||||
MemorySegment dest = Arena.ofAuto().allocate(504, 1);
|
||||
dfr.readUnaligned(dest, alignedBuffer, 8);
|
||||
|
||||
|
@@ -12,6 +12,7 @@ import java.lang.foreign.SegmentAllocator;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class UringFileReader implements AutoCloseable {
|
||||
@@ -77,15 +78,54 @@ public class UringFileReader implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
public List<MemorySegment> readUnaligned(Arena arena, long[] offsets, int[] sizes, int blockSize) {
|
||||
if (direct) {
|
||||
return readUnalignedInDirectMode(arena, offsets, sizes, blockSize);
|
||||
} else {
|
||||
return readUnalignedInBufferedMode(arena, offsets, sizes);
|
||||
public void read(List<MemorySegment> destinations, List<Long> offsets, long timeoutMs) throws TimeoutException {
|
||||
if (destinations.size() < 5) {
|
||||
for (int i = 0; i < destinations.size(); i++) {
|
||||
var ms = destinations.get(i);
|
||||
long offset = offsets.get(i);
|
||||
|
||||
int ret;
|
||||
if (ms.byteSize() != (ret = LinuxSystemCalls.readAt(fd, ms, offset))) {
|
||||
throw new RuntimeException("Read failed, rv=" + ret + " at " + offset + " : " + ms.byteSize());
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
var ring = rings[(int) (ringIdx.getAndIncrement() % rings.length)];
|
||||
|
||||
if (destinations.size() <= QUEUE_SIZE) {
|
||||
int ret = ring.readBatch(destinations, offsets, timeoutMs, direct);
|
||||
if (ret != offsets.size()) {
|
||||
throw new RuntimeException("Read failed, rv=" + ret);
|
||||
}
|
||||
}
|
||||
else {
|
||||
long timeEnd = System.currentTimeMillis() + timeoutMs;
|
||||
for (int i = 0; i < destinations.size(); i+=QUEUE_SIZE) {
|
||||
long timeRemainingMs = timeEnd - System.currentTimeMillis();
|
||||
if (timeRemainingMs <= 0)
|
||||
throw new TimeoutException();
|
||||
|
||||
var destSlice = destinations.subList(i, Math.min(destinations.size(), i+QUEUE_SIZE));
|
||||
var offSlice = offsets.subList(i, Math.min(offsets.size(), i+QUEUE_SIZE));
|
||||
int ret = ring.readBatch(destSlice, offSlice, timeRemainingMs, direct);
|
||||
if (ret != offSlice.size()) {
|
||||
throw new RuntimeException("Read failed, rv=" + ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<MemorySegment> readUnalignedInBufferedMode(Arena arena, long[] offsets, int[] sizes) {
|
||||
|
||||
public List<MemorySegment> readUnaligned(Arena arena, long timeoutMs, long[] offsets, int[] sizes, int blockSize) throws TimeoutException {
|
||||
if (direct) {
|
||||
return readUnalignedInDirectMode(arena, timeoutMs, offsets, sizes, blockSize);
|
||||
} else {
|
||||
return readUnalignedInBufferedMode(arena, timeoutMs, offsets, sizes);
|
||||
}
|
||||
}
|
||||
|
||||
private List<MemorySegment> readUnalignedInBufferedMode(Arena arena, long timeoutMs, long[] offsets, int[] sizes) throws TimeoutException {
|
||||
int totalSize = 0;
|
||||
for (int size : sizes) {
|
||||
totalSize += size;
|
||||
@@ -101,7 +141,7 @@ public class UringFileReader implements AutoCloseable {
|
||||
offsetsList.add(offsets[i]);
|
||||
}
|
||||
|
||||
read(segmentsList, offsetsList);
|
||||
read(segmentsList, offsetsList, timeoutMs);
|
||||
|
||||
return segmentsList;
|
||||
}
|
||||
@@ -113,7 +153,7 @@ public class UringFileReader implements AutoCloseable {
|
||||
*
|
||||
* @return MemorySegment slices that contain only the requested data.
|
||||
*/
|
||||
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long[] offsets, int[] sizes, int blockSize) {
|
||||
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long timeoutMs, long[] offsets, int[] sizes, int blockSize) throws TimeoutException {
|
||||
|
||||
if (offsets.length < 1)
|
||||
return List.of();
|
||||
@@ -171,7 +211,7 @@ public class UringFileReader implements AutoCloseable {
|
||||
}
|
||||
|
||||
// Perform the read
|
||||
read(buffers, bufferOffsets);
|
||||
read(buffers, bufferOffsets, timeoutMs);
|
||||
|
||||
// Slice the big memory chunk into the requested slices
|
||||
List<MemorySegment> ret = new ArrayList<>(sizes.length);
|
||||
|
@@ -6,6 +6,7 @@ import java.lang.foreign.MemorySegment;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@@ -23,6 +24,24 @@ public final class UringQueue {
|
||||
return IoUring.uringOpen(fd, size);
|
||||
}
|
||||
|
||||
public int readBatch(List<MemorySegment> dest, List<Long> offsets, long timeout, boolean direct)
|
||||
throws TimeoutException {
|
||||
try {
|
||||
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS))
|
||||
throw new TimeoutException();
|
||||
|
||||
try {
|
||||
return IoUring.uringReadBatch(fd, this, dest, offsets, direct);
|
||||
}
|
||||
finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public int readBatch(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
|
||||
try {
|
||||
if (!lock.tryLock(10, TimeUnit.MILLISECONDS))
|
||||
|
@@ -14,6 +14,7 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class UringFileReaderTest {
|
||||
Path testFile;
|
||||
@@ -63,11 +64,12 @@ public class UringFileReaderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUringFileReaderUnaligned() throws IOException {
|
||||
void testUringFileReaderUnaligned() throws IOException, TimeoutException {
|
||||
createTestFileWithLongs(65536);
|
||||
|
||||
try (var dfr = new UringFileReader(testFile, true)) {
|
||||
var ret = dfr.readUnalignedInDirectMode(Arena.ofAuto(),
|
||||
1000,
|
||||
new long[] { 10*8, 20*8, 5000*8, 5100*8},
|
||||
new int[] { 32*8, 10*8, 100*8, 100*8},
|
||||
4096);
|
||||
|
47
code/libraries/skiplist/build.gradle
Normal file
47
code/libraries/skiplist/build.gradle
Normal file
@@ -0,0 +1,47 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
id "me.champeau.jmh" version "0.6.6"
|
||||
}
|
||||
|
||||
java {
|
||||
toolchain {
|
||||
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
implementation libs.notnull
|
||||
implementation libs.commons.lang3
|
||||
implementation libs.fastutil
|
||||
implementation libs.lz4
|
||||
implementation libs.guava
|
||||
|
||||
implementation project(':code:libraries:native')
|
||||
implementation project(':code:libraries:array')
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
testImplementation libs.bundles.junit
|
||||
testImplementation libs.mockito
|
||||
|
||||
testImplementation project(':code:libraries:test-helpers')
|
||||
}
|
||||
|
||||
jmh {
|
||||
jvmArgs = [ "--enable-preview" ]
|
||||
}
|
||||
tasks.withType(me.champeau.jmh.WithJavaToolchain).configureEach {
|
||||
javaLauncher.set(javaToolchains.launcherFor {
|
||||
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
|
||||
})
|
||||
}
|
||||
tasks.withType(me.champeau.jmh.JmhBytecodeGeneratorTask).configureEach {
|
||||
jvmArgs = ["--enable-preview"]
|
||||
}
|
||||
test {
|
||||
useJUnitPlatform()
|
||||
}
|
@@ -47,6 +47,7 @@ include 'code:libraries:native'
|
||||
include 'code:libraries:coded-sequence'
|
||||
include 'code:libraries:geo-ip'
|
||||
include 'code:libraries:btree'
|
||||
include 'code:libraries:skiplist'
|
||||
include 'code:libraries:easy-lsh'
|
||||
include 'code:libraries:guarded-regex'
|
||||
include 'code:libraries:random-write-funnel'
|
||||
|
Reference in New Issue
Block a user