Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,15 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
if(!ptrs[i]) continue;
Stream* extra_stream = (Stream *) ptrs[i]->conn();
_remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
s->SetHostSocket(host_socket);
if (s->SetHostSocket(host_socket) != 0) {
const int ec = errno;
Stream::SetFailed(_request_streams, ec,
"Fail to bind response stream to %s",
host_socket->description().c_str());
SetFailed(ec, "Fail to bind response stream to %s",
host_socket->description().c_str());
return;
}
extra_stream->SetConnected(_remote_stream_settings);
}
}
Expand Down
45 changes: 37 additions & 8 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
Stream* s = (Stream *) stream_ptr->conn();
StreamSettings *stream_settings = meta.mutable_stream_settings();
s->FillSettings(stream_settings);
s->SetHostSocket(sock);
if (s->SetHostSocket(sock) != 0) {
const int errcode = errno;
LOG_IF(WARNING, errcode != EPIPE)
<< "Fail to bind response stream=" << response_stream_id
<< " to " << sock->description() << ": "
<< berror(errcode);
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
sock->description().c_str());
Comment on lines +363 to +370
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On SetHostSocket() failure you’re logging/propagating sock->non_zero_error_code(), but SetHostSocket() already sets errno to the specific bind failure. Using errno from the failing call will be more accurate and avoids relying on the socket’s racy _error_code publication timing (see comment in Socket::non_zero_error_code()).

Copilot uses AI. Check for mistakes.
Stream::SetFailed(response_stream_ids, errcode,
"Fail to bind response stream to %s",
sock->description().c_str());
Comment on lines +372 to +373
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new Stream::SetFailed(..., "...%s", ...) calls assume Stream::SetFailed correctly forwards printf-style varargs. In src/brpc/stream.cpp, Stream::SetFailed currently passes a va_list into Stream::Close(...) via ..., which is undefined behavior when the format string has specifiers. This can corrupt the failure reason or crash on some ABIs. Please fix Stream::SetFailed/Close to properly accept/forward a va_list (or pre-format the message into a string and pass it as "%s").

Suggested change
"Fail to bind response stream to %s",
sock->description().c_str());
"Fail to bind response stream to host socket");

Copilot uses AI. Check for mistakes.
return;
}
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
}
Expand Down Expand Up @@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,

ResponseWriteInfo args;
bthread_id_t response_id = INVALID_BTHREAD_ID;
auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] {
if (response_id == INVALID_BTHREAD_ID) {
return;
}
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
});
if (span) {
span->set_response_size(res_buf.size());
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
Expand Down Expand Up @@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
SocketUniquePtr extra_stream_ptr;
if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
extra_stream->SetHostSocket(sock);
if (extra_stream->SetHostSocket(sock) != 0) {
const int errcode = errno;
LOG_IF(WARNING, errcode != EPIPE)
<< "Fail to bind response stream=" << extra_stream_id
<< " to " << sock->description() << ": "
<< berror(errcode);
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
sock->description().c_str());
StreamIds remaining_stream_ids(response_stream_ids.begin() + i,
response_stream_ids.end());
Stream::SetFailed(remaining_stream_ids, errcode,
"Fail to bind response stream to %s",
sock->description().c_str());
Comment on lines +458 to +462
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same varargs-forwarding issue applies here: the formatted Stream::SetFailed(remaining_stream_ids, ..., "...%s", ...) relies on undefined behavior in Stream::SetFailed/Close when the format string contains specifiers. Please address the varargs forwarding (or pre-format the message before calling Stream::SetFailed).

Copilot uses AI. Check for mistakes.
return;
}
extra_stream->SetConnected();
} else {
LOG(WARNING) << "Stream=" << extra_stream_id
Expand All @@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
}
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
}

namespace {
Expand Down
31 changes: 22 additions & 9 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "brpc/stream.h"

#include <gflags/gflags.h>
#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/object_pool.h"
#include "butil/unique_ptr.h"
Expand Down Expand Up @@ -56,6 +57,7 @@ Stream::Stream()
, _pending_buf(NULL)
, _start_idle_timer_us(0)
, _idle_timer(0)
, _set_host_socket_ec(0)
{
_connect_meta.on_connect = NULL;
CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL));
Expand Down Expand Up @@ -643,13 +645,16 @@ int Stream::SetHostSocket(Socket *host_socket) {
std::call_once(_set_host_socket_flag, [this, host_socket]() {
SocketUniquePtr ptr;
host_socket->ReAddress(&ptr);
// TODO add *this to host socke
if (ptr->AddStream(id()) != 0) {
CHECK(false) << id() << " fail to add stream to host socket";
_set_host_socket_ec = errno ? errno : ptr->non_zero_error_code();
return;
}
_host_socket = ptr.release();
});
if (_host_socket == NULL) {
errno = _set_host_socket_ec ? _set_host_socket_ec : EFAILEDSOCKET;
return -1;
Comment on lines +654 to +656
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that SetHostSocket() can return -1 instead of aborting, it’s important that all call sites check the return value to avoid later CHECK(_host_socket != NULL)/null-deref paths (e.g. WriteToHostSocket, SetConnected). Please audit/update remaining callers that still ignore the return value.

Copilot uses AI. Check for mistakes.
}
return 0;
}

