1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

Compare commits

...

10 Commits

Author SHA1 Message Date
Viktor Lofgren
bc49406881 (build) Compatibility hack debian server 2025-08-11 23:26:53 +02:00
Viktor Lofgren
90325be447 (minor) Fix comments 2025-08-11 23:19:53 +02:00
Viktor Lofgren
dc89587af3 (index) Improve disk locality of the positions data 2025-08-11 21:17:12 +02:00
Viktor Lofgren
7b552afd6b (index) Improve disk locality of the positions data 2025-08-11 20:59:11 +02:00
Viktor Lofgren
73557edc67 (index) Improve disk locality of the positions data 2025-08-11 20:57:32 +02:00
Viktor Lofgren
83919e448a (index) Use O_DIRECT buffered reads for spans 2025-08-11 18:04:25 +02:00
Viktor Lofgren
6f5b75b84d (cleanup) Remove accidentally committed print stmt 2025-08-11 18:04:25 +02:00
Viktor Lofgren
db315e2813 (index) Use O_DIRECT position reads 2025-08-11 18:04:25 +02:00
Viktor Lofgren
e9977e08b7 (index) Block-align positions data
This will make reads more efficient, and possibly pave way for O_DIRECT reads of this data
2025-08-11 14:36:45 +02:00
Viktor Lofgren
1df3757e5f (native) Clean up io_uring code and check in execution queue, currently unused but nifty 2025-08-11 13:54:05 +02:00
21 changed files with 1035 additions and 146 deletions

View File

