add an extend method to Nucleo's injector (#74)

* feat(injector): add an `extend` method to Nucleo's injector

* Update lib.rs

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* remove benches and #73 patch

* udpates following pascalkuthe's review

* simplification

* adding tests

* remove unused method

---------

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
This commit is contained in:
Alexandre Pasmantier
2025-02-14 10:34:16 +01:00
committed by GitHub
parent 9918bddeed
commit c754da51d5
3 changed files with 208 additions and 2 deletions

View File

@@ -13,8 +13,8 @@ exclude = ["/typos.toml", "/tarpaulin.toml"]
[dependencies]
nucleo-matcher = { version = "0.3.1", path = "matcher" }
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"]}
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
rayon = "1.7.0"
[workspace]
members = [ "matcher", "bench" ]
members = ["matcher", "bench"]

View File

@@ -23,6 +23,7 @@
use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::{ptr, slice};
@@ -182,6 +183,94 @@ impl<T> Vec<T> {
index
}
/// Extends the vector by appending multiple elements at once.
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
let count: u32 = values
.len()
.try_into()
.expect("overflowed maximum capacity");
if count == 0 {
assert!(
values.into_iter().next().is_none(),
"The `values` variable reported incorrect length."
);
return;
}
// Reserve all indices at once
let start_index: u32 = self
.inflight
.fetch_add(u64::from(count), Ordering::Release)
.try_into()
.expect("overflowed maximum capacity");
// Compute first and last locations
let start_location = Location::of(start_index);
let end_location = Location::of(start_index + count);
// Eagerly allocate the next bucket if the last entry is close to the end of its next bucket
let alloc_entry = end_location.alloc_next_bucket_entry();
if end_location.entry >= alloc_entry
&& (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry)
{
// This might be the last bucket, hence the check
if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
}
}
let mut bucket = unsafe { self.buckets.get_unchecked(start_location.bucket as usize) };
let mut entries = bucket.entries.load(Ordering::Acquire);
if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(start_location.bucket),
self.columns,
);
}
// Route each value to its corresponding bucket
let mut location;
let count = count as usize;
for (i, v) in values.into_iter().enumerate() {
// ExactSizeIterator is a safe trait that can have bugs/lie about it's size.
// Unsafe code cannot rely on the reported length being correct.
assert!(i < count);
location =
Location::of(start_index + u32::try_from(i).expect("overflowed maximum capacity"));
// if we're starting to insert into a different bucket, allocate it beforehand
if location.entry == 0 && i != 0 {
// safety: `location.bucket` is always in bounds
bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
entries = bucket.entries.load(Ordering::Acquire);
if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(location.bucket),
self.columns,
);
}
}
unsafe {
let entry = Bucket::get(entries, location.entry, self.columns);
// Initialize matcher columns
for col in Entry::matcher_cols_raw(entry, self.columns) {
col.get().write(MaybeUninit::new(Utf32String::default()));
}
fill_columns(&v, Entry::matcher_cols_mut(entry, self.columns));
(*entry).slot.get().write(MaybeUninit::new(v));
(*entry).active.store(true, Ordering::Release);
}
}
}
/// race to initialize a bucket
fn get_or_alloc(bucket: &Bucket<T>, len: u32, cols: u32) -> *mut Entry<T> {
let entries = unsafe { Bucket::alloc(len, cols) };
@@ -557,6 +646,11 @@ impl Location {
fn bucket_len(bucket: u32) -> u32 {
1 << (bucket + SKIP_BUCKET)
}
/// The entry index at which the next bucket should be pre-allocated.
fn alloc_next_bucket_entry(&self) -> u32 {
self.bucket_len - (self.bucket_len >> 3)
}
}
#[cfg(test)]
@@ -594,4 +688,99 @@ mod tests {
assert_eq!(max.bucket_len, 1 << 31);
assert_eq!(max.entry, (1 << 31) - 1);
}
#[test]
fn extend_unique_bucket() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..10, |_, _| {});
assert_eq!(vec.count(), 10);
for i in 0..10 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(10).is_none());
}
#[test]
fn extend_over_two_buckets() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..100, |_, _| {});
assert_eq!(vec.count(), 100);
for i in 0..100 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(100).is_none());
}
#[test]
fn extend_over_more_than_two_buckets() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..1000, |_, _| {});
assert_eq!(vec.count(), 1000);
for i in 0..1000 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(1000).is_none());
}
#[test]
/// test that ExactSizeIterator returning incorrect length is caught (0 AND more than reported)
fn extend_with_incorrect_reported_len_is_caught() {
struct IncorrectLenIter {
len: usize,
iter: std::ops::Range<u32>,
}
impl Iterator for IncorrectLenIter {
type Item = u32;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}
impl ExactSizeIterator for IncorrectLenIter {
fn len(&self) -> usize {
self.len
}
}
let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 10,
iter: (0..12),
};
// this should panic
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 12,
iter: (0..10),
};
// this shouldn't panic and should just ignore the extra elements
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_ok());
// we should reserve 12 elements but only 10 should be present
assert_eq!(vec.count(), 12);
for i in 0..10 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(10).is_none());
let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 0,
iter: (0..2),
};
// this should panic
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
}
// test |values| does not fit in the boxcar
#[test]
fn extend_over_max_capacity() {
let vec = Vec::<u32>::with_capacity(1, 1);
let count = MAX_ENTRIES as usize + 2;
let iter = std::iter::repeat(0).take(count);
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
}
}

View File

@@ -79,6 +79,23 @@ impl<T> Injector<T> {
idx
}
/// Appends multiple elements to the list of matched items.
/// This function is lock-free and wait-free.
///
/// You should favor this function over `push` if at least one of the following is true:
/// - the number of items you're adding can be computed beforehand and is typically larger
/// than 1k
/// - you're able to batch incoming items
/// - you're adding items from multiple threads concurrently (this function results in less
/// contention)
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
self.items.extend(values, fill_columns);
(self.notify)();
}
/// Returns the total number of items injected in the matcher. This might
/// not match the number of items in the match snapshot (if the matcher
/// is still running)