diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index 3f1bbabb88a5e..cb66a50f2658c 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -52,6 +52,8 @@ std::pair ObjectDescriptor::ReadLast( return {AsyncReader(std::move(reader)), std::move(token)}; } +bool ObjectDescriptor::IsOpen() const { return impl_->IsOpen(); } + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage } // namespace cloud diff --git a/google/cloud/storage/async/object_descriptor.h b/google/cloud/storage/async/object_descriptor.h index c826859d7a027..545d4ba96aeea 100644 --- a/google/cloud/storage/async/object_descriptor.h +++ b/google/cloud/storage/async/object_descriptor.h @@ -76,6 +76,14 @@ class ObjectDescriptor { */ std::pair ReadLast(std::int64_t limit); + /** + * Returns true if the descriptor is open. + * + * A descriptor is open if it has not been cancelled and has not hit a + * permanent failure. + */ + bool IsOpen() const; + private: std::shared_ptr impl_; }; diff --git a/google/cloud/storage/async/object_descriptor_connection.h b/google/cloud/storage/async/object_descriptor_connection.h index f7ea1898a12bf..41560d4673761 100644 --- a/google/cloud/storage/async/object_descriptor_connection.h +++ b/google/cloud/storage/async/object_descriptor_connection.h @@ -61,6 +61,14 @@ class ObjectDescriptorConnection { virtual std::unique_ptr Read(ReadParams p) = 0; virtual void MakeSubsequentStream() = 0; + + /** + * Returns true if the descriptor is open. + * + * A descriptor is open if it has not been cancelled and has not hit a + * permanent failure. + */ + virtual bool IsOpen() const { return true; } }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/async/object_descriptor_test.cc b/google/cloud/storage/async/object_descriptor_test.cc index 4fd3dd0724235..5426d194ab2fd 100644 --- a/google/cloud/storage/async/object_descriptor_test.cc +++ b/google/cloud/storage/async/object_descriptor_test.cc @@ -186,6 +186,15 @@ TEST(ObjectDescriptor, ReadExceedsMaxRange) { EXPECT_FALSE(token.valid()); } +TEST(ObjectDescriptor, IsOpen) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, IsOpen).WillOnce(Return(true)).WillOnce(Return(false)); + + auto tested = ObjectDescriptor(mock); + EXPECT_TRUE(tested.IsOpen()); + EXPECT_FALSE(tested.IsOpen()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 32ae9a4e9f0e0..e67993317b6d0 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -55,6 +55,7 @@ #include "google/cloud/internal/async_streaming_read_rpc_timeout.h" #include "google/cloud/internal/async_streaming_write_rpc_timeout.h" #include "google/cloud/internal/make_status.h" +#include #include #include @@ -231,19 +232,33 @@ AsyncConnectionImpl::Open(OpenParams p) { auto pending = factory(std::move(initial_request)); using ReturnType = std::shared_ptr; - return pending.then( - [rp = std::move(resume_policy), fa = std::move(factory), - rs = std::move(p.read_spec), - options = std::move(p.options)](auto f) mutable -> StatusOr { - auto result = f.get(); - if (!result) return std::move(result).status(); - - auto impl = std::make_shared( - std::move(rp), std::move(fa), std::move(rs), - std::move(result->stream), std::move(options)); - impl->Start(std::move(result->first_response)); - return ReturnType(impl); - }); + return pending.then([rp = std::move(resume_policy), fa = std::move(factory), + rs = std::move(p.read_spec), + options = std::move(p.options), refresh = refresh_]( + auto f) mutable -> StatusOr { + auto result = f.get(); + if (!result) return std::move(result).status(); + + // The descriptor remains open if at least one gRPC channel is in a + // functional state. We consider READY, IDLE, and CONNECTING to be + // functional. TRANSIENT_FAILURE and SHUTDOWN are not included because they + // indicate a definitive loss of connectivity or terminal closure. + auto transport_ok = [refresh] { + if (!refresh) return true; + auto const& channels = refresh->channels(); + return std::any_of( + channels.begin(), channels.end(), [](auto const& channel) { + auto state = channel->GetState(false); + return state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE || + state == GRPC_CHANNEL_CONNECTING; + }); + }; + auto impl = std::make_shared( + std::move(rp), std::move(fa), std::move(rs), std::move(result->stream), + std::move(options), std::move(transport_ok)); + impl->Start(std::move(result->first_response)); + return ReturnType(impl); + }); } future>> diff --git a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc index 64cf5bde9d4ed..58adf2e2c6c5f 100644 --- a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc @@ -46,6 +46,7 @@ class AsyncObjectDescriptorConnectionTracing absl::optional metadata() const override { return impl_->metadata(); } + bool IsOpen() const override { return impl_->IsOpen(); } std::unique_ptr Read(ReadParams p) override { internal::OTelScope scope(span_); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index c4ab3e28c9a43..e5ea63ccd3ed8 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -34,11 +34,13 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( std::unique_ptr resume_policy, OpenStreamFactory make_stream, google::storage::v2::BidiReadObjectSpec read_object_spec, - std::shared_ptr stream, Options options) + std::shared_ptr stream, Options options, + std::function transport_ok) : resume_policy_prototype_(std::move(resume_policy)), make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), - options_(std::move(options)) { + options_(std::move(options)), + transport_ok_(std::move(transport_ok)) { stream_manager_ = std::make_unique( []() -> std::shared_ptr { return nullptr; }, // NOLINT std::make_shared(std::move(stream), @@ -62,8 +64,18 @@ void ObjectDescriptorImpl::Start( } } +bool ObjectDescriptorImpl::IsOpen() const { + { + std::unique_lock lk(mu_); + if (cancelled_) return false; + if (stream_manager_->Empty()) return false; + } + return !transport_ok_ || transport_ok_(); +} + void ObjectDescriptorImpl::Cancel() { std::unique_lock lk(mu_); + if (cancelled_) return; cancelled_ = true; if (stream_manager_) stream_manager_->CancelAll(); if (pending_stream_.valid()) pending_stream_.cancel(); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index f7f8a0d55a285..e45ce7caf6cfb 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -27,6 +27,7 @@ #include "absl/types/optional.h" #include "google/storage/v2/storage.pb.h" #include +#include #include #include #include @@ -59,8 +60,8 @@ class ObjectDescriptorImpl ObjectDescriptorImpl(std::unique_ptr resume_policy, OpenStreamFactory make_stream, google::storage::v2::BidiReadObjectSpec read_object_spec, - std::shared_ptr stream, - Options options = {}); + std::shared_ptr stream, Options options = {}, + std::function transport_ok = {}); ~ObjectDescriptorImpl() override; // Start the read loop. @@ -82,6 +83,8 @@ class ObjectDescriptorImpl std::size_t StreamSize() const; + bool IsOpen() const override; + private: using StreamManager = MultiStreamManager; using StreamIterator = @@ -123,6 +126,7 @@ class ObjectDescriptorImpl google::cloud::StatusOr> pending_stream_; bool cancelled_ = false; + std::function transport_ok_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index 96c998b51c1eb..f87f8b3c1a939 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -67,15 +67,18 @@ auto constexpr kMetadataText = R"pb( auto NoResume() { return storage::LimitedErrorCountResumePolicy(0)(); } -auto MakeTested(std::unique_ptr resume_policy, - OpenStreamFactory make_stream, - google::storage::v2::BidiReadObjectSpec read_object_spec, - std::shared_ptr stream) { +auto MakeTested( + std::unique_ptr resume_policy, + OpenStreamFactory make_stream, + google::storage::v2::BidiReadObjectSpec read_object_spec, + std::shared_ptr stream, + std::function transport_ok = [] { return true; }) { Options options; options.set(true); return std::make_shared( std::move(resume_policy), std::move(make_stream), - std::move(read_object_spec), std::move(stream), std::move(options)); + std::move(read_object_spec), std::move(stream), std::move(options), + std::move(transport_ok)); } MATCHER_P(IsProtoEqualModuloRepeatedFieldOrdering, value, @@ -1784,6 +1787,83 @@ TEST(ObjectDescriptorImpl, MultiStreamOptimizationDisabled) { tested.reset(); } +/// @test Verify that IsOpen() is true by default. +TEST(ObjectDescriptorImpl, IsOpenTrueByDefault) { + MockFactory factory; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + auto tested = MakeTested(NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream))); + EXPECT_TRUE(tested->IsOpen()); +} + +/// @test Verify that IsOpen() is false after Cancel(). +TEST(ObjectDescriptorImpl, IsOpenFalseOnCancel) { + MockFactory factory; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream, Cancel).Times(AtMost(2)); + auto tested = MakeTested(NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream))); + EXPECT_TRUE(tested->IsOpen()); + tested->Cancel(); + EXPECT_FALSE(tested->IsOpen()); +} + +/// @test Verify that IsOpen() is false if transport health check fails. +TEST(ObjectDescriptorImpl, IsOpenFalseOnPermanentError) { + MockFactory factory; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + bool transport_ok = true; + auto transport_ok_callback = [&transport_ok] { return transport_ok; }; + + auto tested = MakeTested(NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream)), + transport_ok_callback); + + EXPECT_TRUE(tested->IsOpen()); + transport_ok = false; + EXPECT_FALSE(tested->IsOpen()); +} + +TEST(ObjectDescriptorImpl, IsOpenFalseOnTransportFailure) { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + MockFactory factory; + auto transport_ok = [] { return false; }; + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream)), Options{}, + std::move(transport_ok)); + EXPECT_FALSE(tested->IsOpen()); +} + +TEST(ObjectDescriptorImpl, IsOpenTrueOnTransportSuccess) { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + MockFactory factory; + auto transport_ok = [] { return true; }; + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream)), Options{}, + std::move(transport_ok)); + EXPECT_TRUE(tested->IsOpen()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h index f0e089ac4dada..ad4f2aa94783b 100644 --- a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h +++ b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h @@ -33,6 +33,7 @@ class MockAsyncObjectDescriptorConnection (ReadParams), (override)); MOCK_METHOD(void, MakeSubsequentStream, (), (override)); MOCK_METHOD(Options, options, (), (const, override)); + MOCK_METHOD(bool, IsOpen, (), (const, override)); }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END