-
Notifications
You must be signed in to change notification settings - Fork 21
[SVS] Implement 2-stage backend SVS index initialization #903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
14e528e
616b4ba
f02a3b1
2538673
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,12 +39,23 @@ struct SVSIndexBase | |
| virtual ~SVSIndexBase() = default; | ||
| virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; | ||
| virtual int deleteVectors(const labelType *labels, size_t n) = 0; | ||
| virtual bool isLabelExists(labelType label) const = 0; | ||
| virtual size_t indexStorageSize() const = 0; | ||
| virtual size_t getNumThreads() const = 0; | ||
| virtual void setNumThreads(size_t numThreads) = 0; | ||
| virtual size_t getThreadPoolCapacity() const = 0; | ||
| virtual bool isCompressed() const = 0; | ||
| size_t getNumMarkedDeleted() const { return num_marked_deleted; } | ||
|
|
||
| // Abstract handler to manage SVS implementation instance | ||
| // declared to avoid unsafe unique_ptr<void> usage | ||
| // Derived SVSIndex class should implement it | ||
| struct ImplHandler { | ||
| virtual ~ImplHandler() = default; | ||
| }; | ||
| virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, | ||
| const labelType *labels, size_t n) = 0; | ||
| virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0; | ||
| #ifdef BUILD_TESTS | ||
| virtual svs::logging::logger_ptr getLogger() const = 0; | ||
| #endif | ||
|
|
@@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| // Create SVS index instance with initial data | ||
| // Data should not be empty | ||
| template <svs::data::ImmutableMemoryDataset Dataset> | ||
| void initImpl(const Dataset &points, std::span<const labelType> ids) { | ||
| std::unique_ptr<impl_type> initImpl(const Dataset &points, | ||
| std::span<const labelType> ids) const { | ||
| svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}}; | ||
|
|
||
| // Construct SVS index initial storage with compression if needed | ||
|
|
@@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| // Construct initial Vamana Graph | ||
| auto graph = | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point, | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point, | ||
| this->blockSize, this->getAllocator(), logger_); | ||
|
|
||
| // Create SVS MutableIndex instance | ||
| impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
| auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
|
|
||
| // Set SVS MutableIndex build parameters to be used in future updates | ||
| impl_->set_construction_window_size(parameters.window_size); | ||
| impl_->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl_->set_prune_to(parameters.prune_to); | ||
| impl_->set_alpha(parameters.alpha); | ||
| impl_->set_full_search_history(parameters.use_full_search_history); | ||
| impl->set_construction_window_size(parameters.window_size); | ||
| impl->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl->set_prune_to(parameters.prune_to); | ||
| impl->set_alpha(parameters.alpha); | ||
| impl->set_full_search_history(parameters.use_full_search_history); | ||
|
|
||
| // Configure default search parameters | ||
| auto sp = impl_->get_search_parameters(); | ||
| auto sp = impl->get_search_parameters(); | ||
| sp.buffer_config({this->search_window_size, this->search_buffer_capacity}); | ||
| impl_->set_search_parameters(sp); | ||
| impl_->reset_performance_parameters(); | ||
| impl->set_search_parameters(sp); | ||
| impl->reset_performance_parameters(); | ||
| return impl; | ||
| } | ||
|
|
||
| // Preprocess batch of vectors | ||
|
|
@@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return processed_blob; | ||
| } | ||
|
|
||
| // Handler to manage SVS implementation instance | ||
| struct SVSImplHandler : public SVSIndexBase::ImplHandler { | ||
| std::unique_ptr<impl_type> impl; | ||
| SVSImplHandler(std::unique_ptr<impl_type> impl) : impl{std::move(impl)} {} | ||
| }; | ||
|
|
||
| std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels, | ||
| size_t n) override { | ||
| // If no data provided, return empty handler | ||
| if (n == 0) { | ||
| return std::make_unique<SVSImplHandler>(nullptr); | ||
| } | ||
|
|
||
| std::span<const labelType> ids(labels, n); | ||
| auto processed_blob = this->preprocessForBatchStorage(vectors_data, n); | ||
| auto typed_vectors_data = static_cast<DataType *>(processed_blob.get()); | ||
| // Wrap data into SVS SimpleDataView for SVS API | ||
| auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim}; | ||
|
|
||
| return std::make_unique<SVSImplHandler>(initImpl(points, ids)); | ||
| } | ||
|
|
||
| void setImpl(std::unique_ptr<ImplHandler> handler) override { | ||
| assert(handler && "SVSIndex::setImpl called with null handler"); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is a debug-only assert, let's add this to the log in a warning level as well
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assert is added just to simplify logic error catching in DEBUG mode - as well as the next-line assert.
|
||
| assert(impl_ == nullptr); // Should be called only on empty impl_ | ||
| if (impl_ != nullptr) { | ||
| throw std::logic_error("SVSIndex::setImpl called on non-empty impl_"); | ||
| } | ||
|
|
||
| SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get()); | ||
| if (!svs_handler) { | ||
| throw std::logic_error("Failed to cast to SVSImplHandler"); | ||
| } | ||
|
Comment on lines
+249
to
+252
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the motivation to have an abstract
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
template <typename MetricType,
typename DataType,
bool isMulti,
size_t QuantBits,
size_t ResidualBits,
bool IsLeanVec>
struct SVSIndex<MetricType, DataType, isMulti, QuantBits, ResidualBits, IsLeanVec>::SVSImplHandler;This why the abstract |
||
| this->impl_ = std::move(svs_handler->impl); | ||
| } | ||
|
|
||
| // Assuming numThreads was updated to reflect the number of available threads before this | ||
| // function was called. | ||
| // This function assumes that the caller has already set numThreads to the appropriate value | ||
|
|
@@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| if (!impl_) { | ||
| // SVS index instance cannot be empty, so we have to construct it at first rows | ||
| initImpl(points, ids); | ||
| impl_ = initImpl(points, ids); | ||
| } else { | ||
| // Add new points to existing SVS index | ||
| impl_->add_points(points, ids); | ||
|
|
@@ -239,6 +288,18 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return n - deleted_num; | ||
| } | ||
|
|
||
| int deleteVectorImpl(const labelType label) { | ||
| if (indexLabelCount() == 0 || !impl_->has_id(label)) { | ||
| return 0; | ||
| } | ||
|
|
||
| const auto deleted_num = impl_->delete_entries(std::span{&label, 1}); | ||
| assert(deleted_num == 1); | ||
|
|
||
| this->markIndexUpdate(deleted_num); | ||
| return deleted_num; | ||
| } | ||
|
|
||
| int deleteVectorsImpl(const labelType *labels, size_t n) { | ||
| if (indexLabelCount() == 0) { | ||
| return 0; | ||
|
|
@@ -257,19 +318,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return 0; | ||
| } | ||
|
|
||
| // If entries_to_delete.size() == 1, we should ensure single-threading | ||
| const size_t current_num_threads = getNumThreads(); | ||
| if (n == 1 && current_num_threads > 1) { | ||
| setNumThreads(1); | ||
| } | ||
|
|
||
| const auto deleted_num = impl_->delete_entries(entries_to_delete); | ||
|
|
||
| // Restore multi-threading if needed | ||
| if (n == 1 && current_num_threads > 1) { | ||
| setNumThreads(current_num_threads); | ||
| } | ||
|
|
||
| this->markIndexUpdate(deleted_num); | ||
| return deleted_num; | ||
| } | ||
|
|
@@ -484,12 +534,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return addVectorsImpl(vectors_data, labels, n); | ||
| } | ||
|
|
||
| int deleteVector(labelType label) override { return deleteVectorsImpl(&label, 1); } | ||
| int deleteVector(labelType label) override { return deleteVectorImpl(label); } | ||
|
|
||
| int deleteVectors(const labelType *labels, size_t n) override { | ||
| return deleteVectorsImpl(labels, n); | ||
| } | ||
|
|
||
| bool isLabelExists(labelType label) const override { | ||
| return impl_ ? impl_->has_id(label) : false; | ||
| } | ||
|
|
||
| size_t getNumThreads() const override { return threadpool_.size(); } | ||
| void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
|
|
||
| executeTracingCallback("UpdateJob::before_add_to_svs"); | ||
| { // lock backend index for writing and add vectors there | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| std::shared_lock main_shared_lock(this->mainIndexGuard); | ||
| auto svs_index = GetSVSIndex(); | ||
| assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| if (this->backendIndex->indexSize() == 0) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this also handle re-initialization after the index was emptied? Is that scenario tested?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, as it was before in |
||
| // If backend index is empty, we need to initialize it first. | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
|
|
||
| // Upgrade to unique lock to set the new impl | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| svs_index->setImpl(std::move(impl)); | ||
| } else { | ||
| // Backend index is initialized - just add the vectors. | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| // Upgrade to unique lock to add vectors | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| } | ||
| } | ||
| executeTracingCallback("UpdateJob::after_add_to_svs"); | ||
| // clean-up frontend index | ||
|
|
@@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| } | ||
| } | ||
| // Remove vector from the backend index if it exists in case of non-MULTI. | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= svs_index->deleteVectors(&label, 1); | ||
| auto label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= this->backendIndex->deleteVector(label); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Non-atomic backend delete label checkMedium Severity
Additional Locations (1)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue is irrelevant, because |
||
| } | ||
| { // Add vector to the frontend index. | ||
| std::lock_guard lock(this->flatIndexGuard); | ||
|
|
@@ -887,9 +910,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| std::lock_guard lock(this->flatIndexGuard); | ||
| ret = this->deleteAndRecordSwaps_Unsafe(label); | ||
| } | ||
| { | ||
|
|
||
| label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret += svs_index->deleteVectors(&label, 1); | ||
| ret += this->backendIndex->deleteVector(label); | ||
| } | ||
| return ret; | ||
| } | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems to be a duplication of what we do in
addVectorsImpl. Consider unifying these into a single functionThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main point here is the
processed_blobwhich lifetime should be managed till end ofinitImpl()andimpl_->add_points()calls.A single function, which will wrap all this code would look like: