diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 59b056d3c..7edc0abae 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -39,12 +39,23 @@ struct SVSIndexBase virtual ~SVSIndexBase() = default; virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; virtual int deleteVectors(const labelType *labels, size_t n) = 0; + virtual bool isLabelExists(labelType label) const = 0; virtual size_t indexStorageSize() const = 0; virtual size_t getNumThreads() const = 0; virtual void setNumThreads(size_t numThreads) = 0; virtual size_t getThreadPoolCapacity() const = 0; virtual bool isCompressed() const = 0; size_t getNumMarkedDeleted() const { return num_marked_deleted; } + + // Abstract handler to manage SVS implementation instance + // declared to avoid unsafe unique_ptr usage + // Derived SVSIndex class should implement it + struct ImplHandler { + virtual ~ImplHandler() = default; + }; + virtual std::unique_ptr createImpl(const void *vectors_data, + const labelType *labels, size_t n) = 0; + virtual void setImpl(std::unique_ptr impl) = 0; #ifdef BUILD_TESTS virtual svs::logging::logger_ptr getLogger() const = 0; #endif @@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract, fl // Create SVS index instance with initial data // Data should not be empty template - void initImpl(const Dataset &points, std::span ids) { + std::unique_ptr initImpl(const Dataset &points, + std::span ids) const { svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}}; // Construct SVS index initial storage with compression if needed @@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract, fl // Construct initial Vamana Graph auto graph = - graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point, + graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point, this->blockSize, this->getAllocator(), logger_); // Create SVS MutableIndex instance - impl_ = std::make_unique(std::move(graph), std::move(data), entry_point, - std::move(distance), ids, threadpool_, logger_); + auto impl = std::make_unique(std::move(graph), std::move(data), entry_point, + std::move(distance), ids, threadpool_, logger_); // Set SVS MutableIndex build parameters to be used in future updates - impl_->set_construction_window_size(parameters.window_size); - impl_->set_max_candidates(parameters.max_candidate_pool_size); - impl_->set_prune_to(parameters.prune_to); - impl_->set_alpha(parameters.alpha); - impl_->set_full_search_history(parameters.use_full_search_history); + impl->set_construction_window_size(parameters.window_size); + impl->set_max_candidates(parameters.max_candidate_pool_size); + impl->set_prune_to(parameters.prune_to); + impl->set_alpha(parameters.alpha); + impl->set_full_search_history(parameters.use_full_search_history); // Configure default search parameters - auto sp = impl_->get_search_parameters(); + auto sp = impl->get_search_parameters(); sp.buffer_config({this->search_window_size, this->search_buffer_capacity}); - impl_->set_search_parameters(sp); - impl_->reset_performance_parameters(); + impl->set_search_parameters(sp); + impl->reset_performance_parameters(); + return impl; } // Preprocess batch of vectors @@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract, fl return processed_blob; } + // Handler to manage SVS implementation instance + struct SVSImplHandler : public SVSIndexBase::ImplHandler { + std::unique_ptr impl; + SVSImplHandler(std::unique_ptr impl) : impl{std::move(impl)} {} + }; + + std::unique_ptr createImpl(const void *vectors_data, const labelType *labels, + size_t n) override { + // If no data provided, return empty handler + if (n == 0) { + return std::make_unique(nullptr); + } + + std::span ids(labels, n); + auto processed_blob = this->preprocessForBatchStorage(vectors_data, n); + auto typed_vectors_data = static_cast(processed_blob.get()); + // Wrap data into SVS SimpleDataView for SVS API + auto points = svs::data::SimpleDataView{typed_vectors_data, n, this->dim}; + + return std::make_unique(initImpl(points, ids)); + } + + void setImpl(std::unique_ptr handler) override { + assert(handler && "SVSIndex::setImpl called with null handler"); + assert(impl_ == nullptr); // Should be called only on empty impl_ + if (impl_ != nullptr) { + throw std::logic_error("SVSIndex::setImpl called on non-empty impl_"); + } + + SVSImplHandler *svs_handler = dynamic_cast(handler.get()); + if (!svs_handler) { + throw std::logic_error("Failed to cast to SVSImplHandler"); + } + this->impl_ = std::move(svs_handler->impl); + } + // Assuming numThreads was updated to reflect the number of available threads before this // function was called. // This function assumes that the caller has already set numThreads to the appropriate value @@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract, fl if (!impl_) { // SVS index instance cannot be empty, so we have to construct it at first rows - initImpl(points, ids); + impl_ = initImpl(points, ids); } else { // Add new points to existing SVS index impl_->add_points(points, ids); @@ -239,6 +288,17 @@ class SVSIndex : public VecSimIndexAbstract, fl return n - deleted_num; } + int deleteVectorImpl(const labelType label) { + if (indexLabelCount() == 0 || !impl_->has_id(label)) { + return 0; + } + + const auto deleted_num = impl_->delete_entries(std::span{&label, 1}); + + this->markIndexUpdate(deleted_num); + return deleted_num; + } + int deleteVectorsImpl(const labelType *labels, size_t n) { if (indexLabelCount() == 0) { return 0; @@ -257,19 +317,8 @@ class SVSIndex : public VecSimIndexAbstract, fl return 0; } - // If entries_to_delete.size() == 1, we should ensure single-threading - const size_t current_num_threads = getNumThreads(); - if (n == 1 && current_num_threads > 1) { - setNumThreads(1); - } - const auto deleted_num = impl_->delete_entries(entries_to_delete); - // Restore multi-threading if needed - if (n == 1 && current_num_threads > 1) { - setNumThreads(current_num_threads); - } - this->markIndexUpdate(deleted_num); return deleted_num; } @@ -484,12 +533,16 @@ class SVSIndex : public VecSimIndexAbstract, fl return addVectorsImpl(vectors_data, labels, n); } - int deleteVector(labelType label) override { return deleteVectorsImpl(&label, 1); } + int deleteVector(labelType label) override { return deleteVectorImpl(label); } int deleteVectors(const labelType *labels, size_t n) override { return deleteVectorsImpl(labels, n); } + bool isLabelExists(labelType label) const override { + return impl_ ? impl_->has_id(label) : false; + } + size_t getNumThreads() const override { return threadpool_.size(); } void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 351122367..ceeece1f3 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex { executeTracingCallback("UpdateJob::before_add_to_svs"); { // lock backend index for writing and add vectors there - std::lock_guard lock(this->mainIndexGuard); + std::shared_lock main_shared_lock(this->mainIndexGuard); auto svs_index = GetSVSIndex(); assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); - svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), - labels_to_move.size()); + if (this->backendIndex->indexSize() == 0) { + // If backend index is empty, we need to initialize it first. + svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), + labels_to_move.size()); + + // Upgrade to unique lock to set the new impl + main_shared_lock.unlock(); + std::lock_guard lock(this->mainIndexGuard); + svs_index->setImpl(std::move(impl)); + } else { + // Backend index is initialized - just add the vectors. + main_shared_lock.unlock(); + std::lock_guard lock(this->mainIndexGuard); + // Upgrade to unique lock to add vectors + svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), + labels_to_move.size()); + } } executeTracingCallback("UpdateJob::after_add_to_svs"); // clean-up frontend index @@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex { } } // Remove vector from the backend index if it exists in case of non-MULTI. - std::lock_guard lock(this->mainIndexGuard); - ret -= svs_index->deleteVectors(&label, 1); + auto label_exists = [&]() { + std::shared_lock lock(this->mainIndexGuard); + return svs_index->isLabelExists(label); + }(); + + if (label_exists) { + std::lock_guard lock(this->mainIndexGuard); + ret -= this->backendIndex->deleteVector(label); + } } { // Add vector to the frontend index. std::lock_guard lock(this->flatIndexGuard); @@ -887,9 +910,15 @@ class TieredSVSIndex : public VecSimTieredIndex { std::lock_guard lock(this->flatIndexGuard); ret = this->deleteAndRecordSwaps_Unsafe(label); } - { + + label_exists = [&]() { + std::shared_lock lock(this->mainIndexGuard); + return svs_index->isLabelExists(label); + }(); + + if (label_exists) { std::lock_guard lock(this->mainIndexGuard); - ret += svs_index->deleteVectors(&label, 1); + ret += this->backendIndex->deleteVector(label); } return ret; } diff --git a/tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h b/tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h index 2576df29c..bdbb7e007 100644 --- a/tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h +++ b/tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h @@ -40,3 +40,17 @@ BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_TriggerUpdateTiered)) {2, 4, 8}}) ->ArgNames({"update_threshold", "thread_count"}) ->MeasureProcessCPUTime(); + +// Add vectors to reach training threshold in tiered index, and then add more vectors in parallel to +// backend training job. Measure time to add new vectors in this scenario. +BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddVectorsDuringTraining), + DATA_TYPE_INDEX_T) +(benchmark::State &st) { AddVectorsDuringTraining(st); } +BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddVectorsDuringTraining)) + ->Unit(benchmark::kMillisecond) + ->Iterations(1) + ->ArgsProduct({{static_cast(BM_VecSimGeneral::block_size), 5000, + static_cast(10 * BM_VecSimGeneral::block_size)}, + {2, 4}}) + ->ArgNames({"training_threshold", "thread_count"}) + ->UseRealTime(); diff --git a/tests/benchmark/bm_vecsim_svs.h b/tests/benchmark/bm_vecsim_svs.h index 71a0c9f1c..b3025c209 100644 --- a/tests/benchmark/bm_vecsim_svs.h +++ b/tests/benchmark/bm_vecsim_svs.h @@ -47,6 +47,10 @@ class BM_VecSimSVS : public BM_VecSimGeneral { // index. void TriggerUpdateTiered(benchmark::State &st); + // Add vectors to reach training threshold in tiered index, and then add more vectors in + // parallel to backend training job. Measure time to add new vectors in this scenario. + void AddVectorsDuringTraining(benchmark::State &st); + // Deletes an amount of labels from the index that triggers inplace consolidation. void RunGC(benchmark::State &st); @@ -403,6 +407,44 @@ void BM_VecSimSVS::TriggerUpdateTiered(benchmark::State &st) { .str()); } +template +inline void BM_VecSimSVS::AddVectorsDuringTraining(benchmark::State &st) { + // ensure mode is async + ASSERT_EQ(VecSimIndexInterface::asyncWriteMode, VecSim_WriteAsync); + + auto training_threshold = st.range(0); + int unsigned num_threads = st.range(1); + + if (num_threads > std::thread::hardware_concurrency()) { + GTEST_SKIP() << "Not enough threads available, skipping test..."; + } + + // Ensure we have enough vectors to train. + ASSERT_GE(N_QUERIES, training_threshold); + + // In each iteration create a new index + auto mock_thread_pool = tieredIndexMock(num_threads); + ASSERT_EQ(mock_thread_pool.thread_pool_size, num_threads); + auto *tiered_index = CreateTieredSVSIndex( + mock_thread_pool, training_threshold, + 1 << 30); // set very high update threshold to avoid updates during this test + + // Add vectors to reach training threshold and trigger training. + for (size_t i = 0; i < training_threshold; ++i) { + VecSimIndex_AddVector(tiered_index, test_vectors[i].data(), i); + } + mock_thread_pool.init_threads(); + size_t label = training_threshold; + + for (auto _ : st) { + // While the backend is training, keep adding vectors in parallel to the training job. + for (size_t i = 0; i < 1000; i++, label++) { + VecSimIndex_AddVector(tiered_index, test_vectors[label].data(), label); + } + } + mock_thread_pool.thread_pool_join(); +} + template void BM_VecSimSVS::RunGC(benchmark::State &st) {