mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-10-05 21:22:39 +02:00
Compare commits
10 Commits
ca283f9684
...
bc49406881
Author | SHA1 | Date | |
---|---|---|---|
|
bc49406881 | ||
|
90325be447 | ||
|
dc89587af3 | ||
|
7b552afd6b | ||
|
73557edc67 | ||
|
83919e448a | ||
|
6f5b75b84d | ||
|
db315e2813 | ||
|
e9977e08b7 | ||
|
1df3757e5f |
@@ -8,15 +8,14 @@ import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
private final UringFileReader urinReader;
|
||||
private final UringFileReader uringReader;
|
||||
|
||||
public IndexSpansReaderPlain(Path spansFile) throws IOException {
|
||||
urinReader = new UringFileReader(spansFile, false);
|
||||
urinReader.fadviseWillneed();
|
||||
uringReader = new UringFileReader(spansFile, true);
|
||||
uringReader.fadviseWillneed();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -51,39 +50,33 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
@Override
|
||||
public DocumentSpans[] readSpans(Arena arena, long[] encodedOffsets) {
|
||||
|
||||
long totalSize = 0;
|
||||
int readCnt = 0;
|
||||
for (long offset : encodedOffsets) {
|
||||
if (offset < 0)
|
||||
continue;
|
||||
totalSize += SpansCodec.decodeSize(offset);
|
||||
readCnt ++;
|
||||
}
|
||||
|
||||
if (totalSize == 0) {
|
||||
if (readCnt == 0) {
|
||||
return new DocumentSpans[encodedOffsets.length];
|
||||
}
|
||||
|
||||
MemorySegment segment = arena.allocate(totalSize, 8);
|
||||
long[] offsets = new long[readCnt];
|
||||
int[] sizes = new int[readCnt];
|
||||
|
||||
List<MemorySegment> buffers = new ArrayList<>(encodedOffsets.length);
|
||||
List<Long> offsets = new ArrayList<>(encodedOffsets.length);
|
||||
|
||||
long bufferOffset = 0;
|
||||
|
||||
for (long offset : encodedOffsets) {
|
||||
if (offset < 0)
|
||||
for (int idx = 0, j = 0; idx < encodedOffsets.length; idx++) {
|
||||
if (encodedOffsets[idx] < 0)
|
||||
continue;
|
||||
long offset = encodedOffsets[idx];
|
||||
|
||||
long size = SpansCodec.decodeSize(offset);
|
||||
long start = SpansCodec.decodeStartOffset(offset);
|
||||
|
||||
buffers.add(segment.asSlice(bufferOffset, size));
|
||||
offsets.add(start);
|
||||
bufferOffset += size;
|
||||
offsets[j] = SpansCodec.decodeStartOffset(offset);
|
||||
sizes[j] = SpansCodec.decodeSize(offset);
|
||||
j++;
|
||||
}
|
||||
|
||||
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
|
||||
List<MemorySegment> buffers = uringReader.readUnalignedInDirectMode(arena, offsets, sizes, 4096);
|
||||
|
||||
urinReader.read(buffers, offsets);
|
||||
DocumentSpans[] ret = new DocumentSpans[encodedOffsets.length];
|
||||
|
||||
for (int idx = 0, j = 0; idx < encodedOffsets.length; idx++) {
|
||||
if (encodedOffsets[idx] < 0)
|
||||
@@ -96,7 +89,7 @@ public class IndexSpansReaderPlain implements IndexSpansReader {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
urinReader.close();
|
||||
uringReader.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -26,8 +26,8 @@ public class SpansCodec {
|
||||
return encoded >>> 28;
|
||||
}
|
||||
|
||||
public static long decodeSize(long encoded) {
|
||||
return encoded & 0x0FFF_FFFFL;
|
||||
public static int decodeSize(long encoded) {
|
||||
return (int) (encoded & 0x0FFF_FFFFL);
|
||||
}
|
||||
|
||||
public static ByteBuffer createSpanFilesFooter(SpansCodecVersion version, int padSize) {
|
||||
|
@@ -0,0 +1,262 @@
|
||||
package nu.marginalia.index.perftest;
|
||||
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
import nu.marginalia.uring.UringFileReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
public class IoPatternsMain {
|
||||
|
||||
static void testBuffered(int sz, int small, int large, int iters) {
|
||||
try {
|
||||
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
|
||||
long fileSize = Files.size(largeFile);
|
||||
|
||||
Random r = new Random();
|
||||
List<MemorySegment> segments = new ArrayList<>();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
if (small == large) {
|
||||
segments.add(Arena.ofAuto().allocate(small));
|
||||
}
|
||||
else {
|
||||
segments.add(Arena.ofAuto().allocate(r.nextInt(small, large)));
|
||||
}
|
||||
}
|
||||
List<Long> offsets = new ArrayList<>();
|
||||
|
||||
long[] samples = new long[1000];
|
||||
int si = 0;
|
||||
|
||||
try (UringFileReader reader = new UringFileReader(largeFile, false)) {
|
||||
for (int iter = 0; iter < iters; ) {
|
||||
if (si == samples.length) {
|
||||
Arrays.sort(samples);
|
||||
double p1 = samples[10] / 1_000.;
|
||||
double p10 = samples[100] / 1_000.;
|
||||
double p90 = samples[900] / 1_000.;
|
||||
double p99 = samples[990] / 1_000.;
|
||||
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
|
||||
System.out.println("B"+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
|
||||
si = 0;
|
||||
iter++;
|
||||
}
|
||||
|
||||
offsets.clear();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
offsets.add(r.nextLong(0, fileSize - 256));
|
||||
}
|
||||
|
||||
long st = System.nanoTime();
|
||||
reader.read(segments, offsets);
|
||||
long et = System.nanoTime();
|
||||
|
||||
samples[si++] = et - st;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
static void testBufferedPread(int sz, int iters) {
|
||||
try {
|
||||
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
|
||||
long fileSize = Files.size(largeFile);
|
||||
|
||||
Random r = new Random();
|
||||
List<MemorySegment> segments = new ArrayList<>();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
segments.add(Arena.ofAuto().allocate(r.nextInt(24, 256)));
|
||||
}
|
||||
List<Long> offsets = new ArrayList<>();
|
||||
|
||||
long[] samples = new long[1000];
|
||||
int si = 0;
|
||||
|
||||
int fd = -1;
|
||||
try {
|
||||
fd = LinuxSystemCalls.openBuffered(largeFile);
|
||||
LinuxSystemCalls.fadviseRandom(fd);
|
||||
|
||||
for (int iter = 0; iter < iters; ) {
|
||||
if (si == samples.length) {
|
||||
Arrays.sort(samples);
|
||||
double p1 = samples[10] / 1_000.;
|
||||
double p10 = samples[100] / 1_000.;
|
||||
double p90 = samples[900] / 1_000.;
|
||||
double p99 = samples[990] / 1_000.;
|
||||
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
|
||||
System.out.println("BP"+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
|
||||
si = 0;
|
||||
iter++;
|
||||
}
|
||||
|
||||
offsets.clear();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
offsets.add(r.nextLong(0, fileSize - 256));
|
||||
}
|
||||
|
||||
long st = System.nanoTime();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
LinuxSystemCalls.readAt(fd, segments.get(i), offsets.get(i));
|
||||
}
|
||||
long et = System.nanoTime();
|
||||
|
||||
samples[si++] = et - st;
|
||||
}
|
||||
}
|
||||
finally {
|
||||
LinuxSystemCalls.closeFd(fd);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void testDirect(int blockSize, int sz, int iters) {
|
||||
try {
|
||||
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
|
||||
int fileSizeBlocks = (int) ((Files.size(largeFile) & -blockSize) / blockSize);
|
||||
|
||||
Random r = new Random();
|
||||
List<MemorySegment> segments = new ArrayList<>();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
segments.add(Arena.ofAuto().allocate(blockSize, blockSize));
|
||||
}
|
||||
List<Long> offsets = new ArrayList<>();
|
||||
|
||||
long[] samples = new long[1000];
|
||||
int si = 0;
|
||||
|
||||
try (UringFileReader reader = new UringFileReader(largeFile, true)) {
|
||||
for (int iter = 0; iter < iters; ) {
|
||||
if (si == samples.length) {
|
||||
Arrays.sort(samples);
|
||||
double p1 = samples[10] / 1_000.;
|
||||
double p10 = samples[100] / 1_000.;
|
||||
double p90 = samples[900] / 1_000.;
|
||||
double p99 = samples[990] / 1_000.;
|
||||
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
|
||||
System.out.println("DN"+blockSize+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
|
||||
si = 0;
|
||||
iters++;
|
||||
}
|
||||
|
||||
offsets.clear();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
offsets.add(blockSize * r.nextLong(0, fileSizeBlocks));
|
||||
}
|
||||
|
||||
long st = System.nanoTime();
|
||||
reader.read(segments, offsets);
|
||||
long et = System.nanoTime();
|
||||
|
||||
samples[si++] = et - st;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void testDirect1(int blockSize, int iters) {
|
||||
try {
|
||||
Path largeFile = Path.of("/home/vlofgren/largefile.dat");
|
||||
int fileSizeBlocks = (int) ((Files.size(largeFile) & -blockSize) / blockSize);
|
||||
|
||||
Random r = new Random();
|
||||
MemorySegment segment = Arena.global().allocate(blockSize, blockSize);
|
||||
|
||||
long[] samples = new long[1000];
|
||||
int si = 0;
|
||||
|
||||
int fd = LinuxSystemCalls.openDirect(largeFile);
|
||||
if (fd < 0) {
|
||||
throw new IOException("open failed");
|
||||
}
|
||||
try {
|
||||
for (int iter = 0; iter < iters; ) {
|
||||
if (si == samples.length) {
|
||||
Arrays.sort(samples);
|
||||
double p1 = samples[10] / 1_000.;
|
||||
double p10 = samples[100] / 1_000.;
|
||||
double p90 = samples[900] / 1_000.;
|
||||
double p99 = samples[990] / 1_000.;
|
||||
double avg = LongStream.of(samples).average().getAsDouble() / 1000.;
|
||||
System.out.println("D1"+blockSize+"\t"+avg+"\t"+p1 + " " + p10 + " " + p90 + " " + p99);
|
||||
si = 0;
|
||||
iters++;
|
||||
}
|
||||
|
||||
|
||||
long st = System.nanoTime();
|
||||
int ret;
|
||||
long readOffset = blockSize * r.nextLong(0, fileSizeBlocks);
|
||||
if (blockSize != (ret = LinuxSystemCalls.readAt(fd, segment, readOffset))) {
|
||||
throw new IOException("pread failed: " + ret);
|
||||
}
|
||||
long et = System.nanoTime();
|
||||
|
||||
samples[si++] = et - st;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
finally {
|
||||
LinuxSystemCalls.closeFd(fd);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Thread.ofPlatform().start(() -> testBuffered(128, 32, 65536,1000));
|
||||
Thread.ofPlatform().start(() -> testDirect(8192*4, 128,1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(128, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBufferedPread(128, 1000));
|
||||
|
||||
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect1(1024, 1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect1(1024*1024, 1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect1(1024*1024, 1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 512,1000));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 100));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 100));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 100));
|
||||
// Thread.ofPlatform().start(() -> testDirect(512, 100));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(512, 1000));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(100));
|
||||
// Thread.ofPlatform().start(() -> testBuffered(100));
|
||||
|
||||
for (;;);
|
||||
// testBuffered(100);
|
||||
}
|
||||
}
|
@@ -14,70 +14,103 @@ import java.nio.file.StandardOpenOption;
|
||||
*
|
||||
* The positions data is concatenated in the file, with each term's metadata
|
||||
* followed by its positions. The metadata is a single byte, and the positions
|
||||
* are encoded using the Elias Gamma code, with zero padded bits at the end to
|
||||
* get octet alignment.
|
||||
*
|
||||
* are encoded varints.
|
||||
* <p></p>
|
||||
*
|
||||
* It is the responsibility of the caller to keep track of the byte offset of
|
||||
* each posting in the file.
|
||||
*/
|
||||
public class PositionsFileConstructor implements AutoCloseable {
|
||||
private final ByteBuffer workBuffer = ByteBuffer.allocate(65536);
|
||||
|
||||
private final Path file;
|
||||
private final FileChannel channel;
|
||||
|
||||
private long offset;
|
||||
|
||||
public PositionsFileConstructor(Path file) throws IOException {
|
||||
this.file = file;
|
||||
|
||||
channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||
}
|
||||
|
||||
/** Represents a block of positions lists. Each writer thread should hold on to
|
||||
* a block object to ensure the locality of its positions lists.
|
||||
* When finished, commit() must be run.
|
||||
* */
|
||||
public class PositionsFileBlock {
|
||||
private final ByteBuffer workBuffer = ByteBuffer.allocate(1024*1024*16);
|
||||
private long position;
|
||||
|
||||
public PositionsFileBlock(long position) {
|
||||
this.position = position;
|
||||
}
|
||||
|
||||
public boolean fitsData(int size) {
|
||||
return workBuffer.remaining() >= size;
|
||||
}
|
||||
|
||||
public void commit() throws IOException {
|
||||
workBuffer.position(0);
|
||||
workBuffer.limit(workBuffer.capacity());
|
||||
int pos = 0;
|
||||
while (workBuffer.hasRemaining()) {
|
||||
pos += channel.write(workBuffer, this.position + pos + workBuffer.position());
|
||||
}
|
||||
}
|
||||
|
||||
private void relocate() throws IOException {
|
||||
workBuffer.clear();
|
||||
position = channel.position();
|
||||
while (workBuffer.hasRemaining()) {
|
||||
channel.write(workBuffer);
|
||||
}
|
||||
workBuffer.clear();
|
||||
}
|
||||
|
||||
public long position() {
|
||||
return this.position + workBuffer.position();
|
||||
}
|
||||
public void put(byte b) {
|
||||
workBuffer.put(b);
|
||||
}
|
||||
public void put(ByteBuffer buffer) {
|
||||
workBuffer.put(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
public PositionsFileBlock getBlock() throws IOException {
|
||||
synchronized (this) {
|
||||
var block = new PositionsFileBlock(channel.position());
|
||||
block.relocate();
|
||||
return block;
|
||||
}
|
||||
}
|
||||
|
||||
/** Add a term to the positions file
|
||||
*
|
||||
* @param block a block token to ensure data locality
|
||||
* @param termMeta the term metadata
|
||||
* @param positionsBuffer the positions of the term
|
||||
*
|
||||
* @return the offset of the term in the file, with the size of the data in the highest byte
|
||||
*/
|
||||
public long add(byte termMeta, ByteBuffer positionsBuffer) throws IOException {
|
||||
synchronized (file) {
|
||||
int size = 1 + positionsBuffer.remaining();
|
||||
public long add(PositionsFileBlock block, byte termMeta, ByteBuffer positionsBuffer) throws IOException {
|
||||
int size = 1 + positionsBuffer.remaining();
|
||||
|
||||
if (workBuffer.remaining() < size) {
|
||||
workBuffer.flip();
|
||||
channel.write(workBuffer);
|
||||
workBuffer.clear();
|
||||
if (!block.fitsData(size)) {
|
||||
synchronized (this) {
|
||||
block.commit();
|
||||
block.relocate();
|
||||
}
|
||||
}
|
||||
synchronized (file) {
|
||||
long offset = block.position();
|
||||
|
||||
workBuffer.put(termMeta);
|
||||
workBuffer.put(positionsBuffer);
|
||||
block.put(termMeta);
|
||||
block.put(positionsBuffer);
|
||||
|
||||
long ret = PositionCodec.encode(size, offset);
|
||||
|
||||
offset += size;
|
||||
|
||||
return ret;
|
||||
return PositionCodec.encode(size, offset);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (workBuffer.hasRemaining()) {
|
||||
workBuffer.flip();
|
||||
|
||||
while (workBuffer.hasRemaining())
|
||||
channel.write(workBuffer);
|
||||
}
|
||||
|
||||
long remainingBlockSize = 4096 - (channel.position() & -4096);
|
||||
if (remainingBlockSize != 0) {
|
||||
workBuffer.position(0);
|
||||
workBuffer.limit(0);
|
||||
while (workBuffer.hasRemaining())
|
||||
channel.write(workBuffer);
|
||||
}
|
||||
|
||||
channel.force(false);
|
||||
channel.close();
|
||||
}
|
||||
|
@@ -79,6 +79,8 @@ public class FullPreindexDocuments {
|
||||
var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
|
||||
offsetMap.defaultReturnValue(0);
|
||||
|
||||
var positionsBlock = positionsFileConstructor.getBlock();
|
||||
|
||||
while (docIds.hasRemaining()) {
|
||||
long docId = docIds.get();
|
||||
long rankEncodedId = docIdRewriter.rewriteDocId(docId);
|
||||
@@ -94,12 +96,13 @@ public class FullPreindexDocuments {
|
||||
ByteBuffer pos = tPos.get(i);
|
||||
|
||||
long offset = offsetMap.addTo(termId, RECORD_SIZE_LONGS);
|
||||
long encodedPosOffset = positionsFileConstructor.add(meta, pos);
|
||||
long encodedPosOffset = positionsFileConstructor.add(positionsBlock, meta, pos);
|
||||
|
||||
assembly.put(offset + 0, rankEncodedId);
|
||||
assembly.put(offset + 1, encodedPosOffset);
|
||||
}
|
||||
}
|
||||
positionsBlock.commit();
|
||||
|
||||
assembly.write(docsFile);
|
||||
}
|
||||
|
@@ -7,8 +7,8 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/** Reads positions data from the positions file */
|
||||
@@ -18,7 +18,10 @@ public class PositionsFileReader implements AutoCloseable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PositionsFileReader.class);
|
||||
|
||||
public PositionsFileReader(Path positionsFile) throws IOException {
|
||||
uringFileReader = new UringFileReader(positionsFile, false);
|
||||
if ((Files.size(positionsFile) & 4095) != 0) {
|
||||
throw new IllegalArgumentException("Positions file is not block aligned in size: " + Files.size(positionsFile));
|
||||
}
|
||||
uringFileReader = new UringFileReader(positionsFile, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -29,38 +32,34 @@ public class PositionsFileReader implements AutoCloseable {
|
||||
/** Get the positions for a keywords in the index, as pointed out by the encoded offsets;
|
||||
* intermediate buffers are allocated from the provided arena allocator. */
|
||||
public TermData[] getTermData(Arena arena, long[] offsets) {
|
||||
|
||||
int cnt = 0;
|
||||
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
long encodedOffset = offsets[i];
|
||||
if (encodedOffset == 0) continue;
|
||||
cnt++;
|
||||
}
|
||||
|
||||
if (cnt == 0) {
|
||||
return new TermData[offsets.length];
|
||||
}
|
||||
|
||||
long[] readOffsets = new long[cnt];
|
||||
int[] readSizes = new int[cnt];
|
||||
|
||||
for (int i = 0, j = 0; i < offsets.length; i++) {
|
||||
long encodedOffset = offsets[i];
|
||||
if (encodedOffset == 0) continue;
|
||||
|
||||
readSizes[j] = PositionCodec.decodeSize(encodedOffset);
|
||||
readOffsets[j] = PositionCodec.decodeOffset(encodedOffset);
|
||||
j++;
|
||||
}
|
||||
|
||||
List<MemorySegment> buffers = uringFileReader.readUnalignedInDirectMode(arena, readOffsets, readSizes, 4096);
|
||||
|
||||
TermData[] ret = new TermData[offsets.length];
|
||||
|
||||
int sizeTotal = 0;
|
||||
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
long encodedOffset = offsets[i];
|
||||
if (encodedOffset == 0) continue;
|
||||
sizeTotal += PositionCodec.decodeSize(encodedOffset);
|
||||
}
|
||||
|
||||
if (sizeTotal == 0)
|
||||
return ret;
|
||||
|
||||
MemorySegment segment = arena.allocate(sizeTotal, 512);
|
||||
|
||||
List<MemorySegment> buffers = new ArrayList<>(offsets.length);
|
||||
List<Long> readOffsets = new ArrayList<>(offsets.length);
|
||||
|
||||
int bufOffset = 0;
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
long encodedOffset = offsets[i];
|
||||
if (encodedOffset == 0) continue;
|
||||
|
||||
int length = PositionCodec.decodeSize(encodedOffset);
|
||||
long offset = PositionCodec.decodeOffset(encodedOffset);
|
||||
buffers.add(segment.asSlice(bufOffset, length));
|
||||
readOffsets.add(offset);
|
||||
bufOffset+=length;
|
||||
}
|
||||
|
||||
uringFileReader.read(buffers, readOffsets);
|
||||
|
||||
for (int i = 0, j=0; i < offsets.length; i++) {
|
||||
long encodedOffset = offsets[i];
|
||||
if (encodedOffset == 0) continue;
|
||||
|
@@ -33,9 +33,11 @@ class PositionsFileReaderTest {
|
||||
void getTermData() throws IOException {
|
||||
long key1, key2, key3;
|
||||
try (PositionsFileConstructor constructor = new PositionsFileConstructor(file)) {
|
||||
key1 = constructor.add((byte) 43, VarintCodedSequence.generate(1, 2, 3).buffer());
|
||||
key2 = constructor.add((byte) 51, VarintCodedSequence.generate(2, 3, 5, 1000, 5000, 20241).buffer());
|
||||
key3 = constructor.add((byte) 61, VarintCodedSequence.generate(3, 5, 7).buffer());
|
||||
var block = constructor.getBlock();
|
||||
key1 = constructor.add(block, (byte) 43, VarintCodedSequence.generate(1, 2, 3).buffer());
|
||||
key2 = constructor.add(block, (byte) 51, VarintCodedSequence.generate(2, 3, 5, 1000, 5000, 20241).buffer());
|
||||
key3 = constructor.add(block, (byte) 61, VarintCodedSequence.generate(3, 5, 7).buffer());
|
||||
block.commit();
|
||||
}
|
||||
|
||||
System.out.println("key1: " + Long.toHexString(key1));
|
||||
|
@@ -4,7 +4,6 @@ import nu.marginalia.array.DirectFileReader;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.LongArrayFactory;
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
import nu.marginalia.uring.UringFileReader;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -12,7 +11,6 @@ import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
public class NativeAlgosTest {
|
||||
@Test
|
||||
@@ -71,28 +69,4 @@ public class NativeAlgosTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAioFileReader() throws IOException {
|
||||
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
array.set(i, i);
|
||||
}
|
||||
array.close();
|
||||
|
||||
try (var dfr = new UringFileReader(Path.of("/tmp/test"), false)) {
|
||||
MemorySegment buf1 = Arena.ofAuto().allocate(32, 8);
|
||||
MemorySegment buf2 = Arena.ofAuto().allocate(16, 8);
|
||||
|
||||
dfr.read(List.of(buf1, buf2), List.of(0L, 8L));
|
||||
|
||||
for (int i = 0; i < buf1.byteSize(); i+=8) {
|
||||
System.out.println(buf1.get(ValueLayout.JAVA_LONG, i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < buf2.byteSize(); i+=8) {
|
||||
System.out.println(buf2.get(ValueLayout.JAVA_LONG, i));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@@ -1,15 +1,20 @@
|
||||
#!/usr/bin/env sh
|
||||
|
||||
CXXFLAGS=-O3 -march=native -std=c++14 -fPIC `pkg-config --cflags liburing`
|
||||
LDFLAGS=`pkg-config --libs liburing` -shared
|
||||
LDFLAGS=
|
||||
|
||||
# Weird hack to get liburing to link on one particular debian server
|
||||
LIBURING_PATH=`pkg-config liburing --keep-system-libs --libs-only-L | cut -c 3- | tr -d \ `/liburing.so
|
||||
|
||||
CXX=c++
|
||||
|
||||
SOURCES=src/sort.cc src/unix.cc src/uring.cc
|
||||
|
||||
all: resources/libcpp.so resources/liburing.so
|
||||
resources/libcpp.so:
|
||||
${CXX} -shared ${LDFLAGS} ${CXXFLAGS} ${SOURCES} -o resources/libcpp.so
|
||||
all: resources/libcpp.so
|
||||
|
||||
resources/libcpp.so: ${SOURCES} resources/liburing.so
|
||||
${CXX} -shared ${CXXFLAGS} ${SOURCES} resources/liburing.so -o resources/libcpp.so
|
||||
resources/liburing.so:
|
||||
cp /usr/lib/liburing.so resources/
|
||||
cp ${LIBURING_PATH} resources/liburing.so
|
||||
clean:
|
||||
rm -rf resources/{libcpp,liburing}.so
|
@@ -10,6 +10,7 @@ java {
|
||||
|
||||
dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
implementation libs.fastutil
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
testImplementation libs.bundles.junit
|
||||
|
@@ -0,0 +1,6 @@
|
||||
package nu.marginalia.asyncio;
|
||||
|
||||
import java.lang.foreign.MemorySegment;
|
||||
|
||||
public record AsyncReadRequest(int fd, MemorySegment destination, long offset) {
|
||||
}
|
@@ -0,0 +1,55 @@
|
||||
package nu.marginalia.asyncio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
final class SubmittedReadRequest<T> {
|
||||
|
||||
public final long id;
|
||||
|
||||
private final T context;
|
||||
private final List<AsyncReadRequest> requests;
|
||||
private final CompletableFuture<T> future;
|
||||
private int count;
|
||||
private volatile boolean success = true;
|
||||
|
||||
SubmittedReadRequest(T context, List<AsyncReadRequest> requests, CompletableFuture<T> future, long id) {
|
||||
this.context = context;
|
||||
this.requests = requests;
|
||||
this.future = future;
|
||||
this.id = id;
|
||||
this.count = requests.size();
|
||||
}
|
||||
|
||||
public List<AsyncReadRequest> getRequests() {
|
||||
return requests;
|
||||
}
|
||||
|
||||
public int count() {
|
||||
return count;
|
||||
}
|
||||
|
||||
public void canNotFinish() {
|
||||
success = false;
|
||||
count = 0;
|
||||
future.completeExceptionally(new IOException());
|
||||
}
|
||||
|
||||
public boolean partFinished(boolean successfully) {
|
||||
if (!successfully) {
|
||||
success = false;
|
||||
}
|
||||
|
||||
if (--count == 0) {
|
||||
if (success) {
|
||||
future.complete(context);
|
||||
} else {
|
||||
future.completeExceptionally(new IOException());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,243 @@
|
||||
package nu.marginalia.asyncio;
|
||||
|
||||
import nu.marginalia.ffi.IoUring;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.lang.foreign.ValueLayout.*;
|
||||
|
||||
public class UringExecutionQueue implements AutoCloseable {
|
||||
private static final IoUring ioUringInstance = IoUring.instance();
|
||||
|
||||
private final AtomicLong requestIdCounter = new AtomicLong(1);
|
||||
private final int queueSize;
|
||||
|
||||
private final Thread executor;
|
||||
private volatile boolean running = true;
|
||||
private final MemorySegment uringQueue;
|
||||
|
||||
private final ArrayBlockingQueue<SubmittedReadRequest<? extends Object>> inputQueue;
|
||||
|
||||
public UringExecutionQueue(int queueSize) throws Throwable {
|
||||
this.inputQueue = new ArrayBlockingQueue<>(queueSize, false);
|
||||
this.queueSize = queueSize;
|
||||
this.uringQueue = (MemorySegment) ioUringInstance.uringInit.invoke(queueSize);
|
||||
|
||||
executor = Thread.ofPlatform().daemon().start(this::executionPipe);
|
||||
}
|
||||
|
||||
public void close() throws InterruptedException {
|
||||
running = false;
|
||||
executor.join();
|
||||
|
||||
try {
|
||||
ioUringInstance.uringClose.invoke(uringQueue);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public <T> CompletableFuture<T> submit(T context, List<AsyncReadRequest> relatedRequests) throws InterruptedException {
|
||||
if (relatedRequests.size() > queueSize) {
|
||||
throw new IllegalArgumentException("Request batches may not exceed the queue size!");
|
||||
}
|
||||
long id = requestIdCounter.incrementAndGet();
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
inputQueue.put(new SubmittedReadRequest<>(context, relatedRequests, future, id));
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
static class UringDispatcher implements AutoCloseable {
|
||||
private final Arena arena;
|
||||
|
||||
private final MemorySegment returnResultIds;
|
||||
private final MemorySegment readBatchIds;
|
||||
private final MemorySegment readFds;
|
||||
private final MemorySegment readBuffers;
|
||||
private final MemorySegment readSizes;
|
||||
private final MemorySegment readOffsets;
|
||||
private final MemorySegment uringQueue;
|
||||
|
||||
private int requestsToSend = 0;
|
||||
|
||||
UringDispatcher(int queueSize, MemorySegment uringQueue) {
|
||||
this.uringQueue = uringQueue;
|
||||
this.arena = Arena.ofConfined();
|
||||
|
||||
returnResultIds = arena.allocate(JAVA_LONG, queueSize);
|
||||
readBatchIds = arena.allocate(JAVA_LONG, queueSize);
|
||||
readFds = arena.allocate(JAVA_INT, queueSize);
|
||||
readBuffers = arena.allocate(ADDRESS, queueSize);
|
||||
readSizes = arena.allocate(JAVA_INT, queueSize);
|
||||
readOffsets = arena.allocate(JAVA_LONG, queueSize);
|
||||
}
|
||||
|
||||
void prepareRead(int fd, long batchId, MemorySegment segment, int size, long offset) {
|
||||
readFds.setAtIndex(JAVA_INT, requestsToSend, fd);
|
||||
readBuffers.setAtIndex(ADDRESS, requestsToSend, segment);
|
||||
readBatchIds.setAtIndex(JAVA_LONG, requestsToSend, batchId);
|
||||
readSizes.setAtIndex(JAVA_INT, requestsToSend, size);
|
||||
readOffsets.setAtIndex(JAVA_LONG, requestsToSend, offset);
|
||||
requestsToSend++;
|
||||
}
|
||||
|
||||
long[] poll() {
|
||||
try {
|
||||
// Dispatch call
|
||||
int result = (Integer) IoUring.instance.uringJustPoll.invoke(uringQueue, returnResultIds);
|
||||
|
||||
if (result < 0) {
|
||||
throw new IOException("Error in io_uring");
|
||||
}
|
||||
else {
|
||||
long[] ret = new long[result];
|
||||
for (int i = 0; i < result; i++) {
|
||||
ret[i] = returnResultIds.getAtIndex(JAVA_LONG, i);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
requestsToSend = 0;
|
||||
}
|
||||
}
|
||||
long[] dispatchRead(int ongoingRequests) throws IOException {
|
||||
try {
|
||||
// Dispatch call
|
||||
int result = (Integer) IoUring.instance.uringReadAndPoll.invoke(
|
||||
uringQueue,
|
||||
returnResultIds,
|
||||
ongoingRequests,
|
||||
requestsToSend,
|
||||
readBatchIds,
|
||||
readFds,
|
||||
readBuffers,
|
||||
readSizes,
|
||||
readOffsets
|
||||
);
|
||||
|
||||
if (result < 0) {
|
||||
throw new IOException("Error in io_uring");
|
||||
}
|
||||
else {
|
||||
long[] ret = new long[result];
|
||||
for (int i = 0; i < result; i++) {
|
||||
ret[i] = returnResultIds.getAtIndex(JAVA_LONG, i);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
requestsToSend = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int getRequestsToSend() {
|
||||
return requestsToSend;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
arena.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void executionPipe() {
|
||||
try (var uringDispatcher = new UringDispatcher(queueSize, uringQueue)) {
|
||||
int ongoingRequests = 0;
|
||||
|
||||
// recycle between iterations to avoid allocation churn
|
||||
List<SubmittedReadRequest<?>> batchesToSend = new ArrayList<>();
|
||||
|
||||
Map<Long, SubmittedReadRequest<?>> requestsToId = new HashMap<>();
|
||||
|
||||
while (running) {
|
||||
batchesToSend.clear();
|
||||
|
||||
// if (inputQueue.isEmpty() && ongoingRequests == 0) {
|
||||
// LockSupport.parkNanos(10_000);
|
||||
// continue;
|
||||
// }
|
||||
|
||||
int remainingRequests = queueSize - ongoingRequests;
|
||||
|
||||
SubmittedReadRequest<?> request;
|
||||
|
||||
// Find batches to send that will not exceed the queue size
|
||||
while ((request = inputQueue.peek()) != null) {
|
||||
if (remainingRequests >= request.count()) {
|
||||
remainingRequests -= request.count();
|
||||
inputQueue.poll();
|
||||
|
||||
batchesToSend.add(request);
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Arrange requests from the batches into arrays to send to FFI call
|
||||
|
||||
int requestsToSend = 0;
|
||||
for (var batch : batchesToSend) {
|
||||
requestsToId.put(batch.id, batch);
|
||||
|
||||
for (var read : batch.getRequests()) {
|
||||
uringDispatcher.prepareRead(read.fd(), batch.id, read.destination(), (int) read.destination().byteSize(), read.offset());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
ongoingRequests += uringDispatcher.getRequestsToSend();
|
||||
|
||||
long[] results;
|
||||
if (uringDispatcher.getRequestsToSend() > 0) {
|
||||
results = uringDispatcher.dispatchRead(ongoingRequests);
|
||||
}
|
||||
else {
|
||||
results = uringDispatcher.poll();
|
||||
}
|
||||
|
||||
for (long id : results) {
|
||||
requestsToId.computeIfPresent(Math.abs(id), (_, req) -> {
|
||||
if (req.partFinished(id > 0)) {
|
||||
return null;
|
||||
} else {
|
||||
return req;
|
||||
}
|
||||
});
|
||||
ongoingRequests--;
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
ongoingRequests -= requestsToSend;
|
||||
batchesToSend.forEach(req -> {
|
||||
req.canNotFinish();
|
||||
requestsToId.remove(req.id);
|
||||
});
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -23,17 +23,19 @@ import static java.lang.foreign.ValueLayout.*;
|
||||
* */
|
||||
@SuppressWarnings("preview")
|
||||
public class IoUring {
|
||||
private final MethodHandle uringInit;
|
||||
private final MethodHandle uringClose;
|
||||
public final MethodHandle uringInit;
|
||||
public final MethodHandle uringClose;
|
||||
private final MethodHandle uringReadBuffered;
|
||||
private final MethodHandle uringReadDirect;
|
||||
public final MethodHandle uringReadAndPoll;
|
||||
public final MethodHandle uringJustPoll;
|
||||
|
||||
public static final IoUring instance;
|
||||
|
||||
/** Indicates whether the native implementations are available */
|
||||
public static final boolean isAvailable;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LinuxSystemCalls.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(IoUring.class);
|
||||
|
||||
private IoUring(Path libFile) {
|
||||
SymbolLookup libraryLookup = SymbolLookup.libraryLookup(libFile, Arena.global());
|
||||
@@ -44,6 +46,28 @@ public class IoUring {
|
||||
handle = libraryLookup.findOrThrow("uring_read_direct");
|
||||
uringReadDirect = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
|
||||
|
||||
handle = libraryLookup.findOrThrow("uring_read_submit_and_poll");
|
||||
|
||||
uringReadAndPoll = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(
|
||||
JAVA_INT,
|
||||
ADDRESS, // io_uring* ring
|
||||
ADDRESS, // long* result_ids
|
||||
JAVA_INT, // int in_flight_requests
|
||||
JAVA_INT, // int read_count
|
||||
ADDRESS, // long* read_batch_ids
|
||||
ADDRESS, // int* read_fds
|
||||
ADDRESS, // void** read_buffers
|
||||
ADDRESS, // unsigned int** read_sizes
|
||||
ADDRESS // long* read_offsets
|
||||
));
|
||||
handle = libraryLookup.findOrThrow("uring_poll");
|
||||
|
||||
uringJustPoll = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(
|
||||
JAVA_INT,
|
||||
ADDRESS, // io_uring* ring
|
||||
ADDRESS // long* result_ids
|
||||
));
|
||||
|
||||
handle = libraryLookup.findOrThrow("initialize_uring");
|
||||
uringInit = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(ADDRESS, JAVA_INT));
|
||||
|
||||
@@ -53,9 +77,9 @@ public class IoUring {
|
||||
|
||||
static {
|
||||
Path libFile;
|
||||
IoUring nativeAlgosI = null;
|
||||
IoUring ioUringI = null;
|
||||
// copy resource to temp file so it can be loaded
|
||||
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("liburing.so")) {
|
||||
try (var is = IoUring.class.getClassLoader().getResourceAsStream("liburing.so")) {
|
||||
var tempFile = File.createTempFile("liburing", ".so");
|
||||
tempFile.deleteOnExit();
|
||||
|
||||
@@ -70,7 +94,7 @@ public class IoUring {
|
||||
logger.info("Failed to load native library, likely not built", e);
|
||||
}
|
||||
|
||||
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("libcpp.so")) {
|
||||
try (var is = IoUring.class.getClassLoader().getResourceAsStream("libcpp.so")) {
|
||||
var tempFile = File.createTempFile("libcpp", ".so");
|
||||
tempFile.deleteOnExit();
|
||||
|
||||
@@ -80,16 +104,20 @@ public class IoUring {
|
||||
}
|
||||
|
||||
libFile = tempFile.toPath();
|
||||
nativeAlgosI = new IoUring(libFile);
|
||||
ioUringI = new IoUring(libFile);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.info("Failed to load native library, likely not built", e);
|
||||
}
|
||||
|
||||
instance = nativeAlgosI;
|
||||
instance = ioUringI;
|
||||
isAvailable = instance != null;
|
||||
}
|
||||
|
||||
public static IoUring instance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static UringQueue uringOpen(int fd, int queueSize) {
|
||||
try {
|
||||
return new UringQueue((MemorySegment) instance.uringInit.invoke(queueSize), fd);
|
||||
|
@@ -1,10 +1,15 @@
|
||||
package nu.marginalia.uring;
|
||||
|
||||
import it.unimi.dsi.fastutil.longs.Long2IntAVLTreeMap;
|
||||
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
|
||||
import it.unimi.dsi.fastutil.longs.LongIterator;
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@@ -46,7 +51,7 @@ public class UringFileReader implements AutoCloseable {
|
||||
|
||||
int ret;
|
||||
if (ms.byteSize() != (ret = LinuxSystemCalls.readAt(fd, ms, offset))) {
|
||||
throw new RuntimeException("Read failed, rv=" + ret);
|
||||
throw new RuntimeException("Read failed, rv=" + ret + " at " + offset + " : " + ms.byteSize());
|
||||
}
|
||||
}
|
||||
return;
|
||||
@@ -54,7 +59,7 @@ public class UringFileReader implements AutoCloseable {
|
||||
var ring = rings[(int) (ringIdx.getAndIncrement() % rings.length)];
|
||||
|
||||
if (destinations.size() <= QUEUE_SIZE) {
|
||||
int ret = ring.read(destinations, offsets, direct);
|
||||
int ret = ring.readBatch(destinations, offsets, direct);
|
||||
if (ret != offsets.size()) {
|
||||
throw new RuntimeException("Read failed, rv=" + ret);
|
||||
}
|
||||
@@ -66,6 +71,87 @@ public class UringFileReader implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
/** This function takes a list of offsets and sizes, and translates them to a minium of blockSize'd O_DIRECT
|
||||
* reads. A single buffer will be allocated to hold all the data, to encourage HugePages allocation and
|
||||
* reduce TLB thrashing. It is still generally helpful for performance if the data is at least best-effort
|
||||
* block aligned.
|
||||
*
|
||||
* @return MemorySegment slices that contain only the requested data.
|
||||
*/
|
||||
public List<MemorySegment> readUnalignedInDirectMode(Arena arena, long[] offsets, int[] sizes, int blockSize) {
|
||||
|
||||
if (offsets.length < 1)
|
||||
return List.of();
|
||||
if (offsets.length != sizes.length) throw new IllegalArgumentException("Offsets and Sizes arrays don't match!");
|
||||
if ((blockSize & 511) != 0) throw new IllegalArgumentException("Block size must be a multiple of 512");
|
||||
|
||||
// First we work out which blocks we need to read, and how many they are
|
||||
final LongAVLTreeSet neededBlocks = new LongAVLTreeSet();
|
||||
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
for (long block = offsets[i] & -blockSize;
|
||||
block <= ((offsets[i] + sizes[i]) & -blockSize);
|
||||
block+=blockSize)
|
||||
{
|
||||
neededBlocks.add(block);
|
||||
}
|
||||
}
|
||||
|
||||
MemorySegment allMemory = arena.allocate((long) blockSize * neededBlocks.size(), blockSize);
|
||||
|
||||
List<MemorySegment> buffers = new ArrayList<>(sizes.length);
|
||||
List<Long> bufferOffsets = new ArrayList<>(sizes.length);
|
||||
|
||||
final Long2IntAVLTreeMap blockToIdx = new Long2IntAVLTreeMap();
|
||||
LongIterator neededBlockIterator = neededBlocks.longIterator();
|
||||
|
||||
long runStart = -1;
|
||||
long runCurrent = -1;
|
||||
long sliceOffset = 0;
|
||||
|
||||
for (;;) {
|
||||
long nextBlock = neededBlockIterator.nextLong();
|
||||
|
||||
blockToIdx.put(nextBlock, blockToIdx.size());
|
||||
|
||||
if (runStart < 0) runStart = nextBlock;
|
||||
else if (runCurrent + blockSize != nextBlock) {
|
||||
int bufferSize = (int) (blockSize + runCurrent - runStart);
|
||||
bufferOffsets.add(runStart);
|
||||
buffers.add(allMemory.asSlice(sliceOffset, bufferSize));
|
||||
sliceOffset += bufferSize;
|
||||
|
||||
runStart = nextBlock;
|
||||
}
|
||||
|
||||
runCurrent = nextBlock;
|
||||
|
||||
if (!neededBlockIterator.hasNext()) {
|
||||
// If this is the last value, we need to wrap up the final run
|
||||
int bufferSize = (int) (blockSize + runCurrent - runStart);
|
||||
bufferOffsets.add(runStart);
|
||||
buffers.add(allMemory.asSlice(sliceOffset, bufferSize));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Perform the read
|
||||
read(buffers, bufferOffsets);
|
||||
|
||||
// Slice the big memory chunk into the requested slices
|
||||
List<MemorySegment> ret = new ArrayList<>(sizes.length);
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
long offset = offsets[i];
|
||||
int size = sizes[i];
|
||||
|
||||
long startBlock = (long) blockSize * blockToIdx.get(offset & -blockSize);
|
||||
long blockOffset = offset & (blockSize - 1);
|
||||
ret.add(allMemory.asSlice(startBlock + blockOffset, size));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
for (var ring : rings) {
|
||||
ring.close();
|
||||
|
@@ -23,7 +23,7 @@ public final class UringQueue {
|
||||
return IoUring.uringOpen(fd, size);
|
||||
}
|
||||
|
||||
public int read(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
|
||||
public int readBatch(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
|
||||
try {
|
||||
if (!lock.tryLock(10, TimeUnit.MILLISECONDS))
|
||||
throw new RuntimeException("io_uring slow, likely backpressure!");
|
||||
|
@@ -1,8 +1,14 @@
|
||||
# LongArray C++ Helpers
|
||||
# Native C++ Helpers
|
||||
|
||||
This package contains helper functions for working with LongArray objects,
|
||||
as native C++ calls. The helpers are only built on Linux, and if they are absent,
|
||||
Java substitutes should be used instead.
|
||||
This package contains helper functions for calling native functions.
|
||||
|
||||
### Systems Programming Helpers
|
||||
|
||||
TBW
|
||||
|
||||
### Long Array Helpers.
|
||||
|
||||
The helpers are only built on Linux, and if they are absent, Java substitutes should be used instead.
|
||||
|
||||
Library loading and access is available through the
|
||||
[NativeAlgos](java/nu/marginalia/NativeAlgos.java) class.
|
||||
|
@@ -31,6 +31,81 @@ void close_uring(io_uring* ring) {
|
||||
free(ring);
|
||||
}
|
||||
|
||||
|
||||
int uring_read_submit_and_poll(
|
||||
io_uring* ring,
|
||||
long* result_ids,
|
||||
int in_flight_requests,
|
||||
int read_count,
|
||||
long* read_batch_ids,
|
||||
int* read_fds,
|
||||
void** read_buffers,
|
||||
unsigned int* read_sizes,
|
||||
long* read_offsets)
|
||||
{
|
||||
|
||||
for (int i = 0; i < read_count; i++) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
if (!sqe) {
|
||||
fprintf(stderr, "uring_queue full!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
io_uring_prep_read(sqe, read_fds[i], read_buffers[i], read_sizes[i], read_offsets[i]);
|
||||
io_uring_sqe_set_data(sqe, (void*) read_batch_ids[i]);
|
||||
}
|
||||
|
||||
int wait_cnt = 8;
|
||||
|
||||
if (wait_cnt > in_flight_requests) {
|
||||
wait_cnt = in_flight_requests;
|
||||
}
|
||||
|
||||
int submitted = io_uring_submit_and_wait(ring, wait_cnt);
|
||||
if (submitted != read_count) {
|
||||
if (submitted < 0) {
|
||||
fprintf(stderr, "io_uring_submit %s\n", strerror(-submitted));
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "io_uring_submit(): submitted != %d, was %d", read_count, submitted);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int completed = 0;
|
||||
struct io_uring_cqe *cqe;
|
||||
while (io_uring_peek_cqe(ring, &cqe) == 0) {
|
||||
if (cqe->res < 0) {
|
||||
fprintf(stderr, "io_uring error: %s\n", strerror(-cqe->res));
|
||||
result_ids[completed++] = -cqe->user_data; // flag an error by sending a negative ID back so we can clean up memory allocation etc
|
||||
}
|
||||
else {
|
||||
result_ids[completed++] = cqe->user_data;
|
||||
}
|
||||
io_uring_cqe_seen(ring, cqe);
|
||||
}
|
||||
|
||||
return completed;
|
||||
}
|
||||
|
||||
int uring_poll(io_uring* ring, long* result_ids)
|
||||
{
|
||||
int completed = 0;
|
||||
struct io_uring_cqe *cqe;
|
||||
while (io_uring_peek_cqe(ring, &cqe) == 0) {
|
||||
if (cqe->res < 0) {
|
||||
fprintf(stderr, "io_uring error: %s\n", strerror(-cqe->res));
|
||||
result_ids[completed++] = -cqe->user_data; // flag an error by sending a negative ID back so we can clean up memory allocation etc
|
||||
}
|
||||
else {
|
||||
result_ids[completed++] = cqe->user_data;
|
||||
}
|
||||
io_uring_cqe_seen(ring, cqe);
|
||||
}
|
||||
|
||||
return completed;
|
||||
}
|
||||
|
||||
int uring_read_buffered(int fd, io_uring* ring, int n, void** buffers, unsigned int* sizes, long* offsets) {
|
||||
|
||||
#ifdef DEBUG_CHECKS
|
||||
|
@@ -0,0 +1,81 @@
|
||||
package nu.marginalia;
|
||||
|
||||
import nu.marginalia.uring.UringFileReader;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.List;
|
||||
|
||||
public class UringFileReaderTest {
|
||||
Path testFile;
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException {
|
||||
testFile = Files.createTempFile("UringFileReaderTest", ".dat");
|
||||
}
|
||||
@AfterEach
|
||||
public void tearDown() throws IOException {
|
||||
Files.deleteIfExists(testFile);
|
||||
}
|
||||
|
||||
void createTestFileWithLongs(int size) {
|
||||
ByteBuffer buffer = ByteBuffer.allocateDirect(size * 8);
|
||||
for (int i = 0; i < size; i++) {
|
||||
buffer.putLong(i);
|
||||
}
|
||||
buffer.flip();
|
||||
try (var fc = Files.newByteChannel(testFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
|
||||
while (buffer.hasRemaining())
|
||||
fc.write(buffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUringFileReader() throws IOException {
|
||||
|
||||
createTestFileWithLongs(1024);
|
||||
|
||||
try (var dfr = new UringFileReader(testFile, false)) {
|
||||
MemorySegment buf1 = Arena.ofAuto().allocate(32, 8);
|
||||
MemorySegment buf2 = Arena.ofAuto().allocate(16, 8);
|
||||
|
||||
dfr.read(List.of(buf1, buf2), List.of(0L, 8L));
|
||||
|
||||
for (int i = 0; i < buf1.byteSize(); i+=8) {
|
||||
System.out.println(buf1.get(ValueLayout.JAVA_LONG, i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < buf2.byteSize(); i+=8) {
|
||||
System.out.println(buf2.get(ValueLayout.JAVA_LONG, i));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUringFileReaderUnaligned() throws IOException {
|
||||
createTestFileWithLongs(65536);
|
||||
|
||||
try (var dfr = new UringFileReader(testFile, true)) {
|
||||
var ret = dfr.readUnalignedInDirectMode(Arena.ofAuto(),
|
||||
new long[] { 10*8, 20*8, 5000*8, 5100*8},
|
||||
new int[] { 32*8, 10*8, 100*8, 100*8},
|
||||
4096);
|
||||
System.out.println(ret.get(0).get(ValueLayout.JAVA_LONG, 0));
|
||||
System.out.println(ret.get(1).get(ValueLayout.JAVA_LONG, 0));
|
||||
System.out.println(ret.get(2).get(ValueLayout.JAVA_LONG, 0));
|
||||
System.out.println(ret.get(3).get(ValueLayout.JAVA_LONG, 0));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@@ -38,7 +38,7 @@ class UringQueueTest {
|
||||
offsets.add(32L*i);
|
||||
}
|
||||
var uring = UringQueue.open(fd, 16);
|
||||
uring.read(segments, offsets, false);
|
||||
uring.readBatch(segments, offsets, false);
|
||||
uring.close();
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,37 @@
|
||||
package nu.marginalia.uring;
|
||||
|
||||
import nu.marginalia.asyncio.AsyncReadRequest;
|
||||
import nu.marginalia.asyncio.UringExecutionQueue;
|
||||
import nu.marginalia.ffi.LinuxSystemCalls;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
class UringExecutionQueueTest {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test() {
|
||||
int fd = LinuxSystemCalls.openDirect(Path.of("/home/vlofgren/test.dat"));
|
||||
MemorySegment ms = Arena.ofAuto().allocate(4096, 4096);
|
||||
try (var eq = new UringExecutionQueue(128)) {
|
||||
for (int i = 0;;i++) {
|
||||
eq.submit(i, List.of(
|
||||
new AsyncReadRequest(fd, ms, 0),
|
||||
new AsyncReadRequest(fd, ms, 0),
|
||||
new AsyncReadRequest(fd, ms, 0),
|
||||
new AsyncReadRequest(fd, ms, 0),
|
||||
new AsyncReadRequest(fd, ms, 0)
|
||||
));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
LinuxSystemCalls.closeFd(fd);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user