From 93e9b9bac348e447588f3d662d4eeb311dce0f56 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Sat, 28 Feb 2026 11:53:52 +0100 Subject: [PATCH 1/2] RUBY-3241 URI option for monitoring type --- lib/mongo/client.rb | 1 + lib/mongo/server/monitor.rb | 29 +++++++++-- lib/mongo/uri.rb | 3 ++ lib/mongo/uri/options_mapper.rb | 28 +++++++++++ spec/mongo/server/monitor_spec.rb | 80 +++++++++++++++++++++++++++++++ spec/mongo/uri_spec.rb | 64 +++++++++++++++++++++++++ 6 files changed, 202 insertions(+), 3 deletions(-) diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index 1845dd170c..a04f7ea061 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -92,6 +92,7 @@ class Client :scan, :sdam_proc, :server_api, + :server_monitoring_mode, :server_selection_timeout, :socket_timeout, :srv_max_hosts, diff --git a/lib/mongo/server/monitor.rb b/lib/mongo/server/monitor.rb index 9130fe7128..37af549f43 100644 --- a/lib/mongo/server/monitor.rb +++ b/lib/mongo/server/monitor.rb @@ -331,9 +331,12 @@ def check end @connection = connection if tv_doc = result['topologyVersion'] - # Successful response, server 4.4+ - create_push_monitor!(TopologyVersion.new(tv_doc)) - push_monitor.run! + if streaming_enabled? + create_push_monitor!(TopologyVersion.new(tv_doc)) + push_monitor.run! + else + stop_push_monitor! + end else # Failed response or pre-4.4 server stop_push_monitor! @@ -351,6 +354,26 @@ def throttle_scan_frequency! sleep(delta) end end + + # Returns whether the streaming protocol is enabled, based on the + # serverMonitoringMode option. Default mode is :auto. + # + # - :stream - always use streaming when server supports it + # - :poll - never use streaming + # - :auto - use polling on FaaS platforms, streaming otherwise + # + # @return [ true | false ] Whether streaming is enabled. + def streaming_enabled? + mode = options[:server_monitoring_mode] || :auto + case mode + when :poll + false + when :stream + true + when :auto + !Server::AppMetadata::Environment.new.faas? + end + end end end end diff --git a/lib/mongo/uri.rb b/lib/mongo/uri.rb index 0b891a858c..abfb192dc5 100644 --- a/lib/mongo/uri.rb +++ b/lib/mongo/uri.rb @@ -204,6 +204,9 @@ class URI 'SCRAM-SHA-256' => :scram256, }.freeze + # Valid values for the serverMonitoringMode URI option. + SERVER_MONITORING_MODES = %w(stream poll auto).freeze + # Options that are allowed to appear more than once in the uri. # # In order to follow the URI options spec requirement that all instances diff --git a/lib/mongo/uri/options_mapper.rb b/lib/mongo/uri/options_mapper.rb index eac2f59c3b..f818b5f8a6 100644 --- a/lib/mongo/uri/options_mapper.rb +++ b/lib/mongo/uri/options_mapper.rb @@ -325,6 +325,9 @@ def self.uri_option(uri_key, name, **extra) uri_option 'retryWrites', :retry_writes, type: :bool uri_option 'zlibCompressionLevel', :zlib_compression_level, type: :zlib_compression_level + # Monitoring Options + uri_option 'serverMonitoringMode', :server_monitoring_mode, type: :server_monitoring_mode + # Converts +value+ to a boolean. # # Returns true for 'true', false for 'false', otherwise nil. @@ -713,6 +716,31 @@ def revert_read_mode(value) end alias :stringify_read_mode :revert_read_mode + # Server monitoring mode transformation. + # + # @param [ String ] name Name of the URI option being processed. + # @param [ String ] value The server monitoring mode string value. + # + # @return [ Symbol | nil ] The server monitoring mode symbol. + def convert_server_monitoring_mode(name, value) + mode = value.downcase + if SERVER_MONITORING_MODES.include?(mode) + mode.to_sym + else + log_warn("#{value} is not a valid server monitoring mode") + nil + end + end + + # Stringifies server monitoring mode. + # + # @param [ Symbol ] value The server monitoring mode. + # + # @return [ String ] The server monitoring mode as a string. + def stringify_server_monitoring_mode(value) + value.to_s + end + # Read preference tags transformation. # # @param [ String ] name Name of the URI option being processed. diff --git a/spec/mongo/server/monitor_spec.rb b/spec/mongo/server/monitor_spec.rb index 5e93b3a384..a27cef66ea 100644 --- a/spec/mongo/server/monitor_spec.rb +++ b/spec/mongo/server/monitor_spec.rb @@ -323,4 +323,84 @@ end end end + + describe '#streaming_enabled?' do + context 'when server_monitoring_mode is :stream' do + let(:monitor_options) do + { server_monitoring_mode: :stream } + end + + it 'returns true' do + expect(monitor.send(:streaming_enabled?)).to be true + end + end + + context 'when server_monitoring_mode is :poll' do + let(:monitor_options) do + { server_monitoring_mode: :poll } + end + + it 'returns false' do + expect(monitor.send(:streaming_enabled?)).to be false + end + end + + context 'when server_monitoring_mode is :auto' do + let(:monitor_options) do + { server_monitoring_mode: :auto } + end + + context 'when not in a FaaS environment' do + local_env( + 'AWS_EXECUTION_ENV' => nil, + 'AWS_LAMBDA_RUNTIME_API' => nil, + 'FUNCTIONS_WORKER_RUNTIME' => nil, + 'K_SERVICE' => nil, + 'FUNCTION_NAME' => nil, + 'VERCEL' => nil, + ) + + it 'returns true' do + expect(monitor.send(:streaming_enabled?)).to be true + end + end + + context 'when in a FaaS environment' do + local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby') + + it 'returns false' do + expect(monitor.send(:streaming_enabled?)).to be false + end + end + end + + context 'when server_monitoring_mode is not set' do + let(:monitor_options) do + {} + end + + context 'when not in a FaaS environment' do + local_env( + 'AWS_EXECUTION_ENV' => nil, + 'AWS_LAMBDA_RUNTIME_API' => nil, + 'FUNCTIONS_WORKER_RUNTIME' => nil, + 'K_SERVICE' => nil, + 'FUNCTION_NAME' => nil, + 'VERCEL' => nil, + ) + + it 'defaults to auto and returns true' do + expect(monitor.send(:streaming_enabled?)).to be true + end + end + + context 'when in a FaaS environment' do + local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby') + + it 'defaults to auto and returns false' do + expect(monitor.send(:streaming_enabled?)).to be false + end + end + end + end end diff --git a/spec/mongo/uri_spec.rb b/spec/mongo/uri_spec.rb index 0fffa6ef62..3f659c7e65 100644 --- a/spec/mongo/uri_spec.rb +++ b/spec/mongo/uri_spec.rb @@ -1439,5 +1439,69 @@ include_examples "roundtrips string" end + + context 'when a serverMonitoringMode option is provided' do + context 'stream' do + let(:options) { 'serverMonitoringMode=stream' } + + it 'sets the server monitoring mode in uri options' do + expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream) + end + + it 'sets the option on a client created with the uri' do + client = new_local_client_nmio(string) + expect(client.options[:server_monitoring_mode]).to eq(:stream) + end + + include_examples "roundtrips string" + end + + context 'poll' do + let(:options) { 'serverMonitoringMode=poll' } + + it 'sets the server monitoring mode in uri options' do + expect(uri.uri_options[:server_monitoring_mode]).to eq(:poll) + end + + it 'sets the option on a client created with the uri' do + client = new_local_client_nmio(string) + expect(client.options[:server_monitoring_mode]).to eq(:poll) + end + + include_examples "roundtrips string" + end + + context 'auto' do + let(:options) { 'serverMonitoringMode=auto' } + + it 'sets the server monitoring mode in uri options' do + expect(uri.uri_options[:server_monitoring_mode]).to eq(:auto) + end + + it 'sets the option on a client created with the uri' do + client = new_local_client_nmio(string) + expect(client.options[:server_monitoring_mode]).to eq(:auto) + end + + include_examples "roundtrips string" + end + + context 'case insensitive' do + let(:options) { 'serverMonitoringMode=Stream' } + + it 'sets the server monitoring mode in uri options' do + expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream) + end + end + + context 'invalid value' do + let(:options) { 'serverMonitoringMode=invalid' } + + it 'warns and does not set the option' do + expect(Mongo::Logger.logger).to receive(:warn).with(/invalid is not a valid server monitoring mode/) + expect(uri.uri_options[:server_monitoring_mode]).to be_nil + end + end + end end end From 2d3933729bf8a9a2bda9e96b47fcf60b1b76b717 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Sat, 28 Feb 2026 12:08:38 +0100 Subject: [PATCH 2/2] Update spec tests runner --- spec/runners/unified/assertions.rb | 7 +++++++ spec/runners/unified/test.rb | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/spec/runners/unified/assertions.rb b/spec/runners/unified/assertions.rb index d85a804425..d571449c12 100644 --- a/spec/runners/unified/assertions.rb +++ b/spec/runners/unified/assertions.rb @@ -175,6 +175,10 @@ def assert_events actual_events.select! do |event| event.class.name.sub(/.*::/, '') =~ /^(?:Pool|Connection)/ end + when 'sdam' + actual_events.select! do |event| + event.class.name.sub(/.*::/, '') =~ /^(?:Server|Topology)/ + end end if (!ignore_extra_events && actual_events.length != expected_events.length) || @@ -217,6 +221,9 @@ def assert_event_matches(actual, expected) if interrupt_in_use_connections = spec.use('interruptInUseConnections') assert_matches(actual.options[:interrupt_in_use_connections], interrupt_in_use_connections, 'Command interrupt_in_use_connections does not match expectation') end + unless (awaited = spec.use('awaited')).nil? + assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation') + end unless spec.empty? raise NotImplementedError, "Unhandled keys: #{spec}" end diff --git a/spec/runners/unified/test.rb b/spec/runners/unified/test.rb index ff95009d44..4de8170385 100644 --- a/spec/runners/unified/test.rb +++ b/spec/runners/unified/test.rb @@ -191,6 +191,12 @@ def generate_entities(es) end kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym subscriber.add_wanted_events(kind) + when 'serverHeartbeatStartedEvent', 'serverHeartbeatSucceededEvent', 'serverHeartbeatFailedEvent' + unless client.send(:monitoring).subscribers[Mongo::Monitoring::SERVER_HEARTBEAT]&.include?(subscriber) + client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) + end + kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym + subscriber.add_wanted_events(kind) else raise NotImplementedError, "Unknown event #{event}" end