Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 43 additions & 40 deletions diskann-benchmark/src/backend/disk_index/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,48 +261,51 @@ where
.zip(statistics_vec.par_iter_mut())
.zip(result_counts.par_iter_mut());

zipped.for_each_in_pool(&pool, |(((((q, vf), id_chunk), dist_chunk), stats), rc)| {
let vector_filter = if search_params.vector_filters_file.is_none() {
None
} else {
Some(Box::new(move |vid: &u32| vf.contains(vid))
as Box<dyn Fn(&u32) -> bool + Send + Sync>)
};

match searcher.search(
q,
search_params.recall_at,
l,
Some(search_params.beam_width),
vector_filter,
search_params.is_flat_search,
) {
Ok(search_result) => {
*stats = search_result.stats.query_statistics;
*rc = search_result.results.len() as u32;
let actual_results = search_result
.results
.len()
.min(search_params.recall_at as usize);
for (i, result_item) in search_result
.results
.iter()
.take(actual_results)
.enumerate()
{
id_chunk[i] = result_item.vertex_id;
dist_chunk[i] = result_item.distance;
zipped.for_each_in_pool(
pool.as_ref(),
|(((((q, vf), id_chunk), dist_chunk), stats), rc)| {
let vector_filter = if search_params.vector_filters_file.is_none() {
None
} else {
Some(Box::new(move |vid: &u32| vf.contains(vid))
as Box<dyn Fn(&u32) -> bool + Send + Sync>)
};

match searcher.search(
q,
search_params.recall_at,
l,
Some(search_params.beam_width),
vector_filter,
search_params.is_flat_search,
) {
Ok(search_result) => {
*stats = search_result.stats.query_statistics;
*rc = search_result.results.len() as u32;
let actual_results = search_result
.results
.len()
.min(search_params.recall_at as usize);
for (i, result_item) in search_result
.results
.iter()
.take(actual_results)
.enumerate()
{
id_chunk[i] = result_item.vertex_id;
dist_chunk[i] = result_item.distance;
}
}
Err(e) => {
eprintln!("Search failed for query: {:?}", e);
*rc = 0;
id_chunk.fill(0);
dist_chunk.fill(0.0);
has_any_search_failed.store(true, std::sync::atomic::Ordering::Release);
}
}
Err(e) => {
eprintln!("Search failed for query: {:?}", e);
*rc = 0;
id_chunk.fill(0);
dist_chunk.fill(0.0);
has_any_search_failed.store(true, std::sync::atomic::Ordering::Release);
}
}
});
},
);
let total_time = start.elapsed();

if has_any_search_failed.load(std::sync::atomic::Ordering::Acquire) {
Expand Down
3 changes: 2 additions & 1 deletion diskann-benchmark/src/backend/index/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ mod imp {
train_data.as_view(),
input.num_pq_chunks,
&mut StdRng::seed_from_u64(input.seed),
build.num_threads,
diskann_providers::utils::create_thread_pool(build.num_threads)?
.as_ref(),
)?
};

Expand Down
8 changes: 4 additions & 4 deletions diskann-disk/benches/benchmarks/kmeans_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use criterion::Criterion;
use diskann_disk::utils::{compute_vecs_l2sq, k_means_clustering};
use diskann_providers::utils::{create_thread_pool_for_bench, RayonThreadPool};
use diskann_providers::utils::{create_thread_pool_for_bench, RayonThreadPoolRef};
use rand::Rng;

const NUM_POINTS: usize = 100000;
Expand Down Expand Up @@ -37,21 +37,21 @@ pub fn benchmark_kmeans(c: &mut Criterion) {
MAX_KMEANS_REPS,
rng,
&mut false,
&pool,
pool.as_ref(),
)
})
});

group.bench_function("Snrm2 Rust Run", |f| {
f.iter(|| {
let data_copy = data.clone();
snrm2_benchmark_rust(&data_copy, NUM_POINTS, DIM, &pool);
snrm2_benchmark_rust(&data_copy, NUM_POINTS, DIM, pool.as_ref());
})
});
}

