Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 79 additions & 25 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,23 @@ struct SVSIndexBase
virtual ~SVSIndexBase() = default;
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
virtual bool isLabelExists(labelType label) const = 0;
virtual size_t indexStorageSize() const = 0;
virtual size_t getNumThreads() const = 0;
virtual void setNumThreads(size_t numThreads) = 0;
virtual size_t getThreadPoolCapacity() const = 0;
virtual bool isCompressed() const = 0;
size_t getNumMarkedDeleted() const { return num_marked_deleted; }

// Abstract, type-erased handle for an SVS implementation instance.
// It exists so that an implementation can be passed around as
// std::unique_ptr<ImplHandler> instead of an unsafe std::unique_ptr<void>.
// The derived SVSIndex class is expected to provide the concrete handler type.
struct ImplHandler {
// Virtual destructor: lets a concrete handler be destroyed through an ImplHandler pointer.
virtual ~ImplHandler() = default;
};
// Creates an implementation handler from `n` vectors in `vectors_data` and their `labels`.
// Pure virtual: the concrete behavior is defined by the derived index class.
virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data,
const labelType *labels, size_t n) = 0;
// Hands a previously created handler over to the index, which takes ownership of it.
// Pure virtual: the concrete behavior is defined by the derived index class.
virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0;
#ifdef BUILD_TESTS
virtual svs::logging::logger_ptr getLogger() const = 0;
#endif
Expand Down Expand Up @@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
// Create SVS index instance with initial data
// Data should not be empty
template <svs::data::ImmutableMemoryDataset Dataset>
void initImpl(const Dataset &points, std::span<const labelType> ids) {
std::unique_ptr<impl_type> initImpl(const Dataset &points,
std::span<const labelType> ids) const {
svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}};

// Construct SVS index initial storage with compression if needed
Expand All @@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

// Construct initial Vamana Graph
auto graph =
graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point,
graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point,
this->blockSize, this->getAllocator(), logger_);

// Create SVS MutableIndex instance
impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);
auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);

// Set SVS MutableIndex build parameters to be used in future updates
impl_->set_construction_window_size(parameters.window_size);
impl_->set_max_candidates(parameters.max_candidate_pool_size);
impl_->set_prune_to(parameters.prune_to);
impl_->set_alpha(parameters.alpha);
impl_->set_full_search_history(parameters.use_full_search_history);
impl->set_construction_window_size(parameters.window_size);
impl->set_max_candidates(parameters.max_candidate_pool_size);
impl->set_prune_to(parameters.prune_to);
impl->set_alpha(parameters.alpha);
impl->set_full_search_history(parameters.use_full_search_history);

// Configure default search parameters
auto sp = impl_->get_search_parameters();
auto sp = impl->get_search_parameters();
sp.buffer_config({this->search_window_size, this->search_buffer_capacity});
impl_->set_search_parameters(sp);
impl_->reset_performance_parameters();
impl->set_search_parameters(sp);
impl->reset_performance_parameters();
return impl;
}

// Preprocess batch of vectors
Expand All @@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return processed_blob;
}

// Handler to manage SVS implementation instance
struct SVSImplHandler : public SVSIndexBase::ImplHandler {
std::unique_ptr<impl_type> impl;
SVSImplHandler(std::unique_ptr<impl_type> impl) : impl{std::move(impl)} {}
};

std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels,
size_t n) override {
// If no data provided, return empty handler
if (n == 0) {
return std::make_unique<SVSImplHandler>(nullptr);
}

std::span<const labelType> ids(labels, n);
auto processed_blob = this->preprocessForBatchStorage(vectors_data, n);
auto typed_vectors_data = static_cast<DataType *>(processed_blob.get());
// Wrap data into SVS SimpleDataView for SVS API
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};

Comment on lines +233 to +238
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems to duplicate what we do in addVectorsImpl. Consider unifying the two into a single function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main point here is processed_blob, whose lifetime must be managed until the initImpl() and impl_->add_points() calls have finished.
A single function, which will wrap all this code would look like:

std::tuple<std::span<const labelType>, MemoryUtils::unique_blob, svs::data::SimpleDataView<DataType>> preprocessAndPrepareSVSArgs(...)

return std::make_unique<SVSImplHandler>(initImpl(points, ids));
}

void setImpl(std::unique_ptr<ImplHandler> handler) override {
assert(handler && "SVSIndex::setImpl called with null handler");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a debug-only assert, let's also log it at warning level.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is added just to simplify logic error catching in DEBUG mode - as well as the next-line assert.
In release mode, the logic_error will be thrown later if handler is null.

assert()s here are not really needed - except for debugging purposes.
I can just remove them.

assert(impl_ == nullptr); // Should be called only on empty impl_
if (impl_ != nullptr) {
throw std::logic_error("SVSIndex::setImpl called on non-empty impl_");
}

SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get());
if (!svs_handler) {
throw std::logic_error("Failed to cast to SVSImplHandler");
}
Comment on lines +249 to +252
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation to have an abstract ImplHandler rather than have only SVSImplHandler? The dynamic_cast here seems a bit awkward

