diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index 23dd49fa6841..238b7c890856 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -354,20 +354,13 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
     size_t max_retries
 )
 {
-    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name);
+    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name);
 
     if (!exception)
     {
         throw Exception(ErrorCodes::LOGICAL_ERROR, "ExportPartition scheduler task: No exception provided for error handling. Sounds like a bug");
     }
 
-    /// Early exit if the query was cancelled - no need to increment error counts
-    if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
-    {
-        LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
-        return;
-    }
-
     Coordination::Stat locked_by_stat;
     std::string locked_by;
 
@@ -385,6 +378,40 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
         return;
     }
 
+    /// Early exit if the query was cancelled - no need to increment error counts
+    if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
+    {
+        /// Releasing the lock is important because a query can be cancelled due to SYSTEM STOP MOVES. If this is the case,
+        /// other replicas should still be able to export this individual part. That's why there is a retry loop here.
+        /// It is very unlikely this will be a problem in practice. The lock is ephemeral, which means it is automatically released
+        /// if ClickHouse loses connection to ZooKeeper
+        std::size_t retry_count = 0;
+        static constexpr std::size_t max_lock_release_retries = 3;
+        while (retry_count < max_lock_release_retries)
+        {
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+            ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
+
+            const auto removal_code = zk->tryRemove(export_path / "locks" / part_name, locked_by_stat.version);
+
+            if (Coordination::Error::ZOK == removal_code)
+            {
+                break;
+            }
+
+            if (Coordination::Error::ZBADVERSION == removal_code)
+            {
+                LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} lock version mismatch, will not increment error counts", part_name);
+                break;
+            }
+
+            retry_count++;
+        }
+
+        LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
+        return;
+    }
+
     Coordination::Requests ops;
 
     const auto processing_part_path = processing_parts_path / part_name;
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index b09dfeca1dbd..cb4bae9a6339 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -1378,3 +1378,151 @@ def test_sharded_export_partition_default_pattern(cluster):
 
     # only one file with 3 rows should be present
     assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"
+
+
+def test_export_partition_scheduler_skipped_when_moves_stopped(cluster):
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"sched_skip_mt_{uid}"
+    s3_table = f"sched_skip_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+    node.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+    )
+
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+    # Wait for several scheduler cycles (each fires every 5 s).
+    # If the guard is missing the scheduler would run and data would land in S3.
+    time.sleep(10)
+
+    status = node.query(
+        f"SELECT status FROM system.replicated_partition_exports"
+        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+        f" AND partition_id = '2020'"
+    ).strip()
+
+    assert status == "PENDING", (
+        f"Expected PENDING while moves are stopped, got '{status}'"
+    )
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 0, (
+        f"Expected 0 rows in S3 while scheduler is skipped, got {row_count}"
+    )
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"
+
+
+def test_export_partition_resumes_after_stop_moves(cluster):
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"stop_moves_before_mt_{uid}"
+    s3_table = f"stop_moves_before_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+    node.query(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+        f" SETTINGS export_merge_tree_partition_max_retries = 50"
+    )
+
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+    # Give the scheduler enough time to attempt (and cancel) the part task at
+    # least once, exercising the lock-release code path.
+    time.sleep(5)
+
+    status = node.query(
+        f"SELECT status FROM system.replicated_partition_exports"
+        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+        f" AND partition_id = '2020'"
+    ).strip()
+    assert status == "PENDING", f"Expected PENDING while moves are stopped, got '{status}'"
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 0, f"Expected 0 rows in S3 while moves are stopped, got {row_count}"
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"
+
+
+def test_export_partition_resumes_after_stop_moves_during_export(cluster):
+    skip_if_remote_database_disk_enabled(cluster)
+
+    node = cluster.instances["replica1"]
+
+    uid = str(uuid.uuid4()).replace("-", "_")
+    mt_table = f"stop_moves_during_mt_{uid}"
+    s3_table = f"stop_moves_during_s3_{uid}"
+
+    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
+
+    minio_ip = cluster.minio_ip
+    minio_port = cluster.minio_port
+
+    with PartitionManager() as pm:
+        pm.add_rule({
+            "instance": node,
+            "destination": node.ip_address,
+            "protocol": "tcp",
+            "source_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        })
+        pm.add_rule({
+            "instance": node,
+            "destination": minio_ip,
+            "protocol": "tcp",
+            "destination_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        })
+
+        node.query(
+            f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
+            f" SETTINGS export_merge_tree_partition_max_retries = 50"
+        )
+
+        wait_for_export_to_start(node, mt_table, s3_table, "2020")
+
+        # Let the tasks start executing and failing against the blocked S3.
+        time.sleep(2)
+
+        node.query(f"SYSTEM STOP MOVES {mt_table}")
+
+        # Give the cancel callback time to fire and the lock-release path to run.
+        time.sleep(3)
+
+        status = node.query(
+            f"SELECT status FROM system.replicated_partition_exports"
+            f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
+            f" AND partition_id = '2020'"
+        ).strip()
+
+        assert status == "PENDING", (
+            f"Expected PENDING while moves are stopped and S3 is blocked, got '{status}'"
+        )
+
+    node.query(f"SYSTEM START MOVES {mt_table}")
+
+    # MinIO is now unblocked; the next scheduler cycle should succeed.
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)
+
+    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
+    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"