/// compute_vecs_l2sq benchmark
fn snrm2_benchmark_rust(data: &[f32], num_points: usize, dim: usize, pool: &RayonThreadPool) {
fn snrm2_benchmark_rust(data: &[f32], num_points: usize, dim: usize, pool: RayonThreadPoolRef<'_>) {
let mut docs_l2sq = vec![0.0; num_points];
compute_vecs_l2sq(&mut docs_l2sq, data, dim, pool).unwrap();
}
4 changes: 2 additions & 2 deletions diskann-disk/benches/benchmarks_iai/kmeans_bench_iai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn benchmark_kmeans_iai(data: Vec<f32>) {
MAX_KMEANS_REPS,
rng,
&mut false,
&pool,
pool.as_ref(),
)
.unwrap();

Expand All @@ -60,5 +60,5 @@ pub fn snrm2_benchmark_rust_iai(data: Vec<f32>) {
pub fn snrm2_benchmark_rust(data: &[f32], num_points: usize, dim: usize) {
let mut docs_l2sq = vec![0.0; num_points];
let pool = create_thread_pool_for_bench();
compute_vecs_l2sq(&mut docs_l2sq, data, dim, &pool).unwrap();
compute_vecs_l2sq(&mut docs_l2sq, data, dim, pool.as_ref()).unwrap();
}
16 changes: 8 additions & 8 deletions diskann-disk/src/build/builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use diskann_providers::{
},
storage::{AsyncIndexMetadata, DiskGraphOnly, PQStorage},
utils::{
create_thread_pool, find_medoid_with_sampling, RayonThreadPool, VectorDataIterator,
create_thread_pool, find_medoid_with_sampling, RayonThreadPoolRef, VectorDataIterator,
MAX_MEDOID_SAMPLE_SIZE,
},
};
Expand Down Expand Up @@ -233,10 +233,10 @@ where
self.index_configuration.num_threads
);

self.generate_compressed_data(&pool).await?;
self.generate_compressed_data(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);

self.build_inmem_index(&pool).await?;
self.build_inmem_index(pool.as_ref()).await?;
logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);

// Use physical file to pass the memory index to the disk writer
Expand All @@ -246,7 +246,7 @@ where
Ok(())
}

