[fix](rpc) Fix AutoReleaseClosure data race with callback reuse#61782

Draft
zclllyybb wants to merge 1 commit into apache:master from zclllyybb:brpc_closure

@zclllyybb
Contributor

The callback's call() method may reuse the callback object (e.g., in vdata_stream_sender.h get_send_callback()), triggering a new RPC that mutates response_ and cntl_. If AutoReleaseClosure::Run() invokes call() before checking cntl_->Failed() or response_->status(), it reads the NEW RPC's state instead of the ORIGINAL RPC's result, causing:

*** SIGSEGV address not mapped to object (@0x0) received by PID 238162 (TID 240463 OR 0xfffa2c9898e0) from PID 0; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_release/doris/be/src/common/signal_handler.h:421
 1# os::Linux::chained_handler(int, siginfo_t*, void*) in /opt/module/doris/java8/jre/lib/aarch64/server/libjvm.so
 2# JVM_handle_linux_signal in /opt/module/doris/java8/jre/lib/aarch64/server/libjvm.so
 3# signalHandler(int, siginfo_t*, void*) in /opt/module/doris/java8/jre/lib/aarch64/server/libjvm.so
 4# 0x0000FFFF0AB107C0 in linux-vdso.so.1
 5# doris::Status doris::Status::create<true>(doris::PStatus const&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:398
 6# void doris::AutoReleaseClosure<doris::PTransmitDataParams, doris::pipeline::ExchangeSendCallback<doris::PTransmitDataResult> >::_process_status<doris::PTransmitDataResult>(doris::PTransmitDataResult*) at /home/zcp/repo_center/doris_release/doris/be/src/util/ref_count_closure.h:128
 7# doris::AutoReleaseClosure<doris::PTransmitDataParams, doris::pipeline::ExchangeSendCallback<doris::PTransmitDataResult> >::Run() at /home/zcp/repo_center/doris_release/doris/be/src/util/ref_count_closure.h:102
 8# brpc::Controller::EndRPC(brpc::Controller::CompletionInfo const&) in /opt/module/doris/be/lib/doris_be
 9# brpc::policy::ProcessRpcResponse(brpc::InputMessageBase*) in /opt/module/doris/be/lib/doris_be
10# brpc::ProcessInputMessage(void*) in /opt/module/doris/be/lib/doris_be
11# bthread::TaskGroup::task_runner(long) in /opt/module/doris/be/lib/doris_be
12# bthread_make_fcontext in /opt/module/doris/be/lib/doris_be
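
The ordering hazard described above can be modelled in a single thread. The sketch below is illustrative only: `FakeResponse`, `FakeCallback`, `run_call_first`, and `run_call_last` are hypothetical stand-ins, not the Doris or brpc types, and the real failure involves a second RPC writing concurrently rather than a plain reset.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a protobuf response carrying a status code.
struct FakeResponse {
    int status_code = 0;  // 0 means OK
};

// Hypothetical reusable callback that owns the state the closure reads.
struct FakeCallback {
    std::shared_ptr<FakeResponse> response = std::make_shared<FakeResponse>();
    bool rpc_failed = false;

    // Models callback reuse: call() starts a "new RPC" that overwrites the
    // shared state the closure is about to inspect.
    void call() {
        response->status_code = 0;
        rpc_failed = false;
    }
};

// Old ordering: call() runs first, so the original error is already gone
// by the time the state is inspected.
inline bool run_call_first(FakeCallback& cb) {
    cb.call();
    return cb.rpc_failed || cb.response->status_code != 0;
}

// Fixed ordering: inspect the original state first, invoke call() last.
inline bool run_call_last(FakeCallback& cb) {
    const bool saw_error = cb.rpc_failed || cb.response->status_code != 0;
    cb.call();  // must be the last operation that touches cb
    return saw_error;
}
```

With a response whose status code is non-zero, `run_call_first` reports no error while `run_call_last` reports one, which is the behavioral difference this patch targets.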

We confirmed that the data race is real by adding temporary logging, which has since been removed:

F20260325 21:46:58.465230 3453395 brpc_closure.h:116] Check failed: _debug_generation_at_construction == current_gen (2 vs. 3) RACE DETECTED: AutoReleaseClosure response_ was reused by a new RPC (generation changed from 2 to 3) while still in Run(). The old closure is about to read response_->status() but the new RPC may be concurrently writing to the same response_ object.
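
A generation check of this kind might look roughly like the following. This is a sketch under assumptions: the temporary debug code is not part of the patch, and `GenCallback`, `GenerationGuard`, and `reuse_for_new_rpc` are hypothetical names chosen for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical reusable callback that bumps a counter every time it is
// handed to a new RPC.
struct GenCallback {
    std::atomic<std::uint64_t> generation{0};

    void reuse_for_new_rpc() { generation.fetch_add(1, std::memory_order_acq_rel); }
};

// Hypothetical closure-side guard: remembers the generation observed at
// construction and reports whether the callback has been reused since.
class GenerationGuard {
public:
    explicit GenerationGuard(const GenCallback& cb)
            : _cb(cb), _at_construction(cb.generation.load(std::memory_order_acquire)) {}

    // True if the callback was given to a new RPC after this guard was built.
    bool reused() const {
        return _cb.generation.load(std::memory_order_acquire) != _at_construction;
    }

private:
    const GenCallback& _cb;
    const std::uint64_t _at_construction;
};
```

A closure could consult `reused()` immediately before reading the response; a `true` result means the state it is about to read may belong to a different RPC.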

We also added BE unit tests that pass only with this patch.
Before the fix:

[----------] 7 tests from ExchangeSinkTest (5 ms total)

[----------] Global test environment tear-down
[==========] 7 tests from 1 test suite ran. (5 ms total)
[  PASSED  ] 4 tests.
[  FAILED  ] 3 tests, listed below:
[  FAILED  ] ExchangeSinkTest.test_closure_call_must_not_corrupt_status_check
[  FAILED  ] ExchangeSinkTest.test_closure_call_must_not_hide_error_status
[  FAILED  ] ExchangeSinkTest.test_closure_call_must_not_hide_rpc_failure

After the fix:

[----------] 7 tests from ExchangeSinkTest (4 ms total)

[----------] Global test environment tear-down
[==========] 7 tests from 1 test suite ran. (5 ms total)
[  PASSED  ] 7 tests.

zclllyybb added the usercase, dev/4.0.x, and dev/4.1.x labels on Mar 26, 2026
@zclllyybb
Contributor Author

/review

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@zclllyybb
Contributor Author

run buildall

Copilot AI left a comment

Pull request overview

Fixes a brpc AutoReleaseClosure race when callbacks are reused to start a new RPC (mutating response_/cntl_), by ensuring status/failure checks happen before invoking callback->call(), and adds unit tests to prevent regressions.

Changes:

  • Reorders AutoReleaseClosure::Run() so it checks cntl_ / response_->status() before invoking callback->call().
  • Refactors runtime-filter sync-size RPC handling to keep the callback alive until RPC completion.
  • Adds BE unit tests that simulate callback reuse mutating shared RPC state.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| be/test/exec/exchange/exchange_sink_test.cpp | Adds tests that simulate callback reuse and verify correct ordering. |
| be/src/util/brpc_closure.h | Reorders Run() and simplifies error handling paths. |
| be/src/exec/runtime_filter/runtime_filter_producer.h | Adds storage to keep sync-size callback alive across async RPC. |
| be/src/exec/runtime_filter/runtime_filter_producer.cpp | Introduces SyncSizeCallback and changes closure construction/lifetime. |
| be/src/exec/runtime_filter/runtime_filter_mgr.h | Updates _send_rf_to_target signature (removes QueryContext arg). |
| be/src/exec/runtime_filter/runtime_filter_mgr.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/runtime_filter/runtime_filter.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/operator/exchange_sink_buffer.cpp | Adds comments documenting callback reuse ordering constraints. |
| be/src/exec/exchange/vdata_stream_sender.h | Documents callback reuse rationale. |

Comments suppressed due to low confidence (1)

be/src/util/brpc_closure.h:1

  • This change removes the previous QueryContext-based failure propagation (and error_msg augmentation) from AutoReleaseClosure, replacing it with logging only. That’s a behavior change: some call sites previously relied on AutoReleaseClosure to cancel the query on RPC failure / non-OK status(), gated by ignore_runtime_filter_error; after this patch, errors may no longer interrupt the query and could lead to hangs or delayed failure handling. Recommendation (mandatory): restore a mechanism to propagate failures (either by reintroducing the optional QueryContext + captured status/failure info before calling tmp->call(), or by requiring callers to pass a callback that performs the cancel/sub/error handling) while keeping the call() invocation as the last step to avoid the reuse race.
// Licensed to the Apache Software Foundation (ASF) under one

Comment on lines +194 to +202
+ auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
+ // Store callback in the producer to keep it alive until the RPC completes.
+ // AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
+ // would be destroyed when this function returns and error-path sub() would never fire.
+ _sync_size_callback = callback;
  // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use
  // a raw pointer in closure. Has to use the context's shared ptr.
- auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
-         state->query_options().ignore_runtime_filter_error
-                 ? std::weak_ptr<QueryContext> {}
-                 : state->get_query_ctx_weak());
+ auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
+         request, callback);
Copilot AI Mar 26, 2026

_sync_size_callback is a single slot, but send_size() can be invoked more than once during the producer lifecycle in some implementations; if a second call happens before the first RPC completes, this assignment overwrites the stored callback and can still allow the earlier callback to be destroyed early (reintroducing the 'weak_ptr callback dies before completion' failure). Also, the stored callback is never cleared on completion, which can unnecessarily retain _dependency until producer teardown. Recommendation (mandatory): make the storage support multiple in-flight RPCs (e.g., vector/list keyed by RPC generation or request id), and clear the stored entry when the RPC completes (both success and error paths), e.g., by having the callback clear a weak back-reference to the producer or by clearing in the completion handler that observes the response.
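
One way to hold several in-flight callbacks and release each on completion is a small registry keyed by an id. The sketch below is a hypothetical illustration of that recommendation, not code from the patch; `InFlightCallbacks`, `add`, and `remove` are assumed names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical registry that owns one strong reference per in-flight RPC,
// so a closure holding only a weak_ptr can still reach its callback.
template <typename Callback>
class InFlightCallbacks {
public:
    // Stores the callback and returns an id used to release it later.
    std::uint64_t add(std::shared_ptr<Callback> cb) {
        std::lock_guard<std::mutex> lock(_mutex);
        const std::uint64_t id = _next_id++;
        _callbacks.emplace(id, std::move(cb));
        return id;
    }

    // Drops the strong reference. Intended for both success and error paths.
    void remove(std::uint64_t id) {
        std::shared_ptr<Callback> released;  // destroyed after the lock is released
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it = _callbacks.find(id);
            if (it != _callbacks.end()) {
                released = std::move(it->second);
                _callbacks.erase(it);
            }
        }
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _callbacks.size();
    }

private:
    mutable std::mutex _mutex;
    std::uint64_t _next_id = 0;
    std::unordered_map<std::uint64_t, std::shared_ptr<Callback>> _callbacks;
};
```

Moving the entry out before erasing means the callback's destructor runs outside the mutex, which avoids re-entrancy problems if that destructor touches the registry.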

Copilot uses AI. Check for mistakes.
Comment on lines 115 to 125
((CountedFinishDependency*)_dependency.get())->sub();
}};
auto wrapper = _wrapper.lock();
if (!wrapper) {
return;
}

wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
}

void _process_if_meet_error_status(const Status& status) override {
Defer defer {[&]() {
Base::_process_if_meet_error_status(status);
Status status = Status::create(response_->status());
if (!status.ok()) {
if (auto w = _wrapper.lock()) {
w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
}
((CountedFinishDependency*)_dependency.get())->sub();
}};
auto wrapper = _wrapper.lock();
if (!wrapper) {
return;
}
Copilot AI Mar 26, 2026

The C-style cast to CountedFinishDependency* can silently turn into UB if _dependency is not actually that type. Recommendation (mandatory): store _dependency as std::shared_ptr<CountedFinishDependency> in SyncSizeCallback (and pass that type in), or use a static_cast with a debug assertion documenting the invariant; avoid C-style casts here.
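
Both alternatives can be sketched as follows. The types here are simplified hypothetical stand-ins for the Doris classes (only the names `Dependency`-style base, `CountedFinishDependency`, and `sub()` echo the review), and `TypedCallback` and `as_counted` are invented for illustration.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for the dependency hierarchy.
struct Dependency {
    virtual ~Dependency() = default;
};

struct CountedFinishDependency : Dependency {
    int counter = 1;
    void sub() { --counter; }
};

// Option 1: store the concrete type, so no cast is needed at all.
struct TypedCallback {
    std::shared_ptr<CountedFinishDependency> dependency;

    void on_error() { dependency->sub(); }
};

// Option 2: keep the base pointer, but verify the invariant in debug builds
// before the static_cast. The assert compiles away when NDEBUG is defined.
inline CountedFinishDependency* as_counted(Dependency* dep) {
    assert(dynamic_cast<CountedFinishDependency*>(dep) != nullptr);
    return static_cast<CountedFinishDependency*>(dep);
}
```

Option 1 moves the check to compile time; Option 2 keeps existing signatures and documents the assumption where the cast happens.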

Comment on lines 254 to +257
  auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                    DummyBrpcCallback<PSyncFilterSizeResponse>>::
          create_unique(std::make_shared<PSyncFilterSizeRequest>(),
-                       DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
+                       DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
Copilot AI Mar 26, 2026

This call site previously passed a QueryContext weak_ptr (conditionally based on ignore_runtime_filter_error) into AutoReleaseClosure so failures could be escalated/canceled when the option is false. With the ctx parameter removed and the callback being DummyBrpcCallback, RPC failure/non-OK status() handling appears to be reduced to logging only, which can change query correctness/termination behavior. Recommendation (mandatory): either (1) restore ctx-based cancellation semantics in AutoReleaseClosure (capturing status/failure before invoking call()), or (2) replace DummyBrpcCallback with a runtime-filter-specific callback that performs the required cancel/disable/sub behavior under the same option gating.
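
Option (1) could be approached by copying the outcome before `call()` and handing the copy to an optional failure handler. The following is a rough sketch under assumptions: `RpcOutcome`, `ReusableCallback`, and `run_with_escalation` are hypothetical, and the handler stands in for whatever cancellation the caller wants, left empty when runtime-filter errors are ignored.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical summary of what the closure needs from cntl_ / response_.
struct RpcOutcome {
    bool failed = false;
    std::string error_text;
};

// Hypothetical reusable callback: call() resets the outcome, modelling a
// new RPC taking over the same object.
struct ReusableCallback {
    RpcOutcome outcome;

    void call() { outcome = RpcOutcome{}; }
};

// Copies the outcome, escalates through the optional handler, and only then
// invokes call(), which stays the last operation touching the callback.
inline void run_with_escalation(ReusableCallback& cb,
                                const std::function<void(const std::string&)>& on_failure) {
    const RpcOutcome snapshot = cb.outcome;  // taken before any reuse
    if (snapshot.failed && on_failure) {
        on_failure(snapshot.error_text);
    }
    cb.call();
}
```

Passing an empty handler reproduces the log-only behavior, so the option gating can live entirely at the call site.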

Comment on lines +304 to +307
TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) {
auto callback = TestCallback::create_shared();
// Response starts OK (default).
callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR);
Copilot AI Mar 26, 2026

The PR description lists failing/passing test names test_closure_call_must_not_corrupt_status_check, test_closure_call_must_not_hide_error_status, and test_closure_call_must_not_hide_rpc_failure, but the added tests in the diff use different names (e.g., test_closure_call_sees_original_ok_response). Recommendation (optional): update the PR description (or rename tests) so the referenced test names match what's actually added, to make it easier to validate the claim.


std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
bool eos) {
// here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
Copilot AI Mar 26, 2026

Fix grammar/spelling: change "it's re-construction" to "its reconstruction" (and consider rephrasing to avoid the apostrophe possessive).

Suggested change
- // here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
+ // here we reuse the callback because reconstructing it may be expensive due to capturing many parameters

github-actions bot (Contributor) left a comment

Code Review Summary

PR Goal & Accomplishment

This PR fixes a real data race in AutoReleaseClosure::Run() where call() was invoked before checking cntl_->Failed() and response_->status(). Because call() can trigger callback reuse (e.g., in ExchangeSendCallback::call() -> _suc_fn -> _send_rpc() which reuses the same callback via get_send_callback()), the closure would then read mutated cntl_/response_ state from the new RPC instead of the original. The fix correctly reorders: log/check first, then call() last. The crash stacktrace and debug-log evidence clearly confirm the race.

The secondary change — removing QueryContext cancellation from AutoReleaseClosure and moving error handling into individual callbacks — is also sound. All callback types that need error handling (ExchangeSendCallback, SyncSizeCallback, WriteBlockCallback) already fully handle errors in their call() methods. The DummyBrpcCallback paths either have caller-side join()+check patterns or are fire-and-forget operations where query cancellation was too aggressive.

Compilation Bug Found

The old code had two _process_status overloads: an unconstrained no-op fallback template <typename Response> void _process_status(Response*) {} and a constrained template <HasStatus Response> void _process_status(Response*). The new code only has the constrained _log_error_status<HasStatus Response> but removed the unconstrained fallback. This will cause a compilation failure for PTabletWriterCancelResult (an empty protobuf message with no status() field), used in vtablet_writer.cpp:1229.

Critical Checkpoint Conclusions

  1. Goal accomplished? Yes, the core data race fix is correct. Tests prove the ordering. However, there is a compilation bug (see inline comment).

  2. Modification minimal and focused? Yes. The changes are well-scoped: core fix in brpc_closure.h, consequent cleanup of QueryContext removal, and SyncSizeClosure -> SyncSizeCallback refactor.

  3. Concurrency: The core fix addresses the concurrency issue correctly by ensuring call() (which may trigger callback reuse and new RPCs) is the last operation in Run(). The SyncSizeCallback::call() reads cntl_ and response_ which are safe because call() is now called before any reuse can happen.

  4. Lifecycle management: _sync_size_callback (as shared_ptr<void>) correctly keeps the SyncSizeCallback alive until the RuntimeFilterProducer is destroyed. Without this, AutoReleaseClosure's weak_ptr to the callback would expire when send_size() returns, and the error-path sub() would never fire. This is correct.

  5. Parallel code paths: The two success handler lambdas in exchange_sink_buffer.cpp (unicast and broadcast) both have the same comment added about _send_rpc being last. Good.

  6. Test coverage: Three well-designed unit tests verify the ordering invariant by capturing state at call() time and mutating afterward. Tests correctly fail with the old ordering and pass with the new.

  7. Error handling: The behavioral change from QueryContext::cancel() to LOG(WARNING) is safe — all critical paths already handle errors in their callbacks or via caller-side join()+check.

  8. No incompatible changes: No function symbol or storage format changes.

  9. Configuration: No new configs added.

  10. Observability: Error logging is preserved (downgraded from cancel to LOG(WARNING) in AutoReleaseClosure, but callbacks handle errors appropriately).

// this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
if (auto tmp = callback_.lock()) {
tmp->call();
}

Bug: Missing unconstrained fallback overload for _log_error_status — will fail to compile.

The old code had two _process_status overloads:

template <typename Response>
void _process_status(Response* response) {} // no-op fallback

template <HasStatus Response>
void _process_status(Response* response) { ... } // constrained

The new code only has the constrained version _log_error_status<HasStatus Response>, but the unconstrained no-op fallback was not carried over. This means _log_error_status<ResponseType>(response_.get()) will fail to compile when ResponseType does not satisfy the HasStatus concept.

Affected caller: AutoReleaseClosure<PTabletWriterCancelRequest, DummyBrpcCallback<PTabletWriterCancelResult>> in vtablet_writer.cpp:1229. PTabletWriterCancelResult is an empty protobuf message with no status() field.

Fix: Add a no-op fallback:

template <typename Response>
void _log_error_status(Response*) {
    // No status field to check — nothing to log.
}

C++20 concept subsumption ensures the HasStatus-constrained overload is preferred when it matches.