@@ -8,15 +8,14 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
public class IndexSpansReaderPlain implements IndexSpansReader {
private final UringFileReader urinReader;
private final UringFileReader uringReader;
public IndexSpansReaderPlain(Path spansFile) throws IOException {
urinReader = new UringFileReader(spansFile, false);
urinReader.fadviseWillneed();
uringReader = new UringFileReader(spansFile, true);
uringReader.fadviseWillneed();
}
@Override
@@ -51,39 +50,33 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
@Override
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) {
long totalSize = 0;
int readCnt = 0;
for (long offset : encodedOffsets) {
if (offset < 0)
continue;
totalSize += SpansCodec.decodeSize(offset);
readCnt ++;
}
if (totalSize == 0) {
if (readCnt == 0) {
return new DocumentSpans[encodedOffsets.length];
}
MemorySegment segment = arena.allocate(totalSize, 8);
long[] offsets = new long[readCnt];
int[] sizes = new int[readCnt];
List<MemorySegment> buffers = new ArrayList<>(encodedOffsets.length);
List<Long> offsets = new ArrayList<>(encodedOffsets.length);
long bufferOffset = 0;
for (long offset : encodedOffsets) {
if (offset < 0)
for (int idx = 0, j = 0; idx < encodedOffsets.length; idx++) {
if (encodedOffsets[idx] < 0)
continue;
long offset = encodedOffsets[idx];
long size = SpansCodec.decodeSize(offset);
long start = SpansCodec.decodeStartOffset(offset);
buffers.add(segment.asSlice(bufferOffset, size));
offsets.add(start);
bufferOffset += size;
offsets[j] = SpansCodec.decodeStartOffset(offset);
sizes[j] = SpansCodec.decodeSize(offset);
j++;
}
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
List<MemorySegment> buffers = uringReader.readUnalignedInDirectMode(arena, offsets, sizes, 4096);
urinReader.read(buffers, offsets);
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
for (int idx = 0, j = 0; idx < encodedOffsets.length; idx++) {
if (encodedOffsets[idx] < 0)
@@ -96,7 +89,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
@Override
public void close() throws IOException {
urinReader.close();
uringReader.close();
}
}

View File

@@ -26,8 +26,8 @@ public class SpansCodec {
return encoded >>> 28;
}
public static long decodeSize(long encoded) {
return encoded & 0x0FFF_FFFFL;
public static int decodeSize(long encoded) {
return (int) (encoded & 0x0FFF_FFFFL);
}
public static ByteBuffer createSpanFilesFooter(SpansCodecVersion version, int padSize) {

View File

@@ -0,0 +1,262 @@
package nu.marginalia.index.perftest;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.uring.UringFileReader;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.LongStream;
public class IoPatternsMain {
static void testBuffered(int sz, int small, int large, int iters) {
try {
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
long fileSize = Files.size(largeFile);
Random r = new Random();
List<MemorySegment> segments = new ArrayList<>();
for (int i = 0; i < sz; i++) {
if (small == large) {
segments.add(Arena.ofAuto().allocate(small));
}
else {
segments.add(Arena.ofAuto().allocate(r.nextInt(small, large)));
}
}
List<Long> offsets = new ArrayList<>();
long[] samples = new long[1000];
int si = 0;
try (UringFileReader reader = new UringFileReader(largeFile, false)) {
for (int iter = 0; iter < iters; ) {
if (si == samples.length) {
Arrays.sort(samples);
double p1 = samples[10] / 1_000.;
double p10 = samples[100] / 1_000.;
double p90 = samples[900] / 1_000.;
double p99 = samples[990] / 1_000.;
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
System.out.println("B"+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
si = 0;
iter++;
}
offsets.clear();
for (int i = 0; i < sz; i++) {
offsets.add(r.nextLong(0, fileSize - 256));
}
long st = System.nanoTime();
reader.read(segments, offsets);
long et = System.nanoTime();
samples[si++] = et - st;
}
}
}
catch (IOException e) {
e.printStackTrace();
}
}
static void testBufferedPread(int sz, int iters) {
try {
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
long fileSize = Files.size(largeFile);
Random r = new Random();
List<MemorySegment> segments = new ArrayList<>();
for (int i = 0; i < sz; i++) {
segments.add(Arena.ofAuto().allocate(r.nextInt(24, 256)));
}
List<Long> offsets = new ArrayList<>();
long[] samples = new long[1000];
int si = 0;
int fd = -1;
try {
fd = LinuxSystemCalls.openBuffered(largeFile);
LinuxSystemCalls.fadviseRandom(fd);
for (int iter = 0; iter < iters; ) {
if (si == samples.length) {
Arrays.sort(samples);
double p1 = samples[10] / 1_000.;
double p10 = samples[100] / 1_000.;
double p90 = samples[900] / 1_000.;
double p99 = samples[990] / 1_000.;
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
System.out.println("BP"+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
si = 0;
iter++;
}
offsets.clear();
for (int i = 0; i < sz; i++) {
offsets.add(r.nextLong(0, fileSize - 256));
}
long st = System.nanoTime();
for (int i = 0; i < sz; i++) {
LinuxSystemCalls.readAt(fd, segments.get(i), offsets.get(i));
}
long et = System.nanoTime();
samples[si++] = et - st;
}
}
finally {
LinuxSystemCalls.closeFd(fd);
}
}
catch (IOException e) {
e.printStackTrace();
}
}
static void testDirect(int blockSize, int sz, int iters) {
try {
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
int fileSizeBlocks = (int) ((Files.size(largeFile) & -blockSize) / blockSize);
Random r = new Random();
List<MemorySegment> segments = new ArrayList<>();
for (int i = 0; i < sz; i++) {
segments.add(Arena.ofAuto().allocate(blockSize, blockSize));
}
List<Long> offsets = new ArrayList<>();
long[] samples = new long[1000];
int si = 0;
try (UringFileReader reader = new UringFileReader(largeFile, true)) {
for (int iter = 0; iter < iters; ) {
if (si == samples.length) {
Arrays.sort(samples);
double p1 = samples[10] / 1_000.;
double p10 = samples[100] / 1_000.;
double p90 = samples[900] / 1_000.;
double p99 = samples[990] / 1_000.;
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
System.out.println("DN"+blockSize+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
si = 0;
iters++;
}
offsets.clear();
for (int i = 0; i < sz; i++) {
offsets.add(blockSize * r.nextLong(0, fileSizeBlocks));
}
long st = System.nanoTime();
reader.read(segments, offsets);
long et = System.nanoTime();
samples[si++] = et - st;
}
}
}
catch (IOException e) {
e.printStackTrace();
}
}
static void testDirect1(int blockSize, int iters) {
try {
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
int fileSizeBlocks = (int) ((Files.size(largeFile) & -blockSize) / blockSize);
Random r = new Random();
MemorySegment segment = Arena.global().allocate(blockSize, blockSize);
long[] samples = new long[1000];
int si = 0;
int fd = LinuxSystemCalls.openDirect(largeFile);
if (fd < 0) {
throw new IOException("open failed");
}
try {
for (int iter = 0; iter < iters; ) {
if (si == samples.length) {
Arrays.sort(samples);
double p1 = samples[10] / 1_000.;
double p10 = samples[100] / 1_000.;
double p90 = samples[900] / 1_000.;
double p99 = samples[990] / 1_000.;
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
System.out.println("D1"+blockSize+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
si = 0;
iters++;
}
long st = System.nanoTime();
int ret;
long readOffset = blockSize * r.nextLong(0, fileSizeBlocks);
if (blockSize != (ret = LinuxSystemCalls.readAt(fd, segment, readOffset))) {
throw new IOException("pread failed: " + ret);
}
long et = System.nanoTime();
samples[si++] = et - st;
}
}
finally {
LinuxSystemCalls.closeFd(fd);
}
}
catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// Thread.ofPlatform().start(() -> testBuffered(128, 32, 65536,1000));
Thread.ofPlatform().start(() -> testDirect(8192*4, 128,1000));
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
// Thread.ofPlatform().start(() -> testBufferedPread(128, 1000));
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
// Thread.ofPlatform().start(() -> testDirect1(1024*1024, 1000));
// Thread.ofPlatform().start(() -> testDirect1(1024*1024, 1000));
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
// Thread.ofPlatform().start(() -> testDirect(512, 100));
// Thread.ofPlatform().start(() -> testDirect(512, 100));
// Thread.ofPlatform().start(() -> testDirect(512, 100));
// Thread.ofPlatform().start(() -> testDirect(512, 100));
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
// Thread.ofPlatform().start(() -> testBuffered(100));
// Thread.ofPlatform().start(() -> testBuffered(100));
for (;;);
// testBuffered(100);
}
}

View File

@@ -14,70 +14,103 @@ import java.nio.file.StandardOpenOption;
*
* The positions data is concatenated in the file, with each term's metadata
* followed by its positions. The metadata is a single byte, and the positions
* are encoded using the Elias Gamma code, with zero padded bits at the end to
* get octet alignment.
*
* are encoded varints.
* <p></p>
*
* It is the responsibility of the caller to keep track of the byte offset of
* each posting in the file.
*/
public class PositionsFileConstructor implements AutoCloseable {
private final ByteBuffer workBuffer = ByteBuffer.allocate(65536);
private final Path file;
private final FileChannel channel;
private long offset;
public PositionsFileConstructor(Path file) throws IOException {
this.file = file;
channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
}
/** Represents a block of positions lists. Each writer thread should hold on to
* a block object to ensure the locality of its positions lists.
* When finished, commit() must be run.
* */
public class PositionsFileBlock {
private final ByteBuffer workBuffer = ByteBuffer.allocate(1024*1024*16);
private long position;
public PositionsFileBlock(long position) {
this.position = position;
}
public boolean fitsData(int size) {
return workBuffer.remaining() >= size;
}
public void commit() throws IOException {
workBuffer.position(0);
workBuffer.limit(workBuffer.capacity());
int pos = 0;
while (workBuffer.hasRemaining()) {
pos += channel.write(workBuffer, this.position + pos + workBuffer.position());
}
}
private void relocate() throws IOException {
workBuffer.clear();
position = channel.position();
while (workBuffer.hasRemaining()) {
channel.write(workBuffer);
}
workBuffer.clear();
}
public long position() {
return this.position + workBuffer.position();
}
public void put(byte b) {
workBuffer.put(b);
}
public void put(ByteBuffer buffer) {
workBuffer.put(buffer);
}
}
public PositionsFileBlock getBlock() throws IOException {
synchronized (this) {
var block = new PositionsFileBlock(channel.position());
block.relocate();
return block;
}
}
/** Add a term to the positions file
*
* @param block a block token to ensure data locality
* @param termMeta the term metadata
* @param positionsBuffer the positions of the term
*
* @return the offset of the term in the file, with the size of the data in the highest byte
*/
public long add(byte termMeta, ByteBuffer positionsBuffer) throws IOException {
synchronized (file) {
int size = 1 + positionsBuffer.remaining();
public long add(PositionsFileBlock block, byte termMeta, ByteBuffer positionsBuffer) throws IOException {
int size = 1 + positionsBuffer.remaining();
if (workBuffer.remaining() < size) {
workBuffer.flip();
channel.write(workBuffer);
workBuffer.clear();
if (!block.fitsData(size)) {
synchronized (this) {
block.commit();
block.relocate();
}
}
synchronized (file) {
long offset = block.position();
workBuffer.put(termMeta);
workBuffer.put(positionsBuffer);
block.put(termMeta);
block.put(positionsBuffer);
long ret = PositionCodec.encode(size, offset);
offset += size;
return ret;
return PositionCodec.encode(size, offset);
}
}
public void close() throws IOException {
if (workBuffer.hasRemaining()) {
workBuffer.flip();
while (workBuffer.hasRemaining())
channel.write(workBuffer);
}
long remainingBlockSize = 4096 - (channel.position() & -4096);
if (remainingBlockSize != 0) {
workBuffer.position(0);
workBuffer.limit(0);
while (workBuffer.hasRemaining())
channel.write(workBuffer);
}
channel.force(false);
channel.close();
}

View File

@@ -79,6 +79,8 @@ public class FullPreindexDocuments {
var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
offsetMap.defaultReturnValue(0);
var positionsBlock = positionsFileConstructor.getBlock();
while (docIds.hasRemaining()) {
long docId = docIds.get();
long rankEncodedId = docIdRewriter.rewriteDocId(docId);
@@ -94,12 +96,13 @@ public class FullPreindexDocuments {
ByteBuffer pos = tPos.get(i);
long offset = offsetMap.addTo(termId, RECORD_SIZE_LONGS);
long encodedPosOffset = positionsFileConstructor.add(meta, pos);
long encodedPosOffset = positionsFileConstructor.add(positionsBlock, meta, pos);
assembly.put(offset + 0, rankEncodedId);
assembly.put(offset + 1, encodedPosOffset);
}
}
positionsBlock.commit();
assembly.write(docsFile);
}

View File

@@ -7,8 +7,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
/** Reads positions data from the positions file */
@@ -18,7 +18,10 @@ public class PositionsFileReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(PositionsFileReader.class);
public PositionsFileReader(Path positionsFile) throws IOException {
uringFileReader = new UringFileReader(positionsFile, false);
if ((Files.size(positionsFile) & 4095) != 0) {
throw new IllegalArgumentException("Positions file is not block aligned in size: " + Files.size(positionsFile));
}
uringFileReader = new UringFileReader(positionsFile, true);
}
@Override
@@ -29,38 +32,34 @@ public class PositionsFileReader implements AutoCloseable {
/** Get the positions for a keywords in the index, as pointed out by the encoded offsets;
* intermediate buffers are allocated from the provided arena allocator. */
public TermData[] getTermData(Arena arena, long[] offsets) {
int cnt = 0;
for (int i = 0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;
cnt++;
}
if (cnt == 0) {
return new TermData[offsets.length];
}
long[] readOffsets = new long[cnt];
int[] readSizes = new int[cnt];
for (int i = 0, j = 0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;
readSizes[j] = PositionCodec.decodeSize(encodedOffset);
readOffsets[j] = PositionCodec.decodeOffset(encodedOffset);
j++;
}
List<MemorySegment> buffers = uringFileReader.readUnalignedInDirectMode(arena, readOffsets, readSizes, 4096);
TermData[] ret = new TermData[offsets.length];
int sizeTotal = 0;
for (int i = 0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;
sizeTotal += PositionCodec.decodeSize(encodedOffset);
}
if (sizeTotal == 0)
return ret;
MemorySegment segment = arena.allocate(sizeTotal, 512);
List<MemorySegment> buffers = new ArrayList<>(offsets.length);
List<Long> readOffsets = new ArrayList<>(offsets.length);
int bufOffset = 0;
for (int i = 0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;
int length = PositionCodec.decodeSize(encodedOffset);
long offset = PositionCodec.decodeOffset(encodedOffset);
buffers.add(segment.asSlice(bufOffset, length));
readOffsets.add(offset);
bufOffset+=length;
}
uringFileReader.read(buffers, readOffsets);
for (int i = 0, j=0; i < offsets.length; i++) {
long encodedOffset = offsets[i];
if (encodedOffset == 0) continue;

View File

@@ -33,9 +33,11 @@ class PositionsFileReaderTest {
void getTermData() throws IOException {
long key1, key2, key3;
try (PositionsFileConstructor constructor = new PositionsFileConstructor(file)) {
key1 = constructor.add((byte) 43, VarintCodedSequence.generate(1, 2, 3).buffer());
key2 = constructor.add((byte) 51, VarintCodedSequence.generate(2, 3, 5, 1000, 5000, 20241).buffer());
key3 = constructor.add((byte) 61, VarintCodedSequence.generate(3, 5, 7).buffer());
var block = constructor.getBlock();
key1 = constructor.add(block, (byte) 43, VarintCodedSequence.generate(1, 2, 3).buffer());
key2 = constructor.add(block, (byte) 51, VarintCodedSequence.generate(2, 3, 5, 1000, 5000, 20241).buffer());
key3 = constructor.add(block, (byte) 61, VarintCodedSequence.generate(3, 5, 7).buffer());
block.commit();
}
System.out.println("key1: " + Long.toHexString(key1));

View File

@@ -4,7 +4,6 @@ import nu.marginalia.array.DirectFileReader;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.uring.UringFileReader;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -12,7 +11,6 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.file.Path;
import java.util.List;
public class NativeAlgosTest {
@Test
@@ -71,28 +69,4 @@ public class NativeAlgosTest {
}
}
@Test
void testAioFileReader() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
array.close();
try (var dfr = new UringFileReader(Path.of("/tmp/test"), false)) {
MemorySegment buf1 = Arena.ofAuto().allocate(32, 8);
MemorySegment buf2 = Arena.ofAuto().allocate(16, 8);
dfr.read(List.of(buf1, buf2), List.of(0L, 8L));
for (int i = 0; i < buf1.byteSize(); i+=8) {
System.out.println(buf1.get(ValueLayout.JAVA_LONG, i));
}
for (int i = 0; i < buf2.byteSize(); i+=8) {
System.out.println(buf2.get(ValueLayout.JAVA_LONG, i));
}
}
}
}

View File

@@ -1,15 +1,20 @@
#!/usr/bin/env sh
CXXFLAGS=-O3 -march=native -std=c++14 -fPIC `pkg-config --cflags liburing`
LDFLAGS=`pkg-config --libs liburing` -shared
LDFLAGS=
# Weird hack to get liburing to link on one particular debian server
LIBURING_PATH=`pkg-config liburing --keep-system-libs --libs-only-L | cut -c 3- | tr -d \ `/liburing.so
CXX=c++
SOURCES=src/sort.cc src/unix.cc src/uring.cc
all: resources/libcpp.so resources/liburing.so
resources/libcpp.so:
${CXX} -shared ${LDFLAGS} ${CXXFLAGS} ${SOURCES} -o resources/libcpp.so
all: resources/libcpp.so
resources/libcpp.so: ${SOURCES} resources/liburing.so
${CXX} -shared ${CXXFLAGS} ${SOURCES} resources/liburing.so -o resources/libcpp.so
resources/liburing.so:
cp /usr/lib/liburing.so resources/
cp ${LIBURING_PATH} resources/liburing.so
clean:
rm -rf resources/{libcpp,liburing}.so

View File

@@ -10,6 +10,7 @@ java {
dependencies {
implementation libs.bundles.slf4j
implementation libs.fastutil
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@@ -0,0 +1,6 @@
package nu.marginalia.asyncio;
import java.lang.foreign.MemorySegment;
public record AsyncReadRequest(int fd, MemorySegment destination, long offset) {
}

View File

@@ -0,0 +1,55 @@
package nu.marginalia.asyncio;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
final class SubmittedReadRequest<T> {
public final long id;
private final T context;
private final List<AsyncReadRequest> requests;
private final CompletableFuture<T> future;
private int count;
private volatile boolean success = true;
SubmittedReadRequest(T context, List<AsyncReadRequest> requests, CompletableFuture<T> future, long id) {
this.context = context;
this.requests = requests;
this.future = future;
this.id = id;
this.count = requests.size();
}
public List<AsyncReadRequest> getRequests() {
return requests;
}
public int count() {
return count;
}
public void canNotFinish() {
success = false;
count = 0;
future.completeExceptionally(new IOException());
}
public boolean partFinished(boolean successfully) {
if (!successfully) {
success = false;
}
if (--count == 0) {
if (success) {
future.complete(context);
} else {
future.completeExceptionally(new IOException());
}
return true;
}
return false;
}
}

View File

@@ -0,0 +1,243 @@
package nu.marginalia.asyncio;
import nu.marginalia.ffi.IoUring;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.foreign.ValueLayout.*;
public class UringExecutionQueue implements AutoCloseable {
private static final IoUring ioUringInstance = IoUring.instance();
private final AtomicLong requestIdCounter = new AtomicLong(1);
private final int queueSize;
private final Thread executor;
private volatile boolean running = true;
private final MemorySegment uringQueue;
private final ArrayBlockingQueue<SubmittedReadRequest<? extends Object>> inputQueue;
public UringExecutionQueue(int queueSize) throws Throwable {
this.inputQueue = new ArrayBlockingQueue<>(queueSize, false);
this.queueSize = queueSize;
this.uringQueue = (MemorySegment) ioUringInstance.uringInit.invoke(queueSize);
executor = Thread.ofPlatform().daemon().start(this::executionPipe);
}
public void close() throws InterruptedException {
running = false;
executor.join();
try {
ioUringInstance.uringClose.invoke(uringQueue);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
public <T> CompletableFuture<T> submit(T context, List<AsyncReadRequest> relatedRequests) throws InterruptedException {
if (relatedRequests.size() > queueSize) {
throw new IllegalArgumentException("Request batches may not exceed the queue size!");
}
long id = requestIdCounter.incrementAndGet();
CompletableFuture<T> future = new CompletableFuture<>();
inputQueue.put(new SubmittedReadRequest<>(context, relatedRequests, future, id));
return future;
}
static class UringDispatcher implements AutoCloseable {
private final Arena arena;
private final MemorySegment returnResultIds;
private final MemorySegment readBatchIds;
private final MemorySegment readFds;
private final MemorySegment readBuffers;
private final MemorySegment readSizes;
private final MemorySegment readOffsets;
private final MemorySegment uringQueue;
private int requestsToSend = 0;
UringDispatcher(int queueSize, MemorySegment uringQueue) {
this.uringQueue = uringQueue;
this.arena = Arena.ofConfined();
returnResultIds = arena.allocate(JAVA_LONG, queueSize);
readBatchIds = arena.allocate(JAVA_LONG, queueSize);
readFds = arena.allocate(JAVA_INT, queueSize);
readBuffers = arena.allocate(ADDRESS, queueSize);
readSizes = arena.allocate(JAVA_INT, queueSize);
readOffsets = arena.allocate(JAVA_LONG, queueSize);
}
void prepareRead(int fd, long batchId, MemorySegment segment, int size, long offset) {
readFds.setAtIndex(JAVA_INT, requestsToSend, fd);
readBuffers.setAtIndex(ADDRESS, requestsToSend, segment);
readBatchIds.setAtIndex(JAVA_LONG, requestsToSend, batchId);
readSizes.setAtIndex(JAVA_INT, requestsToSend, size);
readOffsets.setAtIndex(JAVA_LONG, requestsToSend, offset);
requestsToSend++;
}
long[] poll() {
try {
// Dispatch call
int result = (Integer) IoUring.instance.uringJustPoll.invoke(uringQueue, returnResultIds);
if (result < 0) {
throw new IOException("Error in io_uring");
}
else {
long[] ret = new long[result];
for (int i = 0; i < result; i++) {
ret[i] = returnResultIds.getAtIndex(JAVA_LONG, i);
}
return ret;
}
}
catch (Throwable e) {
throw new RuntimeException(e);
}
finally {
requestsToSend = 0;
}
}
long[] dispatchRead(int ongoingRequests) throws IOException {
try {
// Dispatch call
int result = (Integer) IoUring.instance.uringReadAndPoll.invoke(
uringQueue,
returnResultIds,
ongoingRequests,
requestsToSend,
readBatchIds,
readFds,
readBuffers,
readSizes,
readOffsets
);
if (result < 0) {
throw new IOException("Error in io_uring");
}
else {
long[] ret = new long[result];
for (int i = 0; i < result; i++) {
ret[i] = returnResultIds.getAtIndex(JAVA_LONG, i);
}
return ret;
}
}
catch (Throwable e) {
throw new RuntimeException(e);
}
finally {
requestsToSend = 0;
}
}
int getRequestsToSend() {
return requestsToSend;
}
public void close() {
arena.close();
}
}
public void executionPipe() {
try (var uringDispatcher = new UringDispatcher(queueSize, uringQueue)) {
int ongoingRequests = 0;
// recycle between iterations to avoid allocation churn
List<SubmittedReadRequest<?>> batchesToSend = new ArrayList<>();
Map<Long, SubmittedReadRequest<?>> requestsToId = new HashMap<>();
while (running) {
batchesToSend.clear();
// if (inputQueue.isEmpty() && ongoingRequests == 0) {
// LockSupport.parkNanos(10_000);
// continue;
// }
int remainingRequests = queueSize - ongoingRequests;
SubmittedReadRequest<?> request;
// Find batches to send that will not exceed the queue size
while ((request = inputQueue.peek()) != null) {
if (remainingRequests >= request.count()) {
remainingRequests -= request.count();
inputQueue.poll();
batchesToSend.add(request);
}
else {
break;
}
}
// Arrange requests from the batches into arrays to send to FFI call
int requestsToSend = 0;
for (var batch : batchesToSend) {
requestsToId.put(batch.id, batch);
for (var read : batch.getRequests()) {
uringDispatcher.prepareRead(read.fd(), batch.id, read.destination(), (int) read.destination().byteSize(), read.offset());
}
}
try {
ongoingRequests += uringDispatcher.getRequestsToSend();
long[] results;
if (uringDispatcher.getRequestsToSend() > 0) {
results = uringDispatcher.dispatchRead(ongoingRequests);
}
else {
results = uringDispatcher.poll();
}
for (long id : results) {
requestsToId.computeIfPresent(Math.abs(id), (_, req) -> {
if (req.partFinished(id > 0)) {
return null;
} else {
return req;
}
});
ongoingRequests--;
}
}
catch (IOException ex) {
ongoingRequests -= requestsToSend;
batchesToSend.forEach(req -> {
req.canNotFinish();
requestsToId.remove(req.id);
});
}
catch (Throwable ex) {
throw new RuntimeException(ex);
}
}
}
}
}

View File

@@ -23,17 +23,19 @@ import static java.lang.foreign.ValueLayout.*;
* */
@SuppressWarnings("preview")
public class IoUring {
private final MethodHandle uringInit;
private final MethodHandle uringClose;
public final MethodHandle uringInit;
public final MethodHandle uringClose;
private final MethodHandle uringReadBuffered;
private final MethodHandle uringReadDirect;
public final MethodHandle uringReadAndPoll;
public final MethodHandle uringJustPoll;
public static final IoUring instance;
/** Indicates whether the native implementations are available */
public static final boolean isAvailable;
private static final Logger logger = LoggerFactory.getLogger(LinuxSystemCalls.class);
private static final Logger logger = LoggerFactory.getLogger(IoUring.class);
private IoUring(Path libFile) {
SymbolLookup libraryLookup = SymbolLookup.libraryLookup(libFile, Arena.global());
@@ -44,6 +46,28 @@ public class IoUring {
handle = libraryLookup.findOrThrow("uring_read_direct");
uringReadDirect = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
handle = libraryLookup.findOrThrow("uring_read_submit_and_poll");
uringReadAndPoll = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(
JAVA_INT,
ADDRESS, // io_uring* ring
ADDRESS, // long* result_ids
JAVA_INT, // int in_flight_requests
JAVA_INT, // int read_count
ADDRESS, // long* read_batch_ids
ADDRESS, // int* read_fds
ADDRESS, // void** read_buffers
ADDRESS, // unsigned int** read_sizes
ADDRESS // long* read_offsets
));
handle = libraryLookup.findOrThrow("uring_poll");
uringJustPoll = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(
JAVA_INT,
ADDRESS, // io_uring* ring
ADDRESS // long* result_ids
));
handle = libraryLookup.findOrThrow("initialize_uring");
uringInit = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(ADDRESS, JAVA_INT));
@@ -53,9 +77,9 @@ public class IoUring {
static {
Path libFile;
IoUring nativeAlgosI = null;
IoUring ioUringI = null;
// copy resource to temp file so it can be loaded
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("liburing.so")) {
try (var is = IoUring.class.getClassLoader().getResourceAsStream("liburing.so")) {
var tempFile = File.createTempFile("liburing", ".so");
tempFile.deleteOnExit();
@@ -70,7 +94,7 @@ public class IoUring {
logger.info("Failed to load native library, likely not built", e);
}
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("libcpp.so")) {
try (var is = IoUring.class.getClassLoader().getResourceAsStream("libcpp.so")) {
var tempFile = File.createTempFile("libcpp", ".so");
tempFile.deleteOnExit();
@@ -80,16 +104,20 @@ public class IoUring {
}
libFile = tempFile.toPath();
nativeAlgosI = new IoUring(libFile);
ioUringI = new IoUring(libFile);
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
}
instance = nativeAlgosI;
instance = ioUringI;
isAvailable = instance != null;
}
public static IoUring instance() {
return instance;
}
public static UringQueue uringOpen(int fd, int queueSize) {
try {
return new UringQueue((MemorySegment) instance.uringInit.invoke(queueSize), fd);

View File

@@ -1,10 +1,15 @@
package nu.marginalia.uring;
import it.unimi.dsi.fastutil.longs.Long2IntAVLTreeMap;
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
import it.unimi.dsi.fastutil.longs.LongIterator;
import nu.marginalia.ffi.LinuxSystemCalls;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -46,7 +51,7 @@ public class UringFileReader implements AutoCloseable {
int ret;
if (ms.byteSize() != (ret = LinuxSystemCalls.readAt(fd, ms, offset))) {
throw new RuntimeException("Read failed, rv=" + ret);
throw new RuntimeException("Read failed, rv=" + ret + " at " + offset + " : " + ms.byteSize());
}
}
return;
@@ -54,7 +59,7 @@ public class UringFileReader implements AutoCloseable {
var ring = rings[(int) (ringIdx.getAndIncrement() % rings.length)];
if (destinations.size() <= QUEUE_SIZE) {
int ret = ring.read(destinations, offsets, direct);
int ret = ring.readBatch(destinations, offsets, direct);
if (ret != offsets.size()) {
throw new RuntimeException("Read failed, rv=" + ret);
}
@@ -66,6 +71,87 @@ public class UringFileReader implements AutoCloseable {
}
}
/** This function takes a list of offsets and sizes, and translates them to a minium of blockSize'd O_DIRECT
* reads. A single buffer will be allocated to hold all the data, to encourage HugePages allocation and
* reduce TLB thrashing. It is still generally helpful for performance if the data is at least best-effort
* block aligned.
*
* @return MemorySegment slices that contain only the requested data.
*/
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long[] offsets, int[] sizes, int blockSize) {
if (offsets.length < 1)
return List.of();
if (offsets.length != sizes.length) throw new IllegalArgumentException("Offsets and Sizes arrays don't match!");
if ((blockSize & 511) != 0) throw new IllegalArgumentException("Block size must be a multiple of 512");
// First we work out which blocks we need to read, and how many they are
final LongAVLTreeSet neededBlocks = new LongAVLTreeSet();
for (int i = 0; i < offsets.length; i++) {
for (long block = offsets[i] & -blockSize;
block <= ((offsets[i] + sizes[i]) & -blockSize);
block+=blockSize)
{
neededBlocks.add(block);
}
}
MemorySegment allMemory = arena.allocate((long) blockSize * neededBlocks.size(), blockSize);
List<MemorySegment> buffers = new ArrayList<>(sizes.length);
List<Long> bufferOffsets = new ArrayList<>(sizes.length);
final Long2IntAVLTreeMap blockToIdx = new Long2IntAVLTreeMap();
LongIterator neededBlockIterator = neededBlocks.longIterator();
long runStart = -1;
long runCurrent = -1;
long sliceOffset = 0;
for (;;) {
long nextBlock = neededBlockIterator.nextLong();
blockToIdx.put(nextBlock, blockToIdx.size());
if (runStart < 0) runStart = nextBlock;
else if (runCurrent + blockSize != nextBlock) {
int bufferSize = (int) (blockSize + runCurrent - runStart);
bufferOffsets.add(runStart);
buffers.add(allMemory.asSlice(sliceOffset, bufferSize));
sliceOffset += bufferSize;
runStart = nextBlock;
}
runCurrent = nextBlock;
if (!neededBlockIterator.hasNext()) {
// If this is the last value, we need to wrap up the final run
int bufferSize = (int) (blockSize + runCurrent - runStart);
bufferOffsets.add(runStart);
buffers.add(allMemory.asSlice(sliceOffset, bufferSize));
break;
}
}
// Perform the read
read(buffers, bufferOffsets);
// Slice the big memory chunk into the requested slices
List<MemorySegment> ret = new ArrayList<>(sizes.length);
for (int i = 0; i < offsets.length; i++) {
long offset = offsets[i];
int size = sizes[i];
long startBlock = (long) blockSize * blockToIdx.get(offset & -blockSize);
long blockOffset = offset & (blockSize - 1);
ret.add(allMemory.asSlice(startBlock + blockOffset, size));
}
return ret;
}
public void close() {
for (var ring : rings) {
ring.close();

View File

@@ -23,7 +23,7 @@ public final class UringQueue {
return IoUring.uringOpen(fd, size);
}
public int read(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
public int readBatch(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
try {
if (!lock.tryLock(10, TimeUnit.MILLISECONDS))
throw new RuntimeException("io_uring slow, likely backpressure!");

View File

@@ -1,8 +1,14 @@
# LongArray C++ Helpers
# Native C++ Helpers
This package contains helper functions for working with LongArray objects,
as native C++ calls. The helpers are only built on Linux, and if they are absent,
Java substitutes should be used instead.
This package contains helper functions for calling native functions.
### Systems Programming Helpers
TBW
### Long Array Helpers.
The helpers are only built on Linux, and if they are absent, Java substitutes should be used instead.
Library loading and access is available through the
[NativeAlgos](java/nu/marginalia/NativeAlgos.java) class.

View File

@@ -31,6 +31,81 @@ void close_uring(io_uring* ring) {
free(ring);
}
int uring_read_submit_and_poll(
io_uring* ring,
long* result_ids,
int in_flight_requests,
int read_count,
long* read_batch_ids,
int* read_fds,
void** read_buffers,
unsigned int* read_sizes,
long* read_offsets)
{
for (int i = 0; i < read_count; i++) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "uring_queue full!");
return -1;
}
io_uring_prep_read(sqe, read_fds[i], read_buffers[i], read_sizes[i], read_offsets[i]);
io_uring_sqe_set_data(sqe, (void*) read_batch_ids[i]);
}
int wait_cnt = 8;
if (wait_cnt > in_flight_requests) {
wait_cnt = in_flight_requests;
}
int submitted = io_uring_submit_and_wait(ring, wait_cnt);
if (submitted != read_count) {
if (submitted < 0) {
fprintf(stderr, "io_uring_submit %s\n", strerror(-submitted));
}
else {
fprintf(stderr, "io_uring_submit(): submitted != %d, was %d", read_count, submitted);
}
return -1;
}
int completed = 0;
struct io_uring_cqe *cqe;
while (io_uring_peek_cqe(ring, &cqe) == 0) {
if (cqe->res < 0) {
fprintf(stderr, "io_uring error: %s\n", strerror(-cqe->res));
result_ids[completed++] = -cqe->user_data; // flag an error by sending a negative ID back so we can clean up memory allocation etc
}
else {
result_ids[completed++] = cqe->user_data;
}
io_uring_cqe_seen(ring, cqe);
}
return completed;
}
int uring_poll(io_uring* ring, long* result_ids)
{
int completed = 0;
struct io_uring_cqe *cqe;
while (io_uring_peek_cqe(ring, &cqe) == 0) {
if (cqe->res < 0) {
fprintf(stderr, "io_uring error: %s\n", strerror(-cqe->res));
result_ids[completed++] = -cqe->user_data; // flag an error by sending a negative ID back so we can clean up memory allocation etc
}
else {
result_ids[completed++] = cqe->user_data;
}
io_uring_cqe_seen(ring, cqe);
}
return completed;
}
int uring_read_buffered(int fd, io_uring* ring, int n, void** buffers, unsigned int* sizes, long* offsets) {
#ifdef DEBUG_CHECKS

View File

@@ -0,0 +1,81 @@
package nu.marginalia;
import nu.marginalia.uring.UringFileReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
public class UringFileReaderTest {
Path testFile;
@BeforeEach
public void setUp() throws IOException {
testFile = Files.createTempFile("UringFileReaderTest", ".dat");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(testFile);
}
void createTestFileWithLongs(int size) {
ByteBuffer buffer = ByteBuffer.allocateDirect(size * 8);
for (int i = 0; i < size; i++) {
buffer.putLong(i);
}
buffer.flip();
try (var fc = Files.newByteChannel(testFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
while (buffer.hasRemaining())
fc.write(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
void testUringFileReader() throws IOException {
createTestFileWithLongs(1024);
try (var dfr = new UringFileReader(testFile, false)) {
MemorySegment buf1 = Arena.ofAuto().allocate(32, 8);
MemorySegment buf2 = Arena.ofAuto().allocate(16, 8);
dfr.read(List.of(buf1, buf2), List.of(0L, 8L));
for (int i = 0; i < buf1.byteSize(); i+=8) {
System.out.println(buf1.get(ValueLayout.JAVA_LONG, i));
}
for (int i = 0; i < buf2.byteSize(); i+=8) {
System.out.println(buf2.get(ValueLayout.JAVA_LONG, i));
}
}
}
@Test
void testUringFileReaderUnaligned() throws IOException {
createTestFileWithLongs(65536);
try (var dfr = new UringFileReader(testFile, true)) {
var ret = dfr.readUnalignedInDirectMode(Arena.ofAuto(),
new long[] { 10*8, 20*8, 5000*8, 5100*8},
new int[] { 32*8, 10*8, 100*8, 100*8},
4096);
System.out.println(ret.get(0).get(ValueLayout.JAVA_LONG, 0));
System.out.println(ret.get(1).get(ValueLayout.JAVA_LONG, 0));
System.out.println(ret.get(2).get(ValueLayout.JAVA_LONG, 0));
System.out.println(ret.get(3).get(ValueLayout.JAVA_LONG, 0));
}
}
}

View File

@@ -38,7 +38,7 @@ class UringQueueTest {
offsets.add(32L*i);
}
var uring = UringQueue.open(fd, 16);
uring.read(segments, offsets, false);
uring.readBatch(segments, offsets, false);
uring.close();
}

View File

@@ -0,0 +1,37 @@
package nu.marginalia.uring;
import nu.marginalia.asyncio.AsyncReadRequest;
import nu.marginalia.asyncio.UringExecutionQueue;
import nu.marginalia.ffi.LinuxSystemCalls;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.file.Path;
import java.util.List;
class UringExecutionQueueTest {
@Test
@Disabled
public void test() {
int fd = LinuxSystemCalls.openDirect(Path.of("/home/vlofgren/test.dat"));
MemorySegment ms = Arena.ofAuto().allocate(4096, 4096);
try (var eq = new UringExecutionQueue(128)) {
for (int i = 0;;i++) {
eq.submit(i, List.of(
new AsyncReadRequest(fd, ms, 0),
new AsyncReadRequest(fd, ms, 0),
new AsyncReadRequest(fd, ms, 0),
new AsyncReadRequest(fd, ms, 0),
new AsyncReadRequest(fd, ms, 0)
));
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
finally {
LinuxSystemCalls.closeFd(fd);
}
}
}