1
1
mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git synced 2025-10-05 21:22:39 +02:00

(native) Clean up native helpers and break them into their own library

This commit is contained in:
Viktor Lofgren
2025-08-10 20:55:27 +02:00
parent 85360e61b2
commit ca283f9684
32 changed files with 503 additions and 260 deletions

View File

@@ -14,7 +14,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:libraries:array')
implementation project(':code:libraries:array:cpp')
implementation project(':code:libraries:native')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:coded-sequence')
implementation project(':code:libraries:language-processing')

View File

@@ -1,9 +1,9 @@
package nu.marginalia.index.forward;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.index.forward.spans.DocumentSpans;
import nu.marginalia.index.forward.spans.IndexSpansReader;
import nu.marginalia.model.id.UrlIdCodec;
@@ -66,8 +66,8 @@ public class ForwardIndexReader {
ids = loadIds(idsFile);
data = loadData(dataFile);
NativeAlgos.madviseRandom(data.getMemorySegment());
NativeAlgos.madviseRandom(ids.getMemorySegment());
LinuxSystemCalls.madviseRandom(data.getMemorySegment());
LinuxSystemCalls.madviseRandom(ids.getMemorySegment());
spansReader = IndexSpansReader.open(spansFile);

View File

@@ -1,7 +1,7 @@
package nu.marginalia.index.forward.spans;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import nu.marginalia.array.UringFileReader;
import nu.marginalia.uring.UringFileReader;
import java.io.IOException;
import java.lang.foreign.Arena;

View File

@@ -21,6 +21,7 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:libraries:array')
implementation project(':code:libraries:native')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:common:linkdb')

View File

@@ -15,7 +15,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:libraries:array')
implementation project(':code:libraries:array:cpp')
implementation project(':code:libraries:native')
implementation project(':code:libraries:btree')
implementation project(':code:libraries:coded-sequence')
implementation project(':code:libraries:random-write-funnel')

View File

@@ -1,10 +1,10 @@
package nu.marginalia.index;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.array.pool.BufferPool;
import nu.marginalia.btree.BTreeReader;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.index.positions.PositionsFileReader;
import nu.marginalia.index.positions.TermData;
import nu.marginalia.index.query.*;
@@ -58,8 +58,8 @@ public class FullReverseIndexReader {
this.words = LongArrayFactory.mmapForReadingShared(words);
this.documents = LongArrayFactory.mmapForReadingShared(documents);
NativeAlgos.madviseRandom(this.words.getMemorySegment());
NativeAlgos.madviseRandom(this.documents.getMemorySegment());
LinuxSystemCalls.madviseRandom(this.words.getMemorySegment());
LinuxSystemCalls.madviseRandom(this.documents.getMemorySegment());
dataPool = new BufferPool(documents, SkipListConstants.BLOCK_SIZE, (int) (Long.getLong("index.bufferPoolSize", 512*1024*1024L) / SkipListConstants.BLOCK_SIZE));

View File

@@ -1,9 +1,9 @@
package nu.marginalia.index;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.btree.BTreeReader;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.index.query.EmptyEntrySource;
import nu.marginalia.index.query.EntrySource;
import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class PrioReverseIndexReader {
this.words = LongArrayFactory.mmapForReadingShared(words);
NativeAlgos.madviseRandom(this.words.getMemorySegment());
LinuxSystemCalls.madviseRandom(this.words.getMemorySegment());
wordsBTreeReader = new BTreeReader(this.words, ReverseIndexParameters.wordsBTreeContext, 0);
wordsDataOffset = wordsBTreeReader.getHeader().dataOffsetLongs();

View File

@@ -1,6 +1,6 @@
package nu.marginalia.index.positions;
import nu.marginalia.array.UringFileReader;
import nu.marginalia.uring.UringFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -21,7 +21,7 @@ dependencies {
implementation libs.lz4
implementation libs.guava
implementation project(':code:libraries:array:cpp')
implementation project(':code:libraries:native')
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@@ -1,10 +0,0 @@
#!/usr/bin/env sh
CXX=${CXX:-g++}
if ! which ${CXX} > /dev/null; then
echo "g++ not found, skipping compilation"
exit 0
fi
${CXX} -O3 -march=native -std=c++14 -fPIC -luring -shared -Isrc/main/public src/main/cpp/*.cpp -o resources/libcpp.so

View File

@@ -1,21 +0,0 @@
#include <stdint.h>
#include <liburing.h>
#pragma once
extern "C" {
void ms_sort_64(int64_t* area, uint64_t start, uint64_t end);
void ms_sort_128(int64_t* area, uint64_t start, uint64_t end);
int open_direct_fd(char* filename);
int open_buffered_fd(char* filename);
int read_at(int fd, void* buf, unsigned int count, long offset);
int uring_read_buffered(int fd, io_uring* ring, int n, void** buffers, unsigned int* sizes, long* offsets);
int uring_read_direct(int fd, io_uring* ring, int n, void** buffers, unsigned int* sizes, long* offsets);
void close_fd(int fd);
void madvise_random(void* address, unsigned long size);
void fadvise_random(int fd);
void fadvise_willneed(int fd);
io_uring* initialize_uring(int queue_size);
void close_uring(io_uring* ring);
}

View File

@@ -1,6 +1,6 @@
package nu.marginalia.array;
import nu.marginalia.NativeAlgos;
import nu.marginalia.ffi.LinuxSystemCalls;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
@@ -10,7 +10,7 @@ public class DirectFileReader implements AutoCloseable {
int fd;
public DirectFileReader(Path filename) throws IOException {
fd = NativeAlgos.openDirect(filename);
fd = LinuxSystemCalls.openDirect(filename);
if (fd < 0) {
throw new IOException("Error opening direct file: " + filename);
}
@@ -21,7 +21,7 @@ public class DirectFileReader implements AutoCloseable {
}
public void readAligned(MemorySegment segment, long offset) throws IOException {
if (NativeAlgos.readAt(fd, segment, offset) != segment.byteSize()) {
if (LinuxSystemCalls.readAt(fd, segment, offset) != segment.byteSize()) {
throw new IOException("Failed to read data at " + offset);
}
}
@@ -35,7 +35,7 @@ public class DirectFileReader implements AutoCloseable {
long srcPageEnd = Math.min(srcPageOffset + totalBytesToCopy, 4096);
// wrapper for O_DIRECT pread
if (NativeAlgos.readAt(fd, alignedBuffer, alignedPageAddress) != alignedBuffer.byteSize()) {
if (LinuxSystemCalls.readAt(fd, alignedBuffer, alignedPageAddress) != alignedBuffer.byteSize()) {
throw new IOException("Failed to read data at " + alignedPageAddress + " of size " + dest.byteSize());
}
@@ -50,6 +50,6 @@ public class DirectFileReader implements AutoCloseable {
}
public void close() {
NativeAlgos.closeFd(fd);
LinuxSystemCalls.closeFd(fd);
}
}

View File

@@ -1,7 +1,7 @@
package nu.marginalia.array.algo;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.LongArray;
import nu.marginalia.ffi.NativeAlgos;
import java.io.IOException;
import java.nio.channels.FileChannel;

View File

@@ -1,6 +1,6 @@
package nu.marginalia.array.pool;
import nu.marginalia.NativeAlgos;
import nu.marginalia.ffi.LinuxSystemCalls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ public class BufferPool implements AutoCloseable {
}
public BufferPool(Path filename, int pageSizeBytes, int poolSize) {
this.fd = NativeAlgos.openDirect(filename);
this.fd = LinuxSystemCalls.openDirect(filename);
this.pageSizeBytes = pageSizeBytes;
try {
this.fileSize = Files.size(filename);
@@ -102,7 +102,7 @@ public class BufferPool implements AutoCloseable {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NativeAlgos.closeFd(fd);
LinuxSystemCalls.closeFd(fd);
arena.close();
System.out.println("Disk read count: " + diskReadCount.get());
@@ -188,7 +188,7 @@ public class BufferPool implements AutoCloseable {
if (getClass().desiredAssertionStatus()) {
buffer.getMemorySegment().set(ValueLayout.JAVA_INT, 0, 9999);
}
NativeAlgos.readAt(fd, buffer.getMemorySegment(), buffer.pageAddress());
LinuxSystemCalls.readAt(fd, buffer.getMemorySegment(), buffer.pageAddress());
assert buffer.getMemorySegment().get(ValueLayout.JAVA_INT, 0) != 9999;
buffer.dirty(false);

View File

@@ -1,8 +1,8 @@
package nu.marginalia.array.page;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.algo.LongArraySort;
import nu.marginalia.ffi.NativeAlgos;
import org.openjdk.jmh.annotations.*;
import java.lang.foreign.Arena;

View File

@@ -1,10 +1,10 @@
package nu.marginalia.array.algo;
package nu.marginalia;
import nu.marginalia.NativeAlgos;
import nu.marginalia.array.DirectFileReader;
import nu.marginalia.array.LongArray;
import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.array.UringFileReader;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.uring.UringFileReader;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -25,11 +25,11 @@ public class NativeAlgosTest {
var ms = Arena.global().allocate(512, 8);
int fd = NativeAlgos.openDirect(Path.of("/tmp/test"));
int ret = NativeAlgos.readAt(fd, ms, 512);
int fd = LinuxSystemCalls.openDirect(Path.of("/tmp/test"));
int ret = LinuxSystemCalls.readAt(fd, ms, 512);
System.out.println(ret);
System.out.println(ms.byteSize());
NativeAlgos.closeFd(fd);
LinuxSystemCalls.closeFd(fd);
var array2 = LongArrayFactory.wrap(ms);
for (int i = 0; i < array2.size(); i++) {

View File

@@ -0,0 +1,95 @@
package nu.marginalia.array;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.uring.UringFileReader;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.file.Path;
import java.util.List;
public class NativeAlgosTest {
@Test
public void test() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
array.close();
var ms = Arena.global().allocate(512, 8);
int fd = LinuxSystemCalls.openDirect(Path.of("/tmp/test"));
int ret = LinuxSystemCalls.readAt(fd, ms, 512);
System.out.println(ret);
System.out.println(ms.byteSize());
LinuxSystemCalls.closeFd(fd);
var array2 = LongArrayFactory.wrap(ms);
for (int i = 0; i < array2.size(); i++) {
System.out.println(i + ": " + array2.get(i));
}
}
@Test
void testDirectFileReader() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
array.close();
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
LongArray array2 = LongArrayFactory.onHeapConfined(64);
dfr.readAligned(array2, 0);
for (int i = 0; i < array2.size(); i++) {
System.out.println(i + ": " + array2.get(i));
}
}
var alignedBuffer = Arena.ofAuto().allocate(4096, 4096);
try (var dfr = new DirectFileReader(Path.of("/tmp/test"))) {
MemorySegment dest = Arena.ofAuto().allocate(504, 1);
dfr.readUnaligned(dest, alignedBuffer, 8);
for (int i = 0; i < dest.byteSize(); i+=8) {
System.out.println(i + ": " + dest.get(ValueLayout.JAVA_LONG, i));
}
dfr.readUnaligned(dest, alignedBuffer, 4000);
for (int i = 0; i < dest.byteSize(); i+=8) {
System.out.println(i + ": " + dest.get(ValueLayout.JAVA_LONG, i));
}
}
}
@Test
void testAioFileReader() throws IOException {
LongArray array = LongArrayFactory.mmapForWritingShared(Path.of("/tmp/test"), 1024);
for (int i = 0; i < 1024; i++) {
array.set(i, i);
}
array.close();
try (var dfr = new UringFileReader(Path.of("/tmp/test"), false)) {
MemorySegment buf1 = Arena.ofAuto().allocate(32, 8);
MemorySegment buf2 = Arena.ofAuto().allocate(16, 8);
dfr.read(List.of(buf1, buf2), List.of(0L, 8L));
for (int i = 0; i < buf1.byteSize(); i+=8) {
System.out.println(buf1.get(ValueLayout.JAVA_LONG, i));
}
for (int i = 0; i < buf2.byteSize(); i+=8) {
System.out.println(buf2.get(ValueLayout.JAVA_LONG, i));
}
}
}
}

15
code/libraries/native/Makefile Executable file
View File

@@ -0,0 +1,15 @@
#!/usr/bin/env sh
CXXFLAGS=-O3 -march=native -std=c++14 -fPIC `pkg-config --cflags liburing`
LDFLAGS=`pkg-config --libs liburing` -shared
CXX=c++
SOURCES=src/sort.cc src/unix.cc src/uring.cc
all: resources/libcpp.so resources/liburing.so
resources/libcpp.so:
${CXX} -shared ${LDFLAGS} ${CXXFLAGS} ${SOURCES} -o resources/libcpp.so
resources/liburing.so:
cp /usr/lib/liburing.so resources/
clean:
rm -rf resources/{libcpp,liburing}.so

View File

@@ -22,18 +22,14 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
// with a shellscript as gradle's c++ tasks are kind of insufferable
tasks.register('compileCpp', Exec) {
inputs.files('compile.sh', 'src/main/cpp/cpphelpers.cpp', 'src/main/public/cpphelpers.h')
outputs.file 'resources/libcpp.so'
commandLine 'sh', 'compile.sh'
inputs.files('Makefile', 'src/sort.cc', 'src/unix.cc', 'src/uring.cc')
outputs.files('resources/libcpp.so', 'resources/liburing.so')
doLast {
// copy /usr/lib/liburing.so to resources
copy {
from '/usr/lib/liburing.so'
into 'resources'
rename { 'liburing.so' }
}
}
commandLine 'make', 'all'
}
tasks.register('cleanCpp', Exec) {
commandLine 'make', 'clean'
}
processResources.dependsOn('compileCpp')
processResources.dependsOn('compileCpp')
clean.dependsOn('cleanCpp')

View File

@@ -0,0 +1,147 @@
package nu.marginalia.ffi;
import nu.marginalia.uring.UringQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.nio.file.Path;
import java.util.List;
import static java.lang.foreign.ValueLayout.*;
/** This class provides access to wrapper around Linux system calls.
* <p></p>
* isAvailable is a boolean flag that indicates whether the native
* implementations are available. If the shared library cannot be loaded,
* isAvailable will be false. This flag must be checked before calling
* any of the native functions.
* */
@SuppressWarnings("preview")
public class IoUring {
private final MethodHandle uringInit;
private final MethodHandle uringClose;
private final MethodHandle uringReadBuffered;
private final MethodHandle uringReadDirect;
public static final IoUring instance;
/** Indicates whether the native implementations are available */
public static final boolean isAvailable;
private static final Logger logger = LoggerFactory.getLogger(LinuxSystemCalls.class);
private IoUring(Path libFile) {
SymbolLookup libraryLookup = SymbolLookup.libraryLookup(libFile, Arena.global());
var nativeLinker = Linker.nativeLinker();
MemorySegment handle = libraryLookup.findOrThrow("uring_read_buffered");
uringReadBuffered = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
handle = libraryLookup.findOrThrow("uring_read_direct");
uringReadDirect = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
handle = libraryLookup.findOrThrow("initialize_uring");
uringInit = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(ADDRESS, JAVA_INT));
handle = libraryLookup.findOrThrow("close_uring");
uringClose = nativeLinker.downcallHandle(handle, FunctionDescriptor.ofVoid(ADDRESS));
}
static {
Path libFile;
IoUring nativeAlgosI = null;
// copy resource to temp file so it can be loaded
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("liburing.so")) {
var tempFile = File.createTempFile("liburing", ".so");
tempFile.deleteOnExit();
try (var os = new FileOutputStream(tempFile)) {
is.transferTo(os);
os.flush();
}
System.load(tempFile.getAbsolutePath());
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
}
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("libcpp.so")) {
var tempFile = File.createTempFile("libcpp", ".so");
tempFile.deleteOnExit();
try (var os = new FileOutputStream(tempFile)) {
is.transferTo(os);
os.flush();
}
libFile = tempFile.toPath();
nativeAlgosI = new IoUring(libFile);
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
}
instance = nativeAlgosI;
isAvailable = instance != null;
}
public static UringQueue uringOpen(int fd, int queueSize) {
try {
return new UringQueue((MemorySegment) instance.uringInit.invoke(queueSize), fd);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static void uringClose(UringQueue ring) {
try {
instance.uringClose.invoke(ring.pointer());
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static int uringReadBatch(int fd, UringQueue ring, List<MemorySegment> dest, List<Long> offsets, boolean direct) {
if (offsets.isEmpty()) {
throw new IllegalArgumentException("Empty offset list in uringRead");
}
if (offsets.size() == 1) {
if (LinuxSystemCalls.readAt(fd, dest.getFirst(), offsets.getFirst()) > 0)
return 1;
else return -1;
}
try {
MemorySegment bufferList = Arena.ofAuto().allocate(8L * offsets.size(), 8);
MemorySegment sizeList = Arena.ofAuto().allocate(4L * offsets.size(), 8);
MemorySegment offsetList = Arena.ofAuto().allocate(8L * offsets.size(), 8);
if (dest.size() != offsets.size()) {
throw new IllegalStateException();
}
for (int i = 0; i < offsets.size(); i++) {
var buffer = dest.get(i);
bufferList.setAtIndex(JAVA_LONG, i, buffer.address());
sizeList.setAtIndex(JAVA_INT, i, (int) buffer.byteSize());
offsetList.setAtIndex(JAVA_LONG, i, offsets.get(i));
}
if (direct) {
return (Integer) instance.uringReadDirect.invoke(fd, ring.pointer(), dest.size(), bufferList, sizeList, offsetList);
}
else {
return (Integer) instance.uringReadBuffered.invoke(fd, ring.pointer(), dest.size(), bufferList, sizeList, offsetList);
}
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
}

View File

@@ -1,4 +1,4 @@
package nu.marginalia;
package nu.marginalia.ffi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -8,16 +8,10 @@ import java.io.FileOutputStream;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.nio.file.Path;
import java.util.List;
import static java.lang.foreign.ValueLayout.*;
/** This class provides access to native implementations of key algorithms
* used in index construction and querying.
* <p></p>
* The native implementations are provided in a shared library, which is
* loaded at runtime. The shared library is copied from the resources
* to a temporary file, and then loaded using the foreign linker API.
/** This class provides access to wrapper around Linux system calls.
* <p></p>
* isAvailable is a boolean flag that indicates whether the native
* implementations are available. If the shared library cannot be loaded,
@@ -25,9 +19,7 @@ import static java.lang.foreign.ValueLayout.*;
* any of the native functions.
* */
@SuppressWarnings("preview")
public class NativeAlgos {
private final MethodHandle qsortHandle;
private final MethodHandle qsort128Handle;
public class LinuxSystemCalls {
private final MethodHandle openDirect;
private final MethodHandle openBuffered;
private final MethodHandle closeFd;
@@ -36,46 +28,21 @@ public class NativeAlgos {
private final MethodHandle fadviseWillneed;
private final MethodHandle madviseRandom;
private final MethodHandle uringInit;
private final MethodHandle uringClose;
private final MethodHandle uringReadBuffered;
private final MethodHandle uringReadDirect;
public static final NativeAlgos instance;
public static final LinuxSystemCalls instance;
/** Indicates whether the native implementations are available */
public static final boolean isAvailable;
private static final Logger logger = LoggerFactory.getLogger(NativeAlgos.class);
private static final Logger logger = LoggerFactory.getLogger(LinuxSystemCalls.class);
private NativeAlgos(Path libFile) {
private LinuxSystemCalls(Path libFile) {
SymbolLookup libraryLookup = SymbolLookup.libraryLookup(libFile, Arena.global());
var nativeLinker = Linker.nativeLinker();
MemorySegment handle = libraryLookup.findOrThrow("ms_sort_64");
qsortHandle = nativeLinker.downcallHandle(handle, FunctionDescriptor.ofVoid(ADDRESS, JAVA_LONG, JAVA_LONG));
handle = libraryLookup.findOrThrow("ms_sort_128");
qsort128Handle = nativeLinker.downcallHandle(handle,
FunctionDescriptor.ofVoid(ADDRESS, JAVA_LONG, JAVA_LONG));
handle = libraryLookup.findOrThrow("open_direct_fd");
MemorySegment handle = libraryLookup.findOrThrow("open_direct_fd");
openDirect = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, ADDRESS));
handle = libraryLookup.findOrThrow("open_buffered_fd");
openBuffered = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, ADDRESS));
handle = libraryLookup.findOrThrow("uring_read_buffered");
uringReadBuffered = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
handle = libraryLookup.findOrThrow("uring_read_direct");
uringReadDirect = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT, ADDRESS, ADDRESS, ADDRESS));
handle = libraryLookup.findOrThrow("initialize_uring");
uringInit = nativeLinker.downcallHandle(handle, FunctionDescriptor.of(ADDRESS, JAVA_INT));
handle = libraryLookup.findOrThrow("close_uring");
uringClose = nativeLinker.downcallHandle(handle, FunctionDescriptor.ofVoid(ADDRESS));
handle = libraryLookup.findOrThrow("fadvise_random");
fadviseRandom = nativeLinker.downcallHandle(handle, FunctionDescriptor.ofVoid(JAVA_INT));
@@ -93,7 +60,7 @@ public class NativeAlgos {
static {
Path libFile;
NativeAlgos nativeAlgosI = null;
LinuxSystemCalls nativeAlgosI = null;
// copy resource to temp file so it can be loaded
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("liburing.so")) {
var tempFile = File.createTempFile("liburing", ".so");
@@ -120,7 +87,7 @@ public class NativeAlgos {
}
libFile = tempFile.toPath();
nativeAlgosI = new NativeAlgos(libFile);
nativeAlgosI = new LinuxSystemCalls(libFile);
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
@@ -151,62 +118,7 @@ public class NativeAlgos {
public static int readAt(int fd, MemorySegment dest, long offset) {
try {
return (Integer) instance.readAtFd.invoke(fd, dest, (int) dest.byteSize(), offset);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static UringQueue uringOpen(int fd, int queueSize) {
try {
return new UringQueue((MemorySegment) instance.uringInit.invoke(queueSize), fd);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static void uringClose(UringQueue ring) {
try {
instance.uringClose.invoke(ring.pointer());
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static int uringRead(int fd, UringQueue ring, List<MemorySegment> dest, List<Long> offsets, boolean direct) {
if (offsets.isEmpty()) {
throw new IllegalArgumentException("Empty offset list in uringRead");
}
if (offsets.size() == 1) {
if (readAt(fd, dest.getFirst(), offsets.getFirst()) > 0)
return 1;
else return -1;
}
try {
MemorySegment bufferList = Arena.ofAuto().allocate(8L * offsets.size(), 8);
MemorySegment sizeList = Arena.ofAuto().allocate(4L * offsets.size(), 8);
MemorySegment offsetList = Arena.ofAuto().allocate(8L * offsets.size(), 8);
if (dest.size() != offsets.size()) {
throw new IllegalStateException();
}
for (int i = 0; i < offsets.size(); i++) {
var buffer = dest.get(i);
bufferList.setAtIndex(JAVA_LONG, i, buffer.address());
sizeList.setAtIndex(JAVA_INT, i, (int) buffer.byteSize());
offsetList.setAtIndex(JAVA_LONG, i, offsets.get(i));
}
if (direct) {
return (Integer) instance.uringReadDirect.invoke(fd, ring.pointer(), dest.size(), bufferList, sizeList, offsetList);
}
else {
return (Integer) instance.uringReadBuffered.invoke(fd, ring.pointer(), dest.size(), bufferList, sizeList, offsetList);
}
}
catch (Throwable t) {
} catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
@@ -240,22 +152,4 @@ public class NativeAlgos {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static void sort(MemorySegment ms, long start, long end) {
try {
instance.qsortHandle.invoke(ms, start, end);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static void sort128(MemorySegment ms, long start, long end) {
try {
instance.qsort128Handle.invoke(ms, start, end);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
}

View File

@@ -0,0 +1,108 @@
package nu.marginalia.ffi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.nio.file.Path;
import static java.lang.foreign.ValueLayout.ADDRESS;
import static java.lang.foreign.ValueLayout.JAVA_LONG;
/** This class provides access to native implementations of key algorithms
* used in index construction and querying.
* <p></p>
* The native implementations are provided in a shared library, which is
* loaded at runtime. The shared library is copied from the resources
* to a temporary file, and then loaded using the foreign linker API.
* <p></p>
* isAvailable is a boolean flag that indicates whether the native
* implementations are available. If the shared library cannot be loaded,
* isAvailable will be false. This flag must be checked before calling
* any of the native functions.
* */
@SuppressWarnings("preview")
public class NativeAlgos {
private final MethodHandle qsortHandle;
private final MethodHandle qsort128Handle;
public static final NativeAlgos instance;
/** Indicates whether the native implementations are available */
public static final boolean isAvailable;
private static final Logger logger = LoggerFactory.getLogger(NativeAlgos.class);
private NativeAlgos(Path libFile) {
SymbolLookup libraryLookup = SymbolLookup.libraryLookup(libFile, Arena.global());
var nativeLinker = Linker.nativeLinker();
MemorySegment handle = libraryLookup.findOrThrow("ms_sort_64");
qsortHandle = nativeLinker.downcallHandle(handle, FunctionDescriptor.ofVoid(ADDRESS, JAVA_LONG, JAVA_LONG));
handle = libraryLookup.findOrThrow("ms_sort_128");
qsort128Handle = nativeLinker.downcallHandle(handle,
FunctionDescriptor.ofVoid(ADDRESS, JAVA_LONG, JAVA_LONG));
}
static {
Path libFile;
NativeAlgos nativeAlgosI = null;
// copy resource to temp file so it can be loaded
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("liburing.so")) {
var tempFile = File.createTempFile("liburing", ".so");
tempFile.deleteOnExit();
try (var os = new FileOutputStream(tempFile)) {
is.transferTo(os);
os.flush();
}
System.load(tempFile.getAbsolutePath());
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
}
try (var is = NativeAlgos.class.getClassLoader().getResourceAsStream("libcpp.so")) {
var tempFile = File.createTempFile("libcpp", ".so");
tempFile.deleteOnExit();
try (var os = new FileOutputStream(tempFile)) {
is.transferTo(os);
os.flush();
}
libFile = tempFile.toPath();
nativeAlgosI = new NativeAlgos(libFile);
}
catch (Exception e) {
logger.info("Failed to load native library, likely not built", e);
}
instance = nativeAlgosI;
isAvailable = instance != null;
}
public static void sort(MemorySegment ms, long start, long end) {
try {
instance.qsortHandle.invoke(ms, start, end);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
public static void sort128(MemorySegment ms, long start, long end) {
try {
instance.qsort128Handle.invoke(ms, start, end);
}
catch (Throwable t) {
throw new RuntimeException("Failed to invoke native function", t);
}
}
}

View File

@@ -1,7 +1,6 @@
package nu.marginalia.array;
package nu.marginalia.uring;
import nu.marginalia.NativeAlgos;
import nu.marginalia.UringQueue;
import nu.marginalia.ffi.LinuxSystemCalls;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
@@ -19,12 +18,12 @@ public class UringFileReader implements AutoCloseable {
public UringFileReader(Path filename, boolean direct) throws IOException {
if (direct) {
fd = NativeAlgos.openDirect(filename);
fd = LinuxSystemCalls.openDirect(filename);
this.direct = true;
}
else {
fd = NativeAlgos.openBuffered(filename);
NativeAlgos.fadviseRandom(fd);
fd = LinuxSystemCalls.openBuffered(filename);
LinuxSystemCalls.fadviseRandom(fd);
this.direct = false;
}
for (int i = 0; i < rings.length; i++) {
@@ -36,7 +35,7 @@ public class UringFileReader implements AutoCloseable {
}
public void fadviseWillneed() {
NativeAlgos.fadviseWillneed(fd);
LinuxSystemCalls.fadviseWillneed(fd);
}
public void read(List<MemorySegment> destinations, List<Long> offsets) {
@@ -46,7 +45,7 @@ public class UringFileReader implements AutoCloseable {
long offset = offsets.get(i);
int ret;
if (ms.byteSize() != (ret = NativeAlgos.readAt(fd, ms, offset))) {
if (ms.byteSize() != (ret = LinuxSystemCalls.readAt(fd, ms, offset))) {
throw new RuntimeException("Read failed, rv=" + ret);
}
}
@@ -71,6 +70,6 @@ public class UringFileReader implements AutoCloseable {
for (var ring : rings) {
ring.close();
}
NativeAlgos.closeFd(fd);
LinuxSystemCalls.closeFd(fd);
}
}

View File

@@ -1,4 +1,6 @@
package nu.marginalia;
package nu.marginalia.uring;
import nu.marginalia.ffi.IoUring;
import java.lang.foreign.MemorySegment;
import java.util.List;
@@ -18,7 +20,7 @@ public final class UringQueue {
}
public static UringQueue open(int fd, int size) {
return NativeAlgos.uringOpen(fd, size);
return IoUring.uringOpen(fd, size);
}
public int read(List<MemorySegment> dest, List<Long> offsets, boolean direct) {
@@ -27,7 +29,7 @@ public final class UringQueue {
throw new RuntimeException("io_uring slow, likely backpressure!");
try {
return NativeAlgos.uringRead(fd, this, dest, offsets, direct);
return IoUring.uringReadBatch(fd, this, dest, offsets, direct);
}
finally {
lock.unlock();
@@ -42,7 +44,7 @@ public final class UringQueue {
}
public void close() {
NativeAlgos.uringClose(this);
IoUring.uringClose(this);
}
public MemorySegment pointer() {

View File

@@ -0,0 +1,34 @@
#include <algorithm>
#include <cstdint>
extern "C" {
/* Pair of 64-bit integers. */
/* The struct is packed to ensure that the struct is exactly 16 bytes in size, as we need to pointer
alias on an array of 8 byte longs. Since structs guarantee that the first element is at offset 0,
and __attribute__((packed)) guarantees that the struct is exactly 16 bytes in size, the only reasonable
implementation is that the struct is laid out as 2 64-bit integers. This assumption works only as
long as there are at most 2 fields.
This is a non-portable low level hack, but all this code strongly assumes a x86-64 Linux environment.
For other environments (e.g. outside of prod), the Java implementation code will have to do.
*/
struct __attribute__((packed)) p64x2 {
int64_t a;
int64_t b;
};
void ms_sort_64(int64_t* area, uint64_t start, uint64_t end) {
std::sort(&area[start], &area[end]);
}
void ms_sort_128(int64_t* area, uint64_t start, uint64_t end) {
std::sort(
reinterpret_cast<p64x2 *>(&area[start]),
reinterpret_cast<p64x2 *>(&area[end]),
[](const p64x2& fst, const p64x2& snd) {
return fst.a < snd.a;
});
}
}

View File

@@ -0,0 +1,35 @@
#include <algorithm>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <sys/mman.h>
extern "C" {
void fadvise_random(int fd) {
posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
}
void fadvise_willneed(int fd) {
posix_fadvise(fd, 0, 0, POSIX_FADV_WILLNEED);
}
void madvise_random(void* address, unsigned long size) {
madvise(address, size, MADV_RANDOM);
}
int open_buffered_fd(char* filename) {
return open(filename, O_RDONLY);
}
int open_direct_fd(char* filename) {
return open(filename, O_DIRECT | O_RDONLY);
}
int read_at(int fd, void* buf, unsigned int count, long offset) {
return pread(fd, buf, count, offset);
}
void close_fd(int fd) {
close(fd);
}
}

View File

@@ -1,49 +1,11 @@
#include "cpphelpers.hpp"
#include <algorithm>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <liburing.h>
#include <cstring>
#include <sys/mman.h>
#include <string.h>
extern "C" {
/* Pair of 64-bit integers. */
/* The struct is packed to ensure that the struct is exactly 16 bytes in size, as we need to pointer
alias on an array of 8 byte longs. Since structs guarantee that the first element is at offset 0,
and __attribute__((packed)) guarantees that the struct is exactly 16 bytes in size, the only reasonable
implementation is that the struct is laid out as 2 64-bit integers. This assumption works only as
long as there are at most 2 fields.
This is a non-portable low level hack, but all this code strongly assumes a x86-64 Linux environment.
For other environments (e.g. outside of prod), the Java implementation code will have to do.
*/
struct __attribute__((packed)) p64x2 {
int64_t a;
int64_t b;
};
void ms_sort_64(int64_t* area, uint64_t start, uint64_t end) {
std::sort(&area[start], &area[end]);
}
void ms_sort_128(int64_t* area, uint64_t start, uint64_t end) {
std::sort(
reinterpret_cast<p64x2 *>(&area[start]),
reinterpret_cast<p64x2 *>(&area[end]),
[](const p64x2& fst, const p64x2& snd) {
return fst.a < snd.a;
});
}
void fadvise_random(int fd) {
posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
}
void fadvise_willneed(int fd) {
posix_fadvise(fd, 0, 0, POSIX_FADV_WILLNEED);
}
void madvise_random(void* address, unsigned long size) {
madvise(address, size, MADV_RANDOM);
}
io_uring* initialize_uring(int queue_size) {
io_uring* ring = (io_uring*) malloc(sizeof(io_uring));
if (!ring) return NULL;
@@ -194,20 +156,4 @@ int uring_read_direct(int fd, io_uring* ring, int n, void** buffers, unsigned in
return n;
}
int open_buffered_fd(char* filename) {
return open(filename, O_RDONLY);
}
int open_direct_fd(char* filename) {
return open(filename, O_DIRECT | O_RDONLY);
}
int read_at(int fd, void* buf, unsigned int count, long offset) {
return pread(fd, buf, count, offset);
}
void close_fd(int fd) {
close(fd);
}
}

View File

@@ -1,5 +1,7 @@
package nu.marginalia;
import nu.marginalia.ffi.LinuxSystemCalls;
import nu.marginalia.uring.UringQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -26,7 +28,7 @@ class UringQueueTest {
@Test
public void testSunnyDay() throws IOException {
int fd = NativeAlgos.openBuffered(file);
int fd = LinuxSystemCalls.openBuffered(file);
List<MemorySegment> segments = new ArrayList<>();
List<Long> offsets = new ArrayList<>();

View File

@@ -43,7 +43,7 @@ include 'code:index:index-reverse'
include 'code:index:index-perftest'
include 'code:libraries:array'
include 'code:libraries:array:cpp'
include 'code:libraries:native'
include 'code:libraries:coded-sequence'
include 'code:libraries:geo-ip'
include 'code:libraries:btree'