Copy link
Collaborator Author

@rfsaliev rfsaliev Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SVSImplHandler is not just a simple type - it is a template class with a number of parameters, and its full declaration looks like:

template <typename MetricType,
          typename DataType,
          bool isMulti,
          size_t QuantBits,
          size_t ResidualBits,
          bool IsLeanVec>
struct SVSIndex<MetricType, DataType, isMulti, QuantBits, ResidualBits, IsLeanVec>::SVSImplHandler;

This is why the abstract SVSIndexBase::ImplHandler is defined for client code (TieredSVSIndex).

this->impl_ = std::move(svs_handler->impl);
}

// Assuming numThreads was updated to reflect the number of available threads before this
// function was called.
// This function assumes that the caller has already set numThreads to the appropriate value
Expand All @@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

if (!impl_) {
// SVS index instance cannot be empty, so we have to construct it at first rows
initImpl(points, ids);
impl_ = initImpl(points, ids);
} else {
// Add new points to existing SVS index
impl_->add_points(points, ids);
Expand All @@ -239,6 +288,18 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return n - deleted_num;
}

// Removes a single label from the SVS index.
// Returns 0 when the index holds no labels or the label is not present; otherwise returns
// the number of entries reported as deleted (asserted to be exactly 1 in debug builds).
int deleteVectorImpl(const labelType label) {
    // Nothing to do for an empty index.
    if (indexLabelCount() == 0) {
        return 0;
    }
    // Nothing to do for a label the index does not know about.
    if (!impl_->has_id(label)) {
        return 0;
    }

    const auto removed = impl_->delete_entries(std::span{&label, 1});
    assert(removed == 1);

    // Record the deletion in the index bookkeeping before reporting it to the caller.
    this->markIndexUpdate(removed);
    return removed;
}

int deleteVectorsImpl(const labelType *labels, size_t n) {
if (indexLabelCount() == 0) {
return 0;
Expand All @@ -257,19 +318,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return 0;
}

// If entries_to_delete.size() == 1, we should ensure single-threading
const size_t current_num_threads = getNumThreads();
if (n == 1 && current_num_threads > 1) {
setNumThreads(1);
}

const auto deleted_num = impl_->delete_entries(entries_to_delete);

// Restore multi-threading if needed
if (n == 1 && current_num_threads > 1) {
setNumThreads(current_num_threads);
}

this->markIndexUpdate(deleted_num);
return deleted_num;
}
Expand Down Expand Up @@ -484,12 +534,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return addVectorsImpl(vectors_data, labels, n);
}

int deleteVector(labelType label) override { return deleteVectorsImpl(&label, 1); }
int deleteVector(labelType label) override { return deleteVectorImpl(label); }

// Deletes the `n` labels pointed to by `labels`.
// Forwards to deleteVectorsImpl() and returns its result unchanged.
int deleteVectors(const labelType *labels, size_t n) override {
return deleteVectorsImpl(labels, n);
}

// Reports whether `label` is present in the index.
// An index whose implementation has not been created yet contains no labels.
bool isLabelExists(labelType label) const override {
    return impl_ != nullptr && impl_->has_id(label);
}

// Returns the current size of the index thread pool.
size_t getNumThreads() const override { return threadpool_.size(); }
// Resizes the index thread pool to `numThreads`.
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }

Expand Down
45 changes: 37 additions & 8 deletions src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {

executeTracingCallback("UpdateJob::before_add_to_svs");
{ // lock backend index for writing and add vectors there
std::lock_guard lock(this->mainIndexGuard);
std::shared_lock main_shared_lock(this->mainIndexGuard);
auto svs_index = GetSVSIndex();
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
if (this->backendIndex->indexSize() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also handle re-initialization after the index was emptied? Is that scenario tested?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as it was before in SVSIndex::AddVectors()

// If backend index is empty, we need to initialize it first.
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());

// Upgrade to unique lock to set the new impl
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
svs_index->setImpl(std::move(impl));
} else {
// Backend index is initialized - just add the vectors.
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
// Upgrade to unique lock to add vectors
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
}
}
executeTracingCallback("UpdateJob::after_add_to_svs");
// clean-up frontend index
Expand Down Expand Up @@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
}
}
// Remove vector from the backend index if it exists in case of non-MULTI.
std::lock_guard lock(this->mainIndexGuard);
ret -= svs_index->deleteVectors(&label, 1);
auto label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret -= this->backendIndex->deleteVector(label);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-atomic backend delete label check

Medium Severity