async fn generate_compressed_data(&mut self, pool: &RayonThreadPool) -> ANNResult<()> {
async fn generate_compressed_data(&mut self, pool: RayonThreadPoolRef<'_>) -> ANNResult<()> {
let num_points = self.index_configuration.max_points;
let num_chunks = self.disk_build_param.search_pq_chunks();

Expand Down Expand Up @@ -289,13 +289,13 @@ where

let generator = QuantDataGenerator::<
Data::VectorDataType,
PQGeneration<Data::VectorDataType, StorageProvider, &RayonThreadPool>,
PQGeneration<Data::VectorDataType, StorageProvider>,
>::new(
self.index_writer.get_dataset_file(),
generator_context,
&quantizer_context,
)?;
let progress = generator.generate_data(storage_provider, &pool, &self.chunking_config)?;
let progress = generator.generate_data(storage_provider, pool, &self.chunking_config)?;

checkpoint_context.update(progress.clone())?;
if let Progress::Processed(progress_point) = progress {
Expand All @@ -310,7 +310,7 @@ where
Ok(())
}

async fn build_inmem_index(&mut self, pool: &RayonThreadPool) -> ANNResult<()> {
async fn build_inmem_index(&mut self, pool: RayonThreadPoolRef<'_>) -> ANNResult<()> {
match determine_build_strategy::<Data>(
&self.index_configuration,
self.disk_build_param.build_memory_limit().in_bytes() as f64,
Expand All @@ -324,7 +324,7 @@ where
}
}

async fn build_merged_vamana_index(&mut self, pool: &RayonThreadPool) -> ANNResult<()> {
async fn build_merged_vamana_index(&mut self, pool: RayonThreadPoolRef<'_>) -> ANNResult<()> {
let mut logger = PerfLogger::new_disk_index_build_logger();
let mut workflow = MergedVamanaIndexWorkflow::new(self, pool);

Expand Down
8 changes: 4 additions & 4 deletions diskann-disk/src/build/builder/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use diskann_providers::{
model::{IndexConfiguration, GRAPH_SLACK_FACTOR, MAX_PQ_TRAINING_SET_SIZE},
storage::PQStorage,
utils::{
load_metadata_from_file, RayonThreadPool, SampleVectorReader, SamplingDensity,
load_metadata_from_file, RayonThreadPoolRef, SampleVectorReader, SamplingDensity,
READ_WRITE_BLOCK_SIZE,
},
};
Expand Down Expand Up @@ -468,7 +468,7 @@ pub(crate) fn determine_build_strategy<Data: GraphDataType>(
}

pub(crate) struct MergedVamanaIndexWorkflow<'a> {
pool: &'a RayonThreadPool,
pool: RayonThreadPoolRef<'a>,
rng: diskann_providers::utils::StandardRng,
dataset_file: String,
max_degree: u32,
Expand All @@ -478,7 +478,7 @@ pub(crate) struct MergedVamanaIndexWorkflow<'a> {
impl<'a> MergedVamanaIndexWorkflow<'a> {
pub(crate) fn new<Data, StorageProvider>(
builder: &mut DiskIndexBuilderCore<'_, Data, StorageProvider>,
pool: &'a RayonThreadPool,
pool: RayonThreadPoolRef<'a>,
) -> Self
where
Data: GraphDataType<VectorIdType = u32>,
Expand Down Expand Up @@ -528,7 +528,7 @@ impl<'a> MergedVamanaIndexWorkflow<'a> {
builder.disk_build_param.build_memory_limit().in_bytes() as f64;
// calculate how many partitions we need, in order to fit in RAM budget
// save id_map for each partition to disk
partition_with_ram_budget::<Data::VectorDataType, _, _, _>(
partition_with_ram_budget::<Data::VectorDataType, _, _>(
&self.dataset_file,
builder.index_configuration.dim,
sampling_rate,
Expand Down
4 changes: 2 additions & 2 deletions diskann-disk/src/build/builder/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diskann_providers::{
FixedChunkPQTable, IndexConfiguration, MAX_PQ_TRAINING_SET_SIZE,
},
storage::{PQStorage, SQStorage},
utils::{BridgeErr, PQPathNames},
utils::{create_thread_pool, BridgeErr, PQPathNames},
};
use diskann_quantization::scalar::train::ScalarQuantizationParameters;
use diskann_utils::views::MatrixView;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl BuildQuantizer {
MatrixView::try_from(&train_data, train_size, train_dim).bridge_err()?,
num_chunks,
&mut rnd,
index_configuration.num_threads,
create_thread_pool(index_configuration.num_threads)?.as_ref(),
)?
};
// Save at checkpoint. Note that the compressed data path and pivots path here
Expand Down
4 changes: 2 additions & 2 deletions diskann-disk/src/search/provider/disk_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ mod disk_provider_tests {
queries
.par_row_iter()
.enumerate()
.for_each_in_pool(&pool, |(i, query)| {
.for_each_in_pool(pool.as_ref(), |(i, query)| {
let mut query_stats = QueryStatistics::default();
let mut indices = vec![0u32; 10];
let mut distances = vec![0f32; 10];
Expand Down Expand Up @@ -1445,7 +1445,7 @@ mod disk_provider_tests {
queries
.par_row_iter()
.enumerate()
.for_each_in_pool(&pool, |(i, query)| {
.for_each_in_pool(pool.as_ref(), |(i, query)| {
let result = params
.index_search_engine
.search(query, params.k as u32, params.l as u32, beam_width, None, false)
Expand Down
13 changes: 5 additions & 8 deletions diskann-disk/src/storage/quant/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use std::{

use diskann::{error::IntoANNResult, utils::VectorRepr, ANNError, ANNResult};
use diskann_providers::storage::{StorageReadProvider, StorageWriteProvider};
use diskann_providers::{
forward_threadpool,
utils::{load_metadata_from_file, AsThreadPool, BridgeErr, ParallelIteratorInPool, Timer},
use diskann_providers::utils::{
load_metadata_from_file, BridgeErr, ParallelIteratorInPool, RayonThreadPoolRef, Timer,
};
use diskann_utils::{io::Metadata, views};
use rayon::iter::IndexedParallelIterator;
Expand Down Expand Up @@ -99,15 +98,14 @@ where
/// 4. Processes data in blocks of size given by chunking_config.data_compression_chunk_vector_count = 50_000
/// 5. Compresses each block in small batches, in parallel, to (potentially) take advantage of the quantizer's batch compression
/// 6. Writes compressed blocks to the output file.
pub fn generate_data<Storage, Pool>(
pub fn generate_data<Storage>(
&self,
storage_provider: &Storage, // Provider for reading source data and writing compressed results
pool: &Pool, // Thread pool for parallel processing
pool: RayonThreadPoolRef<'_>, // Thread pool for parallel processing
chunking_config: &ChunkingConfig, // Configuration for batching and checkpoint handling
) -> ANNResult<Progress>
where
Storage: StorageReadProvider + StorageWriteProvider,
Pool: AsThreadPool,
{
let timer = Timer::new();

Expand Down Expand Up @@ -157,7 +155,6 @@ where

let mut compressed_buffer = vec![0_u8; block_size * compressed_size];

forward_threadpool!(pool = pool: Pool);
//Every block has size exactly block_size, except for potentially the last one
let action = |block_index| -> ANNResult<()> {
let start_index: usize = offset + block_index * block_size;
Expand Down Expand Up @@ -431,7 +428,7 @@ mod generator_tests {
)
.unwrap();
// Run generator
let result = generator.generate_data(storage_provider, &&pool, chunking_config);
let result = generator.generate_data(storage_provider, pool.as_ref(), chunking_config);
(generator, result)
}

Expand Down
Loading
Loading