Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Stream::Stream()
, _remote_consumed(0)
, _cur_buf_size(0)
, _local_consumed(0)
, _atomic_local_consumed(0)
, _parse_rpc_response(false)
, _pending_buf(NULL)
, _start_idle_timer_us(0)
Expand Down Expand Up @@ -287,14 +288,21 @@ void Stream::SetConnected(const StreamSettings* remote_settings) {
CHECK(_host_socket != NULL);
RPC_VLOG << "stream=" << id() << " is connected to stream_id="
<< _remote_settings.stream_id() << " at host_socket=" << *_host_socket;
_connected = true;
_connected.store(true, butil::memory_order_release);
_connect_meta.ec = 0;
TriggerOnConnectIfNeed();
if (remote_settings == NULL) {
// Start the timer at server-side
// Client-side timer would triggered in Consume after received the first
// message which is the very RPC response
StartIdleTimer();
} else {
// send first feedback for client-side stream if it already consumed data
if (_remote_settings.need_feedback()) {
auto consumed_bytes = _atomic_local_consumed.load(butil::memory_order_acquire);
if (consumed_bytes > 0)
SendFeedback(consumed_bytes);
}
}
}

Expand Down Expand Up @@ -620,20 +628,34 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
}
mb.flush();

if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
s->_local_consumed += mb.total_length();
s->SendFeedback();
auto total_length = mb.total_length();
if (total_length > 0) {
// fast path for connected stream
if (s->_connected.load(butil::memory_order_acquire)){
if (s->_remote_settings.need_feedback()) {
s->_local_consumed += total_length;
s->SendFeedback(s->_local_consumed);
}
} else {
// Under the scenario of batch creation of Streams, there is concurrency between SetConnected and Consume for the same stream,
// and it is necessary to ensure the memory order.
s->_local_consumed = s->_atomic_local_consumed.fetch_add(total_length, butil::memory_order_release) + total_length;
if (s->_connected.load(butil::memory_order_acquire) && s->_remote_settings.need_feedback()) {
s->SendFeedback(s->_local_consumed);
}
}
}

s->StartIdleTimer();
return 0;
}

void Stream::SendFeedback() {
void Stream::SendFeedback(int64_t _consumed_bytes) {
StreamFrameMeta fm;
fm.set_frame_type(FRAME_TYPE_FEEDBACK);
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
fm.mutable_feedback()->set_consumed_size(_local_consumed);
fm.mutable_feedback()->set_consumed_size(_consumed_bytes);
butil::IOBuf out;
policy::PackStreamMessage(&out, fm, NULL);
WriteToHostSocket(&out);
Expand Down
7 changes: 4 additions & 3 deletions src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ friend struct butil::DefaultDeleter<Stream>;
void TriggerOnConnectIfNeed();
void Wait(void (*on_writable)(StreamId, void*, int), void* arg,
const timespec* due_time, bool new_thread, bthread_id_t *join_id);
void SendFeedback();
void SendFeedback(int64_t _consumed_bytes);
void StartIdleTimer();
void StopIdleTimer();
void HandleRpcResponse(butil::IOBuf* response_buffer);
Expand Down Expand Up @@ -115,7 +115,7 @@ friend struct butil::DefaultDeleter<Stream>;

bthread_mutex_t _connect_mutex;
ConnectMeta _connect_meta;
bool _connected;
butil::atomic<bool> _connected;
bool _closed;
int _error_code;
std::string _error_text;
Expand All @@ -127,7 +127,8 @@ friend struct butil::DefaultDeleter<Stream>;
bthread_id_list_t _writable_wait_list;

int64_t _local_consumed;
StreamSettings _remote_settings;
butil::atomic<int64_t> _atomic_local_consumed;
StreamSettings _remote_settings;

bool _parse_rpc_response;
bthread::ExecutionQueueId<butil::IOBuf*> _consumer_queue;
Expand Down
Loading