addVector and deleteVector now check svs_index->isLabelExists() under a shared lock, then perform deleteVectors() later under a unique lock only if that earlier check was true. This creates a TOCTOU race where backend state can change between the two locks, causing missed deletions in svs_tiered.h.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is irrelevant, because deleteVectors() performs required checks internally.

}
{ // Add vector to the frontend index.
std::lock_guard lock(this->flatIndexGuard);
Expand Down Expand Up @@ -887,9 +910,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
std::lock_guard lock(this->flatIndexGuard);
ret = this->deleteAndRecordSwaps_Unsafe(label);
}
{

label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret += svs_index->deleteVectors(&label, 1);
ret += this->backendIndex->deleteVector(label);
}
return ret;
}
Expand Down
14 changes: 14 additions & 0 deletions tests/benchmark/bm_initialization/bm_basics_svs_initialize_fp32.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,17 @@ BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_TriggerUpdateTiered))
{2, 4, 8}})
->ArgNames({"update_threshold", "thread_count"})
->MeasureProcessCPUTime();

// Add vectors to reach the training threshold in the tiered index, then add more vectors in
// parallel to the backend training job. Measures the time to add the new vectors in this scenario.
// Registered arguments (cartesian product):
//   training_threshold: block_size, 5000, 10 * block_size
//   thread_count:       2, 4
// Each configuration runs a single iteration and is reported in milliseconds of real time.
BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddVectorsDuringTraining),
DATA_TYPE_INDEX_T)
(benchmark::State &st) { AddVectorsDuringTraining(st); }
BENCHMARK_REGISTER_F(BM_VecSimSVS, BM_FUNC_NAME(BM_AddVectorsDuringTraining))
->Unit(benchmark::kMillisecond)
->Iterations(1)
->ArgsProduct({{static_cast<long int>(BM_VecSimGeneral::block_size), 5000,
static_cast<long int>(10 * BM_VecSimGeneral::block_size)},
{2, 4}})
->ArgNames({"training_threshold", "thread_count"})
->UseRealTime();
42 changes: 42 additions & 0 deletions tests/benchmark/bm_vecsim_svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class BM_VecSimSVS : public BM_VecSimGeneral {
// index.
void TriggerUpdateTiered(benchmark::State &st);

// Add vectors to reach training threshold in tiered index, and then add more vectors in
// parallel to backend training job. Measure time to add new vectors in this scenario.
void AddVectorsDuringTraining(benchmark::State &st);

// Deletes an amount of labels from the index that triggers inplace consolidation.
void RunGC(benchmark::State &st);

Expand Down Expand Up @@ -403,6 +407,44 @@ void BM_VecSimSVS<index_type_t>::TriggerUpdateTiered(benchmark::State &st) {
.str());
}

// Benchmark body: fills a fresh tiered SVS index up to its training threshold, starts the
// mock thread pool, and then times the insertion of further vectors while the pool is running.
//   st.range(0) - training threshold (number of vectors inserted before the threads start)
//   st.range(1) - number of threads in the mock thread pool
// NOTE(review): only N_QUERIES >= training_threshold is asserted below, yet every benchmark
// iteration reads 1000 more entries of test_vectors past the threshold - confirm that
// test_vectors is large enough for all registered argument combinations.
template <typename index_type_t>
inline void BM_VecSimSVS<index_type_t>::AddVectorsDuringTraining(benchmark::State &st) {
    // The scenario requires asynchronous write mode.
    ASSERT_EQ(VecSimIndexInterface::asyncWriteMode, VecSim_WriteAsync);

    auto threshold = st.range(0);
    unsigned int thread_count = st.range(1);

    if (thread_count > std::thread::hardware_concurrency()) {
        GTEST_SKIP() << "Not enough threads available, skipping test...";
    }

    // There must be enough vectors to reach the training threshold.
    ASSERT_GE(N_QUERIES, threshold);

    // A new index (and a new mock thread pool) is created for every benchmark run.
    auto pool = tieredIndexMock(thread_count);
    ASSERT_EQ(pool.thread_pool_size, thread_count);
    // The update threshold is set very high so that no update is triggered during this test.
    auto *index = CreateTieredSVSIndex(pool, threshold, 1 << 30);

    // Insert vectors up to the training threshold so that training gets triggered.
    for (size_t id = 0; id < threshold; ++id) {
        VecSimIndex_AddVector(index, test_vectors[id].data(), id);
    }
    pool.init_threads();

    // Labels continue right after the ones already inserted.
    size_t next_label = threshold;
    for (auto _ : st) {
        // While the backend is training, keep adding vectors in parallel to the training job.
        for (size_t added = 0; added < 1000; ++added, ++next_label) {
            VecSimIndex_AddVector(index, test_vectors[next_label].data(), next_label);
        }
    }
    pool.thread_pool_join();
}

template <typename index_type_t>
void BM_VecSimSVS<index_type_t>::RunGC(benchmark::State &st) {

Expand Down
Loading