Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,13 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
size_t max_retries
)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name);
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name);

if (!exception)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "ExportPartition scheduler task: No exception provided for error handling. Sounds like a bug");
}

/// Early exit if the query was cancelled - no need to increment error counts
if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
return;
}

Coordination::Stat locked_by_stat;
std::string locked_by;

Expand All @@ -385,6 +378,40 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
return;
}

/// Early exit if the query was cancelled - no need to increment error counts
if (exception->code() == ErrorCodes::QUERY_WAS_CANCELLED)
{
/// Releasing the lock is important because a query can be cancelled due to SYSTEM STOP MOVES. If this is the case,
/// other replicas should still be able to export this individual part. That's why there is a retry loop here.
/// It is very unlikely this will be a problem in practice. The lock is ephemeral, which means it is automatically released
/// if ClickHouse loses connection to ZooKeeper
std::size_t retry_count = 0;
static constexpr std::size_t max_lock_release_retries = 3;
while (retry_count < max_lock_release_retries)
{
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);

const auto removal_code = zk->tryRemove(export_path / "locks" / part_name, locked_by_stat.version);

if (Coordination::Error::ZOK == removal_code)
{
break;
}

if (Coordination::Error::ZBADVERSION == removal_code)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} lock version mismatch, will not increment error counts", part_name);
break;
}

retry_count++;
}

LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export was cancelled, skipping error handling", part_name);
return;
}

Coordination::Requests ops;

const auto processing_part_path = processing_parts_path / part_name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,3 +1378,152 @@ def test_sharded_export_partition_default_pattern(cluster):

# only one file with 3 rows should be present
assert int(total_count) == 3, f"Expected 3 rows, got {total_count}"


def test_export_partition_scheduler_skipped_when_moves_stopped(cluster):
    """The export scheduler must not move data while SYSTEM STOP MOVES is in
    effect: the export stays PENDING and S3 stays empty until moves restart."""
    node = cluster.instances["replica1"]

    suffix = str(uuid.uuid4()).replace("-", "_")
    mt_table = f"sched_skip_mt_{suffix}"
    s3_table = f"sched_skip_s3_{suffix}"

    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")

    node.query(f"SYSTEM STOP MOVES {mt_table}")

    node.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
    )

    wait_for_export_to_start(node, mt_table, s3_table, "2020")

    # Wait for several scheduler cycles (each fires every 5 s).
    # If the guard is missing the scheduler would run and data would land in S3.
    time.sleep(10)

    status_query = (
        f"SELECT status FROM system.replicated_partition_exports"
        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
        f" AND partition_id = '2020'"
    )
    status = node.query(status_query).strip()
    assert status == "PENDING", (
        f"Expected PENDING while moves are stopped, got '{status}'"
    )

    count_query = f"SELECT count() FROM {s3_table} WHERE year = 2020"
    row_count = int(node.query(count_query).strip())
    assert row_count == 0, (
        f"Expected 0 rows in S3 while scheduler is skipped, got {row_count}"
    )

    node.query(f"SYSTEM START MOVES {mt_table}")

    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)

    row_count = int(node.query(count_query).strip())
    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"


def test_export_partition_resumes_after_stop_moves(cluster):
    """An export issued while moves are already stopped must stay PENDING
    (exercising the part-lock release path) and complete after START MOVES."""
    node = cluster.instances["replica1"]

    suffix = str(uuid.uuid4()).replace("-", "_")
    mt_table = f"stop_moves_before_mt_{suffix}"
    s3_table = f"stop_moves_before_s3_{suffix}"

    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")

    node.query(f"SYSTEM STOP MOVES {mt_table}")

    node.query(
        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
        f" SETTINGS export_merge_tree_partition_max_retries = 50"
    )

    wait_for_export_to_start(node, mt_table, s3_table, "2020")

    # Give the scheduler enough time to attempt (and cancel) the part task at
    # least once, exercising the lock-release code path.
    time.sleep(5)

    status_query = (
        f"SELECT status FROM system.replicated_partition_exports"
        f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
        f" AND partition_id = '2020'"
    )
    status = node.query(status_query).strip()
    assert status == "PENDING", f"Expected PENDING while moves are stopped, got '{status}'"

    count_query = f"SELECT count() FROM {s3_table} WHERE year = 2020"
    row_count = int(node.query(count_query).strip())
    assert row_count == 0, f"Expected 0 rows in S3 while moves are stopped, got {row_count}"

    node.query(f"SYSTEM START MOVES {mt_table}")

    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)

    row_count = int(node.query(count_query).strip())
    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"


def test_export_partition_resumes_after_stop_moves_during_export(cluster):
    """Stop moves while an export is already failing against a blocked MinIO:
    the export must remain PENDING, then finish once moves restart and the
    network partition is lifted."""
    skip_if_remote_database_disk_enabled(cluster)

    node = cluster.instances["replica1"]

    suffix = str(uuid.uuid4()).replace("-", "_")
    mt_table = f"stop_moves_during_mt_{suffix}"
    s3_table = f"stop_moves_during_s3_{suffix}"

    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")

    with PartitionManager() as pm:
        # Reject MinIO traffic in both directions so part exports fail.
        reject_action = "REJECT --reject-with tcp-reset"
        pm.add_rule(
            {
                "instance": node,
                "destination": node.ip_address,
                "protocol": "tcp",
                "source_port": cluster.minio_port,
                "action": reject_action,
            }
        )
        pm.add_rule(
            {
                "instance": node,
                "destination": cluster.minio_ip,
                "protocol": "tcp",
                "destination_port": cluster.minio_port,
                "action": reject_action,
            }
        )

        node.query(
            f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}"
            f" SETTINGS export_merge_tree_partition_max_retries = 50"
        )

        wait_for_export_to_start(node, mt_table, s3_table, "2020")

        # Let the tasks start executing and failing against the blocked S3.
        time.sleep(2)

        node.query(f"SYSTEM STOP MOVES {mt_table}")

        # Give the cancel callback time to fire and the lock-release path to run.
        time.sleep(3)

        status = node.query(
            f"SELECT status FROM system.replicated_partition_exports"
            f" WHERE source_table = '{mt_table}' AND destination_table = '{s3_table}'"
            f" AND partition_id = '2020'"
        ).strip()

        assert status == "PENDING", (
            f"Expected PENDING while moves are stopped and S3 is blocked, got '{status}'"
        )

    node.query(f"SYSTEM START MOVES {mt_table}")

    # MinIO is now unblocked; the next scheduler cycle should succeed.
    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=60)

    row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
    assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"

Loading