From b567af3d1cd112904a7585c86dd95ada65875710 Mon Sep 17 00:00:00 2001 From: amoxic Date: Wed, 25 Mar 2026 10:40:27 +0800 Subject: [PATCH] Handle response stream bind failures gracefully --- src/brpc/controller.cpp | 10 +++- src/brpc/policy/baidu_rpc_protocol.cpp | 45 +++++++++++--- src/brpc/stream.cpp | 31 +++++++--- src/brpc/stream_impl.h | 4 ++ test/brpc_streaming_rpc_unittest.cpp | 81 ++++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 18 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 133d1f0453..c72db7c910 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1459,7 +1459,15 @@ void Controller::HandleStreamConnection(Socket *host_socket) { if(!ptrs[i]) continue; Stream* extra_stream = (Stream *) ptrs[i]->conn(); _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); - s->SetHostSocket(host_socket); + if (s->SetHostSocket(host_socket) != 0) { + const int ec = errno; + Stream::SetFailed(_request_streams, ec, + "Fail to bind response stream to %s", + host_socket->description().c_str()); + SetFailed(ec, "Fail to bind response stream to %s", + host_socket->description().c_str()); + return; + } extra_stream->SetConnected(_remote_stream_settings); } } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 0dba01624a..0917915994 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, Stream* s = (Stream *) stream_ptr->conn(); StreamSettings *stream_settings = meta.mutable_stream_settings(); s->FillSettings(stream_settings); - s->SetHostSocket(sock); + if (s->SetHostSocket(sock) != 0) { + const int errcode = errno; + LOG_IF(WARNING, errcode != EPIPE) + << "Fail to bind response stream=" << response_stream_id + << " to " << sock->description() << ": " + << berror(errcode); + cntl->SetFailed(errcode, "Fail to bind response stream to %s", + sock->description().c_str()); + Stream::SetFailed(response_stream_ids, errcode, + "Fail to bind response stream to %s", + sock->description().c_str()); + return; + } for (size_t i = 1; i < response_stream_ids.size(); ++i) { stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]); } @@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, ResponseWriteInfo args; bthread_id_t response_id = INVALID_BTHREAD_ID; + auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] { + if (response_id == INVALID_BTHREAD_ID) { + return; + } + bthread_id_join(response_id); + // Do not care about the result of background writing. + // TODO: this is not sent + span->set_sent_us(args.sent_us); + }); if (span) { span->set_response_size(res_buf.size()); CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten)); @@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, SocketUniquePtr extra_stream_ptr; if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) { Stream* extra_stream = (Stream *) extra_stream_ptr->conn(); - extra_stream->SetHostSocket(sock); + if (extra_stream->SetHostSocket(sock) != 0) { + const int errcode = errno; + LOG_IF(WARNING, errcode != EPIPE) + << "Fail to bind response stream=" << extra_stream_id + << " to " << sock->description() << ": " + << berror(errcode); + cntl->SetFailed(errcode, "Fail to bind response stream to %s", + sock->description().c_str()); + StreamIds remaining_stream_ids(response_stream_ids.begin() + i, + response_stream_ids.end()); + Stream::SetFailed(remaining_stream_ids, errcode, + "Fail to bind response stream to %s", + sock->description().c_str()); + return; + } extra_stream->SetConnected(); } else { LOG(WARNING) << "Stream=" << extra_stream_id @@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl, } } - if (span) { - bthread_id_join(response_id); - // Do not care about the result of background writing. - // TODO: this is not sent - span->set_sent_us(args.sent_us); - } } namespace { diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 2a4430548f..7225dd36f2 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -19,6 +19,7 @@ #include "brpc/stream.h" #include +#include "butil/string_printf.h" #include "butil/time.h" #include "butil/object_pool.h" #include "butil/unique_ptr.h" @@ -56,6 +57,7 @@ Stream::Stream() , _pending_buf(NULL) , _start_idle_timer_us(0) , _idle_timer(0) + , _set_host_socket_ec(0) { _connect_meta.on_connect = NULL; CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL)); @@ -643,13 +645,16 @@ int Stream::SetHostSocket(Socket *host_socket) { std::call_once(_set_host_socket_flag, [this, host_socket]() { SocketUniquePtr ptr; host_socket->ReAddress(&ptr); - // TODO add *this to host socke if (ptr->AddStream(id()) != 0) { - CHECK(false) << id() << " fail to add stream to host socket"; + _set_host_socket_ec = errno ? errno : ptr->non_zero_error_code(); return; } _host_socket = ptr.release(); }); + if (_host_socket == NULL) { + errno = _set_host_socket_ec ? _set_host_socket_ec : EFAILEDSOCKET; + return -1; + } return 0; } @@ -709,27 +714,35 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } -int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { +int Stream::SetFailedWithReason(StreamId id, int error_code, + const std::string& reason) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { - // Don't care recycled stream return 0; } Stream* s = (Stream*)ptr->conn(); + s->Close(error_code, "%s", reason.c_str()); + return 0; +} + +int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { va_list ap; va_start(ap, reason_fmt); - s->Close(error_code, reason_fmt, ap); + std::string reason; + butil::string_vprintf(&reason, reason_fmt, ap); va_end(ap); - return 0; + return SetFailedWithReason(id, error_code, reason); } int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) { va_list ap; va_start(ap, reason_fmt); - for(size_t i = 0; i< ids.size(); ++i) { - Stream::SetFailed(ids[i], error_code, reason_fmt, ap); - } + std::string reason; + butil::string_vprintf(&reason, reason_fmt, ap); va_end(ap); + for (size_t i = 0; i < ids.size(); ++i) { + Stream::SetFailedWithReason(ids[i], error_code, reason); + } return 0; } diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 5ff7cb04a2..300b01cecb 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -20,6 +20,7 @@ #define BRPC_STREAM_IMPL_H #include +#include #include "bthread/bthread.h" #include "bthread/execution_queue.h" #include "brpc/socket.h" @@ -91,6 +92,8 @@ friend struct butil::DefaultDeleter; static int TriggerOnWritable(bthread_id_t id, void *data, int error_code); static void *RunOnWritable(void* arg); static void* RunOnConnect(void* arg); + static int SetFailedWithReason(StreamId id, int error_code, + const std::string& reason); struct ConnectMeta { int (*on_connect)(int, int, void*); @@ -135,6 +138,7 @@ friend struct butil::DefaultDeleter; int64_t _start_idle_timer_us; bthread_timer_t _idle_timer; std::once_flag _set_host_socket_flag; + int _set_host_socket_ec; }; } // namespace brpc diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index 056ea9a963..1b03048e24 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -20,11 +20,13 @@ // Date: 2015/10/22 16:28:44 #include +#include #include "brpc/server.h" #include "brpc/controller.h" #include "brpc/channel.h" #include "brpc/socket.h" +#include "brpc/details/controller_private_accessor.h" #include "brpc/stream_impl.h" #include "brpc/policy/streaming_rpc_protocol.h" #include "echo.pb.h" @@ -78,6 +80,52 @@ class StreamingRpcTest : public testing::Test { test::EchoResponse response; }; +class MyServiceWithStreamAndFailedSocket : public test::EchoService { +public: + explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options) + : _options(options) {} + + void Echo(::google::protobuf::RpcController* controller, + const ::test::EchoRequest* request, + ::test::EchoResponse* response, + ::google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + response->set_message(request->message()); + brpc::Controller* cntl = static_cast(controller); + brpc::StreamId response_stream; + ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options)); + brpc::ControllerPrivateAccessor accessor(cntl); + ASSERT_TRUE(accessor.get_sending_socket() != NULL); + accessor.get_sending_socket()->SetFailed(); + } + +private: + brpc::StreamOptions _options; +}; + +TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) { + brpc::SocketOptions socket_options; + brpc::SocketId host_socket_id; + ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id)); + brpc::SocketUniquePtr host_socket; + ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket)); + ASSERT_EQ(0, host_socket->SetFailed()); + + brpc::StreamId stream_id; + brpc::StreamOptions stream_options; + ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false)); + brpc::ScopedStream stream_guard(stream_id); + + brpc::SocketUniquePtr stream_socket; + ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket)); + brpc::Stream* stream = static_cast(stream_socket->conn()); + + errno = 0; + ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get())); + ASSERT_NE(0, errno); + ASSERT_TRUE(stream->_host_socket == NULL); +} + TEST_F(StreamingRpcTest, sanity) { brpc::Server server; MyServiceWithStream service; @@ -159,6 +207,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler { HandlerControl* _cntl; }; +TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) { + OrderedInputHandler handler; + brpc::StreamOptions response_stream_options; + response_stream_options.handler = &handler; + brpc::Server server; + MyServiceWithStreamAndFailedSocket service(response_stream_options); + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(9007, NULL)); + + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + brpc::Controller cntl; + brpc::StreamId request_stream; + ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL)); + brpc::ScopedStream stream_guard(request_stream); + + test::EchoService_Stub stub(&channel); + stub.Echo(&cntl, &request, &response, NULL); + ASSERT_TRUE(cntl.Failed()); + + for (int i = 0; i < 10000 && !handler.stopped(); ++i) { + usleep(100); + } + + server.Stop(0); + server.Join(); + + ASSERT_TRUE(handler.stopped()); + ASSERT_TRUE(handler.failed()); + ASSERT_EQ(0, handler.idle_times()); + ASSERT_EQ(0, handler._expected_next_value); +} + TEST_F(StreamingRpcTest, received_in_order) { OrderedInputHandler handler; brpc::StreamOptions opt;