[fix](rpc) Fix AutoReleaseClosure data race with callback reuse #61782
zclllyybb wants to merge 1 commit into apache:master
/review
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
Fixes a brpc AutoReleaseClosure race when callbacks are reused to start a new RPC (mutating response_/cntl_), by ensuring status/failure checks happen before invoking callback->call(), and adds unit tests to prevent regressions.
Changes:
- Reorders AutoReleaseClosure::Run() so it checks cntl_/response_->status() before invoking callback->call().
- Refactors runtime-filter sync-size RPC handling to keep the callback alive until RPC completion.
- Adds BE unit tests that simulate callback reuse mutating shared RPC state.
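The ordering change can be illustrated with a minimal model. This is only a sketch under stated assumptions: FakeController, FakeResponse, ReusableCallback, run_call_first and run_call_last are hypothetical names invented for illustration, not Doris or brpc types, and the real Run() logs rather than returning a value.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-ins for the brpc controller and the protobuf response.
struct FakeController {
    bool failed = false;
};
struct FakeResponse {
    int status_code = 0; // 0 means OK
};

// A reusable callback: it owns the shared RPC state, and call() may start a
// new RPC that overwrites that state.
struct ReusableCallback {
    FakeController cntl;
    FakeResponse response;
    std::function<void(ReusableCallback&)> on_call;
    void call() {
        if (on_call) on_call(*this);
    }
};

// What the closure observed about the RPC it was created for.
struct Observed {
    bool failed;
    int status_code;
};

// Old ordering: call() runs first, so the checks may read the NEW RPC's state.
inline Observed run_call_first(const std::weak_ptr<ReusableCallback>& weak_cb) {
    auto cb = weak_cb.lock();
    if (!cb) return {false, 0};
    cb->call();
    return {cb->cntl.failed, cb->response.status_code};
}

// New ordering: read everything first, then invoke call() as the last step.
inline Observed run_call_last(const std::weak_ptr<ReusableCallback>& weak_cb) {
    auto cb = weak_cb.lock();
    if (!cb) return {false, 0};
    Observed seen{cb->cntl.failed, cb->response.status_code};
    cb->call(); // must stay last: it may reuse the callback for a new RPC
    return seen;
}
```

If on_call is set to a lambda that flips cntl.failed and overwrites response.status_code, run_call_last still reports the original values while run_call_first reports the mutated ones, which is the behavior the added unit tests are described as guarding against.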
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| be/test/exec/exchange/exchange_sink_test.cpp | Adds tests that simulate callback reuse and verify correct ordering. |
| be/src/util/brpc_closure.h | Reorders Run() and simplifies error handling paths. |
| be/src/exec/runtime_filter/runtime_filter_producer.h | Adds storage to keep sync-size callback alive across async RPC. |
| be/src/exec/runtime_filter/runtime_filter_producer.cpp | Introduces SyncSizeCallback and changes closure construction/lifetime. |
| be/src/exec/runtime_filter/runtime_filter_mgr.h | Updates _send_rf_to_target signature (removes QueryContext arg). |
| be/src/exec/runtime_filter/runtime_filter_mgr.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/runtime_filter/runtime_filter.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/operator/exchange_sink_buffer.cpp | Adds comments documenting callback reuse ordering constraints. |
| be/src/exec/exchange/vdata_stream_sender.h | Documents callback reuse rationale. |
Comments suppressed due to low confidence (1)
be/src/util/brpc_closure.h:1
- This change removes the previous QueryContext-based failure propagation (and error_msg augmentation) from AutoReleaseClosure, replacing it with logging only. That’s a behavior change: some call sites previously relied on AutoReleaseClosure to cancel the query on RPC failure / non-OK status(), gated by ignore_runtime_filter_error; after this patch, errors may no longer interrupt the query and could lead to hangs or delayed failure handling. Recommendation (mandatory): restore a mechanism to propagate failures (either by reintroducing the optional QueryContext + captured status/failure info before calling tmp->call(), or by requiring callers to pass a callback that performs the cancel/sub/error handling) while keeping the call() invocation as the last step to avoid the reuse race.
// Licensed to the Apache Software Foundation (ASF) under one
    auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
    // Store callback in the producer to keep it alive until the RPC completes.
    // AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
    // would be destroyed when this function returns and error-path sub() would never fire.
    _sync_size_callback = callback;
    // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use
    // a raw pointer in closure. Has to use the context's shared ptr.
    auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
            state->query_options().ignore_runtime_filter_error
                    ? std::weak_ptr<QueryContext> {}
                    : state->get_query_ctx_weak());
    auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
            request, callback);
_sync_size_callback is a single slot, but send_size() can be invoked more than once during the producer lifecycle in some implementations; if a second call happens before the first RPC completes, this assignment overwrites the stored callback and can still allow the earlier callback to be destroyed early (reintroducing the 'weak_ptr callback dies before completion' failure). Also, the stored callback is never cleared on completion, which can unnecessarily retain _dependency until producer teardown. Recommendation (mandatory): make the storage support multiple in-flight RPCs (e.g., vector/list keyed by RPC generation or request id), and clear the stored entry when the RPC completes (both success and error paths), e.g., by having the callback clear a weak back-reference to the producer or by clearing in the completion handler that observes the response.
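One way to read the recommendation above is a small registry that holds one strong reference per in-flight RPC and drops it on completion. The sketch below is hypothetical: InFlightCallbacks, hold() and release() are names invented here, not existing Doris APIs, and it assumes the completion path can be given the id returned by hold().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical helper (not part of Doris): keeps one strong reference per
// in-flight RPC so a closure that only holds a weak_ptr can still reach its
// callback, and drops that reference once the RPC completes.
class InFlightCallbacks {
public:
    // Registers a callback and returns the id to pass to release() later.
    std::uint64_t hold(std::shared_ptr<void> callback) {
        std::lock_guard<std::mutex> guard(_mutex);
        const std::uint64_t id = _next_id++;
        _held.emplace(id, std::move(callback));
        return id;
    }

    // Intended to be called from the completion path, success and error alike.
    // Releasing an unknown id is a no-op.
    void release(std::uint64_t id) {
        std::shared_ptr<void> released;
        {
            std::lock_guard<std::mutex> guard(_mutex);
            auto it = _held.find(id);
            if (it == _held.end()) return;
            released = std::move(it->second);
            _held.erase(it);
        }
        // `released` goes out of scope here, outside the lock, so a destructor
        // that touches the registry again cannot deadlock on _mutex.
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _held.size();
    }

private:
    mutable std::mutex _mutex;
    std::uint64_t _next_id = 0;
    std::unordered_map<std::uint64_t, std::shared_ptr<void>> _held;
};
```

With this shape, a second send_size() before the first RPC completes would add a second entry instead of overwriting the first, and the dependency is no longer retained until producer teardown. Whether release() can safely be called from inside call() depends on the closure still holding its locked shared_ptr at that point, which is an assumption to verify against the real code.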
    ((CountedFinishDependency*)_dependency.get())->sub();
    }};
    auto wrapper = _wrapper.lock();
    if (!wrapper) {
    return;
    }

    wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
    }

    void _process_if_meet_error_status(const Status& status) override {
    Defer defer {[&]() {
    Base::_process_if_meet_error_status(status);
    Status status = Status::create(response_->status());
    if (!status.ok()) {
    if (auto w = _wrapper.lock()) {
    w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
    }
    ((CountedFinishDependency*)_dependency.get())->sub();
    }};
    auto wrapper = _wrapper.lock();
    if (!wrapper) {
    return;
    }
The C-style cast to CountedFinishDependency* can silently turn into UB if _dependency is not actually that type. Recommendation (mandatory): store _dependency as std::shared_ptr<CountedFinishDependency> in SyncSizeCallback (and pass that type in), or use a static_cast with a debug assertion documenting the invariant; avoid C-style casts here.
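The two alternatives named in this comment could look roughly like the sketch below. The Dependency and CountedFinishDependency structs here are simplified, hypothetical stand-ins (the real Doris classes have more state), and TypedCallback and as_counted are illustrative names only.

```cpp
#include <cassert>
#include <memory>

// Hypothetical, simplified stand-ins for the dependency types in the diff.
struct Dependency {
    virtual ~Dependency() = default;
};
struct CountedFinishDependency : Dependency {
    int pending = 1;
    void sub() { --pending; }
};

// Option 1: hold the concrete type, so no cast is needed at the call site.
struct TypedCallback {
    std::shared_ptr<CountedFinishDependency> dependency;
    void on_error() { dependency->sub(); }
};

// Option 2: keep the base pointer, but make the downcast explicit and check
// the invariant in debug builds (the assert compiles away under NDEBUG).
inline CountedFinishDependency* as_counted(Dependency* dep) {
    assert(dynamic_cast<CountedFinishDependency*>(dep) != nullptr);
    return static_cast<CountedFinishDependency*>(dep);
}
```

Option 1 moves the type requirement to the constructor boundary, where a mismatch is a compile error. Option 2 keeps the existing member type and documents the invariant at the single place where it matters.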
    auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                      DummyBrpcCallback<PSyncFilterSizeResponse>>::
            create_unique(std::make_shared<PSyncFilterSizeRequest>(),
                          DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
                          DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
This call site previously passed a QueryContext weak_ptr (conditionally based on ignore_runtime_filter_error) into AutoReleaseClosure so failures could be escalated/canceled when the option is false. With the ctx parameter removed and the callback being DummyBrpcCallback, RPC failure/non-OK status() handling appears to be reduced to logging only, which can change query correctness/termination behavior. Recommendation (mandatory): either (1) restore ctx-based cancellation semantics in AutoReleaseClosure (capturing status/failure before invoking call()), or (2) replace DummyBrpcCallback with a runtime-filter-specific callback that performs the required cancel/disable/sub behavior under the same option gating.
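Option (2) in this comment, a callback that escalates failures itself under the same option gating, might be sketched as follows. Everything here is hypothetical: FakeQueryContext, CancelOnErrorCallback and make_callback are illustrative names, and the real QueryContext cancel API is not modeled.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for QueryContext; only a cancel hook is modeled.
struct FakeQueryContext {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

// Hypothetical callback that owns its RPC result and escalates failures itself.
struct CancelOnErrorCallback {
    bool rpc_failed = false; // models cntl_->Failed()
    std::string error_text;  // models cntl_->ErrorText()
    // Left empty when ignore_runtime_filter_error is true, so lock() fails
    // and the failure is not escalated.
    std::weak_ptr<FakeQueryContext> query_ctx;

    void call() {
        if (!rpc_failed) return;
        if (auto ctx = query_ctx.lock()) {
            ctx->cancel(error_text);
        }
    }
};

// Applies the option gating once, at construction time.
inline std::shared_ptr<CancelOnErrorCallback> make_callback(
        const std::shared_ptr<FakeQueryContext>& ctx, bool ignore_runtime_filter_error) {
    auto cb = std::make_shared<CancelOnErrorCallback>();
    if (!ignore_runtime_filter_error) {
        cb->query_ctx = ctx;
    }
    return cb;
}
```

Because the callback reads only its own state inside call(), this shape stays compatible with keeping call() as the last step in Run().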
    TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) {
        auto callback = TestCallback::create_shared();
        // Response starts OK (default).
        callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR);
The PR description lists failing/passing test names test_closure_call_must_not_corrupt_status_check, test_closure_call_must_not_hide_error_status, and test_closure_call_must_not_hide_rpc_failure, but the added tests in the diff use different names (e.g., test_closure_call_sees_original_ok_response). Recommendation (optional): update the PR description (or rename tests) so the referenced test names match what's actually added, to make it easier to validate the claim.
    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
                                                                               bool eos) {
        // here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
Fix grammar/spelling: change "it's re-construction" to "its reconstruction" (and consider rephrasing to avoid the apostrophe possessive).
    // here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
    // here we reuse the callback because reconstructing it may be expensive due to capturing many parameters
Code Review Summary
PR Goal & Accomplishment
This PR fixes a real data race in AutoReleaseClosure::Run() where call() was invoked before checking cntl_->Failed() and response_->status(). Because call() can trigger callback reuse (e.g., in ExchangeSendCallback::call() -> _suc_fn -> _send_rpc() which reuses the same callback via get_send_callback()), the closure would then read mutated cntl_/response_ state from the new RPC instead of the original. The fix correctly reorders: log/check first, then call() last. The crash stacktrace and debug-log evidence clearly confirm the race.
The secondary change — removing QueryContext cancellation from AutoReleaseClosure and moving error handling into individual callbacks — is also sound. All callback types that need error handling (ExchangeSendCallback, SyncSizeCallback, WriteBlockCallback) already fully handle errors in their call() methods. The DummyBrpcCallback paths either have caller-side join()+check patterns or are fire-and-forget operations where query cancellation was too aggressive.
Compilation Bug Found
The old code had two _process_status overloads: an unconstrained no-op fallback template <typename Response> void _process_status(Response*) {} and a constrained template <HasStatus Response> void _process_status(Response*). The new code only has the constrained _log_error_status<HasStatus Response> but removed the unconstrained fallback. This will cause a compilation failure for PTabletWriterCancelResult (an empty protobuf message with no status() field), used in vtablet_writer.cpp:1229.
Critical Checkpoint Conclusions
- Goal accomplished? Yes, the core data race fix is correct. Tests prove the ordering. However, there is a compilation bug (see inline comment).
- Modification minimal and focused? Yes. The changes are well-scoped: core fix in brpc_closure.h, consequent cleanup of QueryContext removal, and SyncSizeClosure -> SyncSizeCallback refactor.
- Concurrency: The core fix addresses the concurrency issue correctly by ensuring call() (which may trigger callback reuse and new RPCs) is the last operation in Run(). The SyncSizeCallback::call() reads cntl_ and response_ which are safe because call() is now called before any reuse can happen.
- Lifecycle management: _sync_size_callback (as shared_ptr<void>) correctly keeps the SyncSizeCallback alive until the RuntimeFilterProducer is destroyed. Without this, AutoReleaseClosure's weak_ptr to the callback would expire when send_size() returns, and the error-path sub() would never fire. This is correct.
- Parallel code paths: The two success handler lambdas in exchange_sink_buffer.cpp (unicast and broadcast) both have the same comment added about _send_rpc being last. Good.
- Test coverage: Three well-designed unit tests verify the ordering invariant by capturing state at call() time and mutating afterward. Tests correctly fail with the old ordering and pass with the new.
- Error handling: The behavioral change from QueryContext::cancel() to LOG(WARNING) is safe: all critical paths already handle errors in their callbacks or via caller-side join()+check.
- No incompatible changes: No function symbol or storage format changes.
- Configuration: No new configs added.
- Observability: Error logging is preserved (downgraded from cancel to LOG(WARNING) in AutoReleaseClosure, but callbacks handle errors appropriately).
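The lifecycle point in the checklist above rests on ordinary weak_ptr semantics, which the following sketch illustrates. Callback, WeakClosure, send_without_owner and send_with_owner are hypothetical names for illustration; only the weak_ptr/shared_ptr behavior is standard C++.

```cpp
#include <cassert>
#include <memory>

struct Callback {
    bool fired = false;
    void call() { fired = true; }
};

// Models a closure that, like AutoReleaseClosure in the review, only holds a
// weak_ptr to its callback.
struct WeakClosure {
    std::weak_ptr<Callback> callback;
    // Returns true if the callback was still alive and was invoked.
    bool run() {
        if (auto cb = callback.lock()) {
            cb->call();
            return true;
        }
        return false;
    }
};

// The only strong reference is a local, so the callback dies on return.
inline WeakClosure send_without_owner() {
    auto cb = std::make_shared<Callback>();
    return WeakClosure{cb};
}

// A strong, type-erased reference is parked in `owner`, so the callback
// outlives this function.
inline WeakClosure send_with_owner(std::shared_ptr<void>& owner) {
    auto cb = std::make_shared<Callback>();
    owner = cb;
    return WeakClosure{cb};
}
```

In the first case run() finds an expired weak_ptr and silently skips the callback, which corresponds to the "error-path sub() would never fire" failure described in the review.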
    // this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
    if (auto tmp = callback_.lock()) {
        tmp->call();
    }
Bug: Missing unconstrained fallback overload for _log_error_status — will fail to compile.
The old code had two _process_status overloads:

    template <typename Response>
    void _process_status(Response* response) {} // no-op fallback
    template <HasStatus Response>
    void _process_status(Response* response) { ... } // constrained

The new code only has the constrained version _log_error_status<HasStatus Response>, but the unconstrained no-op fallback was not carried over. This means _log_error_status<ResponseType>(response_.get()) will fail to compile when ResponseType does not satisfy the HasStatus concept.
Affected caller: AutoReleaseClosure<PTabletWriterCancelRequest, DummyBrpcCallback<PTabletWriterCancelResult>> in vtablet_writer.cpp:1229. PTabletWriterCancelResult is an empty protobuf message with no status() field.
Fix: Add a no-op fallback:

    template <typename Response>
    void _log_error_status(Response*) {
        // No status field to check; nothing to log.
    }

C++20 concept subsumption ensures the HasStatus-constrained overload is preferred when it matches.
The callback's call() method may reuse the callback object (e.g., in vdata_stream_sender.h get_send_callback()), triggering a new RPC that mutates response_ and cntl_. If AutoReleaseClosure::Run() invokes call() before checking cntl_->Failed() or response_->status(), it reads the NEW RPC's state instead of the ORIGINAL RPC's result, causing:
We confirmed that the data race is real using temporary LOG statements, which have since been removed:
We also added BE unit tests that pass only WITH this patch.
Before the fix:
After the fix: