Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Client
:scan,
:sdam_proc,
:server_api,
:server_monitoring_mode,
:server_selection_timeout,
:socket_timeout,
:srv_max_hosts,
Expand Down
29 changes: 26 additions & 3 deletions lib/mongo/server/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,12 @@ def check
end
@connection = connection
if tv_doc = result['topologyVersion']
# Successful response, server 4.4+
create_push_monitor!(TopologyVersion.new(tv_doc))
push_monitor.run!
if streaming_enabled?
create_push_monitor!(TopologyVersion.new(tv_doc))
push_monitor.run!
else
stop_push_monitor!
end
else
# Failed response or pre-4.4 server
stop_push_monitor!
Expand All @@ -351,6 +354,26 @@ def throttle_scan_frequency!
sleep(delta)
end
end

# Returns whether the streaming protocol is enabled, based on the
# serverMonitoringMode option. Default mode is :auto.
#
# - :stream - always use streaming when server supports it
# - :poll - never use streaming
# - :auto - use polling on FaaS platforms, streaming otherwise
#
# @return [ true | false ] Whether streaming is enabled.
def streaming_enabled?
mode = options[:server_monitoring_mode] || :auto
case mode
when :poll
false
when :stream
Comment on lines +367 to +371
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streaming_enabled? can return nil if :server_monitoring_mode is set to an unexpected value (e.g. a String like "poll" or any other symbol), because the case has no else. This makes the method violate its boolean contract and will implicitly disable streaming without any warning. Consider normalizing string values (e.g. downcase+to_sym) and adding an else branch that logs a warning and defaults to :auto (or raises) so the return value is always true/false.

Copilot uses AI. Check for mistakes.
true
when :auto
!Server::AppMetadata::Environment.new.faas?
end
end
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/uri.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ class URI
'SCRAM-SHA-256' => :scram256,
}.freeze

# Valid values for the serverMonitoringMode URI option.
SERVER_MONITORING_MODES = %w(stream poll auto).freeze

# Options that are allowed to appear more than once in the uri.
#
# In order to follow the URI options spec requirement that all instances
Expand Down
28 changes: 28 additions & 0 deletions lib/mongo/uri/options_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ def self.uri_option(uri_key, name, **extra)
uri_option 'retryWrites', :retry_writes, type: :bool
uri_option 'zlibCompressionLevel', :zlib_compression_level, type: :zlib_compression_level

# Monitoring Options
uri_option 'serverMonitoringMode', :server_monitoring_mode, type: :server_monitoring_mode

# Converts +value+ to a boolean.
#
# Returns true for 'true', false for 'false', otherwise nil.
Expand Down Expand Up @@ -713,6 +716,31 @@ def revert_read_mode(value)
end
alias :stringify_read_mode :revert_read_mode

# Server monitoring mode transformation.
#
# @param [ String ] name Name of the URI option being processed.
# @param [ String ] value The server monitoring mode string value.
#
# @return [ Symbol | nil ] The server monitoring mode symbol.
def convert_server_monitoring_mode(name, value)
mode = value.downcase
if SERVER_MONITORING_MODES.include?(mode)
mode.to_sym
else
log_warn("#{value} is not a valid server monitoring mode")
nil
end
end

# Stringifies server monitoring mode.
#
# @param [ Symbol ] value The server monitoring mode.
#
# @return [ String ] The server monitoring mode as a string.
def stringify_server_monitoring_mode(value)
value.to_s
end

# Read preference tags transformation.
#
# @param [ String ] name Name of the URI option being processed.
Expand Down
80 changes: 80 additions & 0 deletions spec/mongo/server/monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,84 @@
end
end
end

describe '#streaming_enabled?' do
context 'when server_monitoring_mode is :stream' do
let(:monitor_options) do
{ server_monitoring_mode: :stream }
end

it 'returns true' do
expect(monitor.send(:streaming_enabled?)).to be true
end
end

context 'when server_monitoring_mode is :poll' do
let(:monitor_options) do
{ server_monitoring_mode: :poll }
end

it 'returns false' do
expect(monitor.send(:streaming_enabled?)).to be false
end
end

context 'when server_monitoring_mode is :auto' do
let(:monitor_options) do
{ server_monitoring_mode: :auto }
end

context 'when not in a FaaS environment' do
local_env(
'AWS_EXECUTION_ENV' => nil,
'AWS_LAMBDA_RUNTIME_API' => nil,
'FUNCTIONS_WORKER_RUNTIME' => nil,
'K_SERVICE' => nil,
'FUNCTION_NAME' => nil,
'VERCEL' => nil,
)

it 'returns true' do
expect(monitor.send(:streaming_enabled?)).to be true
end
end

context 'when in a FaaS environment' do
local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby')

it 'returns false' do
expect(monitor.send(:streaming_enabled?)).to be false
end
end
end

context 'when server_monitoring_mode is not set' do
let(:monitor_options) do
{}
end

context 'when not in a FaaS environment' do
local_env(
'AWS_EXECUTION_ENV' => nil,
'AWS_LAMBDA_RUNTIME_API' => nil,
'FUNCTIONS_WORKER_RUNTIME' => nil,
'K_SERVICE' => nil,
'FUNCTION_NAME' => nil,
'VERCEL' => nil,
)

it 'defaults to auto and returns true' do
expect(monitor.send(:streaming_enabled?)).to be true
end
end

context 'when in a FaaS environment' do
local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby')

it 'defaults to auto and returns false' do
expect(monitor.send(:streaming_enabled?)).to be false
end
end
end
end
end
64 changes: 64 additions & 0 deletions spec/mongo/uri_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1439,5 +1439,69 @@

include_examples "roundtrips string"
end

context 'when a serverMonitoringMode option is provided' do
context 'stream' do
let(:options) { 'serverMonitoringMode=stream' }

it 'sets the server monitoring mode in uri options' do
expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream)
end

it 'sets the option on a client created with the uri' do
client = new_local_client_nmio(string)
expect(client.options[:server_monitoring_mode]).to eq(:stream)
end

include_examples "roundtrips string"
end

context 'poll' do
let(:options) { 'serverMonitoringMode=poll' }

it 'sets the server monitoring mode in uri options' do
expect(uri.uri_options[:server_monitoring_mode]).to eq(:poll)
end

it 'sets the option on a client created with the uri' do
client = new_local_client_nmio(string)
expect(client.options[:server_monitoring_mode]).to eq(:poll)
end

include_examples "roundtrips string"
end

context 'auto' do
let(:options) { 'serverMonitoringMode=auto' }

it 'sets the server monitoring mode in uri options' do
expect(uri.uri_options[:server_monitoring_mode]).to eq(:auto)
end

it 'sets the option on a client created with the uri' do
client = new_local_client_nmio(string)
expect(client.options[:server_monitoring_mode]).to eq(:auto)
end

include_examples "roundtrips string"
end

context 'case insensitive' do
let(:options) { 'serverMonitoringMode=Stream' }

it 'sets the server monitoring mode in uri options' do
expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream)
end
end

context 'invalid value' do
let(:options) { 'serverMonitoringMode=invalid' }

it 'warns and does not set the option' do
expect(Mongo::Logger.logger).to receive(:warn).with(/invalid is not a valid server monitoring mode/)
expect(uri.uri_options[:server_monitoring_mode]).to be_nil
end
end
end
end
end
7 changes: 7 additions & 0 deletions spec/runners/unified/assertions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def assert_events
actual_events.select! do |event|
event.class.name.sub(/.*::/, '') =~ /^(?:Pool|Connection)/
end
when 'sdam'
actual_events.select! do |event|
event.class.name.sub(/.*::/, '') =~ /^(?:Server|Topology)/
end
end

if (!ignore_extra_events && actual_events.length != expected_events.length) ||
Expand Down Expand Up @@ -217,6 +221,9 @@ def assert_event_matches(actual, expected)
if interrupt_in_use_connections = spec.use('interruptInUseConnections')
assert_matches(actual.options[:interrupt_in_use_connections], interrupt_in_use_connections, 'Command interrupt_in_use_connections does not match expectation')
end
unless (awaited = spec.use('awaited')).nil?
assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation')
end
unless spec.empty?
raise NotImplementedError, "Unhandled keys: #{spec}"
end
Expand Down
6 changes: 6 additions & 0 deletions spec/runners/unified/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ def generate_entities(es)
end
kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym
subscriber.add_wanted_events(kind)
when 'serverHeartbeatStartedEvent', 'serverHeartbeatSucceededEvent', 'serverHeartbeatFailedEvent'
unless client.send(:monitoring).subscribers[Mongo::Monitoring::SERVER_HEARTBEAT]&.include?(subscriber)
client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber)
end
kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym
subscriber.add_wanted_events(kind)
else
raise NotImplementedError, "Unknown event #{event}"
end
Expand Down
Loading