From f120e726d3982cffbdd4be0850a7e4406c30a1cb Mon Sep 17 00:00:00 2001 From: jenrryyou Date: Thu, 5 Feb 2026 15:38:53 +0800 Subject: [PATCH] Make batch create stream SendFeedback thread safe --- src/brpc/stream.cpp | 34 ++++++++++++++++++++++++++++------ src/brpc/stream_impl.h | 7 ++++--- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 2a4430548f..a2a106a8b1 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -52,6 +52,7 @@ Stream::Stream() , _remote_consumed(0) , _cur_buf_size(0) , _local_consumed(0) + , _atomic_local_consumed(0) , _parse_rpc_response(false) , _pending_buf(NULL) , _start_idle_timer_us(0) @@ -287,7 +288,7 @@ void Stream::SetConnected(const StreamSettings* remote_settings) { CHECK(_host_socket != NULL); RPC_VLOG << "stream=" << id() << " is connected to stream_id=" << _remote_settings.stream_id() << " at host_socket=" << *_host_socket; - _connected = true; + _connected.store(true, butil::memory_order_release); _connect_meta.ec = 0; TriggerOnConnectIfNeed(); if (remote_settings == NULL) { @@ -295,6 +296,13 @@ void Stream::SetConnected(const StreamSettings* remote_settings) { // Client-side timer would triggered in Consume after received the first // message which is the very RPC response StartIdleTimer(); + } else { + // send first feedback for client-side stream if it already consumed data + if (_remote_settings.need_feedback()) { + auto consumed_bytes = _atomic_local_consumed.load(butil::memory_order_acquire); + if (consumed_bytes > 0) + SendFeedback(consumed_bytes); + } } } @@ -620,20 +628,34 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { } mb.flush(); - if (s->_remote_settings.need_feedback() && mb.total_length() > 0) { - s->_local_consumed += mb.total_length(); - s->SendFeedback(); + auto total_length = mb.total_length(); + if (total_length > 0) { + // fast path for connected stream + if (s->_connected.load(butil::memory_order_acquire)){ + if (s->_remote_settings.need_feedback()) { + s->_local_consumed += total_length; + s->SendFeedback(s->_local_consumed); + } + } else { + // Under the scenario of batch creation of Streams, there is concurrency between SetConnected and Consume for the same stream, + // and it is necessary to ensure the memory order. + s->_local_consumed = s->_atomic_local_consumed.fetch_add(total_length, butil::memory_order_release) + total_length; + if (s->_connected.load(butil::memory_order_acquire) && s->_remote_settings.need_feedback()) { + s->SendFeedback(s->_local_consumed); + } + } } + s->StartIdleTimer(); return 0; } -void Stream::SendFeedback() { +void Stream::SendFeedback(int64_t _consumed_bytes) { StreamFrameMeta fm; fm.set_frame_type(FRAME_TYPE_FEEDBACK); fm.set_stream_id(_remote_settings.stream_id()); fm.set_source_stream_id(id()); - fm.mutable_feedback()->set_consumed_size(_local_consumed); + fm.mutable_feedback()->set_consumed_size(_consumed_bytes); butil::IOBuf out; policy::PackStreamMessage(&out, fm, NULL); WriteToHostSocket(&out); diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 5ff7cb04a2..284b33ca33 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -81,7 +81,7 @@ friend struct butil::DefaultDeleter; void TriggerOnConnectIfNeed(); void Wait(void (*on_writable)(StreamId, void*, int), void* arg, const timespec* due_time, bool new_thread, bthread_id_t *join_id); - void SendFeedback(); + void SendFeedback(int64_t _consumed_bytes); void StartIdleTimer(); void StopIdleTimer(); void HandleRpcResponse(butil::IOBuf* response_buffer); @@ -115,7 +115,7 @@ friend struct butil::DefaultDeleter; bthread_mutex_t _connect_mutex; ConnectMeta _connect_meta; - bool _connected; + butil::atomic _connected; bool _closed; int _error_code; std::string _error_text; @@ -127,7 +127,8 @@ friend struct butil::DefaultDeleter; bthread_id_list_t _writable_wait_list; int64_t _local_consumed; - StreamSettings _remote_settings; + butil::atomic _atomic_local_consumed; + StreamSettings _remote_settings; bool _parse_rpc_response; bthread::ExecutionQueueId _consumer_queue;