Expand Down Expand Up @@ -709,27 +714,35 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
return TriggerOnConnectIfNeed();
}

int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
int Stream::SetFailedWithReason(StreamId id, int error_code,
const std::string& reason) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
// Don't care recycled stream
return 0;
}
Stream* s = (Stream*)ptr->conn();
s->Close(error_code, "%s", reason.c_str());
return 0;
}

int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
va_list ap;
va_start(ap, reason_fmt);
s->Close(error_code, reason_fmt, ap);
std::string reason;
butil::string_vprintf(&reason, reason_fmt, ap);
va_end(ap);
return 0;
return SetFailedWithReason(id, error_code, reason);
}

int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) {
va_list ap;
va_start(ap, reason_fmt);
for(size_t i = 0; i< ids.size(); ++i) {
Stream::SetFailed(ids[i], error_code, reason_fmt, ap);
}
std::string reason;
butil::string_vprintf(&reason, reason_fmt, ap);
va_end(ap);
for (size_t i = 0; i < ids.size(); ++i) {
Stream::SetFailedWithReason(ids[i], error_code, reason);
}
return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_STREAM_IMPL_H

#include <mutex>
#include <string>
#include "bthread/bthread.h"
#include "bthread/execution_queue.h"
#include "brpc/socket.h"
Expand Down Expand Up @@ -91,6 +92,8 @@ friend struct butil::DefaultDeleter<Stream>;
static int TriggerOnWritable(bthread_id_t id, void *data, int error_code);
static void *RunOnWritable(void* arg);
static void* RunOnConnect(void* arg);
static int SetFailedWithReason(StreamId id, int error_code,
const std::string& reason);

struct ConnectMeta {
int (*on_connect)(int, int, void*);
Expand Down Expand Up @@ -135,6 +138,7 @@ friend struct butil::DefaultDeleter<Stream>;
int64_t _start_idle_timer_us;
bthread_timer_t _idle_timer;
std::once_flag _set_host_socket_flag;
int _set_host_socket_ec;
};

} // namespace brpc
Expand Down
81 changes: 81 additions & 0 deletions test/brpc_streaming_rpc_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
// Date: 2015/10/22 16:28:44

#include <gtest/gtest.h>
#include <errno.h>
#include "brpc/server.h"

#include "brpc/controller.h"
#include "brpc/channel.h"
#include "brpc/socket.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/stream_impl.h"
#include "brpc/policy/streaming_rpc_protocol.h"
#include "echo.pb.h"
Expand Down Expand Up @@ -78,6 +80,52 @@ class StreamingRpcTest : public testing::Test {
test::EchoResponse response;
};

class MyServiceWithStreamAndFailedSocket : public test::EchoService {
public:
explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options)
: _options(options) {}

void Echo(::google::protobuf::RpcController* controller,
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
response->set_message(request->message());
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamId response_stream;
ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options));
brpc::ControllerPrivateAccessor accessor(cntl);
ASSERT_TRUE(accessor.get_sending_socket() != NULL);
accessor.get_sending_socket()->SetFailed();
}

private:
brpc::StreamOptions _options;
};

TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) {
brpc::SocketOptions socket_options;
brpc::SocketId host_socket_id;
ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id));
brpc::SocketUniquePtr host_socket;
ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket));
ASSERT_EQ(0, host_socket->SetFailed());

brpc::StreamId stream_id;
brpc::StreamOptions stream_options;
ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false));
brpc::ScopedStream stream_guard(stream_id);

brpc::SocketUniquePtr stream_socket;
ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket));
brpc::Stream* stream = static_cast<brpc::Stream*>(stream_socket->conn());

errno = 0;
ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get()));
ASSERT_NE(0, errno);
ASSERT_TRUE(stream->_host_socket == NULL);
}

TEST_F(StreamingRpcTest, sanity) {
brpc::Server server;
MyServiceWithStream service;
Expand Down Expand Up @@ -159,6 +207,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler {
HandlerControl* _cntl;
};

TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) {
OrderedInputHandler handler;
brpc::StreamOptions response_stream_options;
response_stream_options.handler = &handler;
brpc::Server server;
MyServiceWithStreamAndFailedSocket service(response_stream_options);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));

brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL));
brpc::ScopedStream stream_guard(request_stream);

test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_TRUE(cntl.Failed());

for (int i = 0; i < 10000 && !handler.stopped(); ++i) {
usleep(100);
}

server.Stop(0);
server.Join();

ASSERT_TRUE(handler.stopped());
ASSERT_TRUE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(0, handler._expected_next_value);
}

TEST_F(StreamingRpcTest, received_in_order) {
OrderedInputHandler handler;
brpc::StreamOptions opt;
Expand Down