From d2c9f36953963c6b3bccc17422d7f5a28e4f7630 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Thu, 7 May 2026 12:43:32 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1748 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493 Antalya 26.3: Fix file identifier in rescheduleTasksFromReplica # Conflicts: # src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp # src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h --- ...rageObjectStorageStableTaskDistributor.cpp | 156 ++++++++++++++++-- ...torageObjectStorageStableTaskDistributor.h | 7 + 2 files changed, 153 insertions(+), 10 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 5a54601b171c..a7b3923c416d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -27,6 +27,22 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb { LOG_TRACE(log, "Received request from replica {} looking for a file", number_of_current_replica); +<<<<<<< HEAD +======= + saveLastNodeActivity(number_of_current_replica); + + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } + +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) // 1. Check pre-queued files first if (auto file = getPreQueuedFile(number_of_current_replica)) return file; @@ -36,7 +52,29 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb return file; // 3. Process unprocessed files if iterator is exhausted +<<<<<<< HEAD return getAnyUnprocessedFile(number_of_current_replica); +======= + if (!file) + file = getAnyUnprocessedFile(number_of_current_replica); + + if (file) + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + { // It is possible that replica was lost after check in the begining of the method + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } + else + processed_file_list_ptr->second.push_back(file); + } + + return file; +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) } size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) @@ -81,7 +119,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -125,20 +163,30 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) + String file_identifier = getFileIdentifier(object_info, true); + +<<<<<<< HEAD + size_t file_replica_idx = getReplicaForFile(file_identifier); +======= + if (iceberg_read_optimization_enabled) { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. " - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + auto file_meta_info = object_info->relative_path_with_metadata.getFileMetaInfo(); + if (file_meta_info.has_value()) + { + auto file_path = send_over_whole_archive ? object_info->getPathOrPathToArchiveIfArchive() : object_info->getPath(); + object_info->relative_path_with_metadata.command.setFilePath(file_path); + object_info->relative_path_with_metadata.command.setFileMetaInfo(file_meta_info.value()); + } } - else + + size_t file_replica_idx; + { - file_identifier = object_info->getIdentifier(); + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); } - size_t file_replica_idx = getReplicaForFile(file_identifier); +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) if (file_replica_idx == number_of_current_replica) { LOG_TRACE( @@ -177,7 +225,39 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second; unprocessed_files.erase(it); +<<<<<<< HEAD auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); +======= + while (it != unprocessed_files.end()) + { + auto number_of_matched_replica = it->second.second; + auto last_activity = last_node_activity.find(number_of_matched_replica); + if (lock_object_storage_task_distribution_us <= 0 // file deferring is turned off + || it->second.second == number_of_current_replica // file is matching with current replica + || last_activity == last_node_activity.end() // msut never be happen, last_activity is filled for each replica on start + || activity_limit > last_activity->second) // matched replica did not ask for a new files for a while + { + auto next_file = it->second.first; + unprocessed_files.erase(it); + + auto file_path = getFileIdentifier(next_file); + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", + file_path, + number_of_current_replica, + number_of_matched_replica + ); + + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToNonMatchedReplica); + return next_file; + } + + oldest_activity = std::min(oldest_activity, last_activity->second); + ++it; + } + +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {}", @@ -191,4 +271,60 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s return {}; } +<<<<<<< HEAD +======= +void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) +{ + Poco::Timestamp now; + std::lock_guard lock(mutex); + last_node_activity[number_of_current_replica] = now; +} + +void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica) +{ + LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica); + std::lock_guard lock(mutex); + + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost already", + number_of_current_replica + ); + + if (replica_to_files_to_be_processed.size() < 2) + throw Exception( + ErrorCodes::CANNOT_READ_ALL_DATA, + "All replicas were marked as lost" + ); + + auto files = std::move(processed_file_list_ptr->second); + replica_to_files_to_be_processed.erase(number_of_current_replica); + for (const auto & file : files) + { + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } +} + +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. " + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return file_object->getIdentifier(); +} + +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 02d3ba7a030f..73a3409762d3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -28,6 +28,13 @@ class StorageObjectStorageStableTaskDistributor ObjectInfoPtr getMatchingFileFromIterator(size_t number_of_current_replica); ObjectInfoPtr getAnyUnprocessedFile(size_t number_of_current_replica); +<<<<<<< HEAD +======= + void saveLastNodeActivity(size_t number_of_current_replica); + + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + +>>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) const std::shared_ptr iterator; const bool send_over_whole_archive; From 9229e477daa05762b01d2a498fe036edf4faef96 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Sun, 7 Jun 2026 19:54:47 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1748 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ported `getFileIdentifier` helper and its usage in `getPreQueuedFile`, `getMatchingFileFromIterator`, and `getAnyUnprocessedFile`. Applied mutex locking around `getReplicaForFile` in `getMatchingFileFromIterator` as introduced by the source PR. `saveLastNodeActivity`, `rescheduleTasksFromReplica`, and the locking of `replica_to_files_to_be_processed` from the source PR were not ported: `antalya-26.4` does not have `replica_to_files_to_be_processed`, `last_node_activity`, or `rescheduleTasksFromReplica` — those require a separate prerequisite port of the tracking infrastructure from `antalya-26.3`. The `iceberg_read_optimization_enabled` block in the conflict for `getMatchingFileFromIterator` was pre-existing `antalya-26.3` code not present in source PR #1748's diff; it was not added. --- ...rageObjectStorageStableTaskDistributor.cpp | 127 +----------------- ...torageObjectStorageStableTaskDistributor.h | 5 - 2 files changed, 1 insertion(+), 131 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index a7b3923c416d..223430e08f0f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -27,22 +27,6 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb { LOG_TRACE(log, "Received request from replica {} looking for a file", number_of_current_replica); -<<<<<<< HEAD -======= - saveLastNodeActivity(number_of_current_replica); - - { - std::lock_guard lock(mutex); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); - } - ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) // 1. Check pre-queued files first if (auto file = getPreQueuedFile(number_of_current_replica)) return file; @@ -52,29 +36,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb return file; // 3. Process unprocessed files if iterator is exhausted -<<<<<<< HEAD return getAnyUnprocessedFile(number_of_current_replica); -======= - if (!file) - file = getAnyUnprocessedFile(number_of_current_replica); - - if (file) - { - std::lock_guard lock(mutex); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - { // It is possible that replica was lost after check in the begining of the method - auto file_identifier = getFileIdentifier(file); - auto file_replica_idx = getReplicaForFile(file_identifier); - unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); - connection_to_files[file_replica_idx].push_back(file); - } - else - processed_file_list_ptr->second.push_back(file); - } - - return file; ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) } size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) @@ -165,20 +127,6 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter String file_identifier = getFileIdentifier(object_info, true); -<<<<<<< HEAD - size_t file_replica_idx = getReplicaForFile(file_identifier); -======= - if (iceberg_read_optimization_enabled) - { - auto file_meta_info = object_info->relative_path_with_metadata.getFileMetaInfo(); - if (file_meta_info.has_value()) - { - auto file_path = send_over_whole_archive ? object_info->getPathOrPathToArchiveIfArchive() : object_info->getPath(); - object_info->relative_path_with_metadata.command.setFilePath(file_path); - object_info->relative_path_with_metadata.command.setFileMetaInfo(file_meta_info.value()); - } - } - size_t file_replica_idx; { @@ -186,7 +134,6 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter file_replica_idx = getReplicaForFile(file_identifier); } ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) if (file_replica_idx == number_of_current_replica) { LOG_TRACE( @@ -225,39 +172,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second; unprocessed_files.erase(it); -<<<<<<< HEAD - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); -======= - while (it != unprocessed_files.end()) - { - auto number_of_matched_replica = it->second.second; - auto last_activity = last_node_activity.find(number_of_matched_replica); - if (lock_object_storage_task_distribution_us <= 0 // file deferring is turned off - || it->second.second == number_of_current_replica // file is matching with current replica - || last_activity == last_node_activity.end() // msut never be happen, last_activity is filled for each replica on start - || activity_limit > last_activity->second) // matched replica did not ask for a new files for a while - { - auto next_file = it->second.first; - unprocessed_files.erase(it); - - auto file_path = getFileIdentifier(next_file); - LOG_TRACE( - log, - "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", - file_path, - number_of_current_replica, - number_of_matched_replica - ); - - ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToNonMatchedReplica); - return next_file; - } - - oldest_activity = std::min(oldest_activity, last_activity->second); - ++it; - } - ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {}", @@ -271,45 +186,6 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s return {}; } -<<<<<<< HEAD -======= -void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) -{ - Poco::Timestamp now; - std::lock_guard lock(mutex); - last_node_activity[number_of_current_replica] = now; -} - -void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica) -{ - LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica); - std::lock_guard lock(mutex); - - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost already", - number_of_current_replica - ); - - if (replica_to_files_to_be_processed.size() < 2) - throw Exception( - ErrorCodes::CANNOT_READ_ALL_DATA, - "All replicas were marked as lost" - ); - - auto files = std::move(processed_file_list_ptr->second); - replica_to_files_to_be_processed.erase(number_of_current_replica); - for (const auto & file : files) - { - auto file_identifier = getFileIdentifier(file); - auto file_replica_idx = getReplicaForFile(file_identifier); - unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); - connection_to_files[file_replica_idx].push_back(file); - } -} - String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const { if (send_over_whole_archive && file_object->isArchive()) @@ -326,5 +202,4 @@ String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPt return file_object->getIdentifier(); } ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 73a3409762d3..136a9e7319e9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -28,13 +28,8 @@ class StorageObjectStorageStableTaskDistributor ObjectInfoPtr getMatchingFileFromIterator(size_t number_of_current_replica); ObjectInfoPtr getAnyUnprocessedFile(size_t number_of_current_replica); -<<<<<<< HEAD -======= - void saveLastNodeActivity(size_t number_of_current_replica); - String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; ->>>>>>> 58b280f1f26 (Merge pull request #1748 from Altinity/feature/antalya-26.3/pr-1493) const std::shared_ptr iterator; const bool send_over_whole_archive;