From e19066d75eb77eeee673864d5c63e7dba2067e97 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Fri, 15 May 2026 14:39:48 -0500 Subject: [PATCH] add atlas stream processing commands --- examples/stream_processing.rb | 122 +++++++++++++ lib/mongo.rb | 1 + lib/mongo/stream_processing.rb | 31 ++++ lib/mongo/stream_processing/client.rb | 119 +++++++++++++ lib/mongo/stream_processing/processor.rb | 163 ++++++++++++++++++ lib/mongo/stream_processing/processor_info.rb | 142 +++++++++++++++ lib/mongo/stream_processing/processors.rb | 82 +++++++++ lib/mongo/stream_processing/samples_result.rb | 46 +++++ spec/integration/stream_processing_spec.rb | 81 +++++++++ spec/mongo/stream_processing/client_spec.rb | 97 +++++++++++ .../stream_processing/processor_info_spec.rb | 122 +++++++++++++ .../stream_processing/samples_result_spec.rb | 26 +++ 12 files changed, 1032 insertions(+) create mode 100644 examples/stream_processing.rb create mode 100644 lib/mongo/stream_processing.rb create mode 100644 lib/mongo/stream_processing/client.rb create mode 100644 lib/mongo/stream_processing/processor.rb create mode 100644 lib/mongo/stream_processing/processor_info.rb create mode 100644 lib/mongo/stream_processing/processors.rb create mode 100644 lib/mongo/stream_processing/samples_result.rb create mode 100644 spec/integration/stream_processing_spec.rb create mode 100644 spec/mongo/stream_processing/client_spec.rb create mode 100644 spec/mongo/stream_processing/processor_info_spec.rb create mode 100644 spec/mongo/stream_processing/samples_result_spec.rb diff --git a/examples/stream_processing.rb b/examples/stream_processing.rb new file mode 100644 index 0000000000..2730726276 --- /dev/null +++ b/examples/stream_processing.rb @@ -0,0 +1,122 @@ +# frozen_string_literal: true + +# Demonstrates the full lifecycle of an Atlas Stream Processing (ASP) stream +# processor using Mongo::StreamProcessing::Client. It creates, starts, samples, +# stops, and drops a processor. +# +# Requirements: +# - An Atlas Stream Processing workspace with a hostname matching the pattern +# atlas-stream--..a.query.mongodb.net +# (or .mongodb-.net for staging). +# - A user with the `atlasAdmin` role. +# - Two connections registered in the workspace: +# - `sample_stream_solar` (built-in sample source) +# - `__testLog` (built-in test sink) +# +# Run with: +# +# MONGODB_STREAM_PROCESSING_URI='mongodb://user:pass@atlas-stream-….a.query.mongodb.net/' \ +# bundle exec ruby examples/stream_processing.rb + +require 'mongo' + +uri = ENV['MONGODB_STREAM_PROCESSING_URI'] +if uri.nil? || uri.empty? + warn 'This example requires an Atlas Stream Processing workspace endpoint.' + warn 'Set MONGODB_STREAM_PROCESSING_URI to the workspace connection string.' + exit 1 +end + +unless Mongo::StreamProcessing::Client.workspace_uri?(uri) + warn 'MONGODB_STREAM_PROCESSING_URI does not look like a workspace endpoint.' + warn 'Expected: atlas-stream-*..a.query.mongodb.net (or .mongodb-stage.net for staging)' + exit 1 +end + +client = Mongo::StreamProcessing::Client.new(uri) +processors = client.stream_processors +name = "rubydriver_demo_#{BSON::ObjectId.new}" + +puts "Workspace: #{uri}" +puts "Processor: #{name}" +puts + +created = false +begin + pipeline = [ + { '$source' => { 'connectionName' => 'sample_stream_solar' } }, + { '$emit' => { 'connectionName' => '__testLog', 'topic' => 'ruby-driver-demo' } } + ] + + # 1. create + puts "[1/6] create(#{name})" + processors.create(name, pipeline) + created = true + info = processors.get_info(name) + puts " state=#{info.state}" + puts + + # 2. start + puts '[2/6] start()' + processor = processors.get(name) + processor.start + deadline = Time.now + 30 + state = processors.get_info(name).state + while state != 'STARTED' && Time.now < deadline + sleep 0.5 + state = processors.get_info(name).state + end + puts " state=#{state}" + puts + raise "processor did not reach STARTED within 30s (got #{state})" if state != 'STARTED' + + # 3. stats + puts '[3/6] stats()' + stats = processor.stats + puts " #{stats.inspect}" + puts + + # 4. samples + puts '[4/6] samples()' + opened = processor.samples(limit: 5) + puts " open cursor_id=#{opened.cursor_id} docs=#{opened.documents.size}" + + unless opened.exhausted? + sleep 2 # give the stream a moment to produce something + batch = processor.samples(cursor_id: opened.cursor_id, batch_size: 5) + puts " batch cursor_id=#{batch.cursor_id} docs=#{batch.documents.size}" + batch.documents.each_with_index do |doc, i| + puts " [#{i}] #{doc.inspect}" + end + end + puts + + # 5. stop + puts '[5/6] stop()' + processor.stop + puts " state=#{processors.get_info(name).state}" + puts + + # 6. drop + puts '[6/6] drop()' + processor.drop + puts ' dropped' + puts + + puts 'OK.' +rescue StandardError => e + warn '' + warn "FAILED: #{e.class}: #{e.message}" + warn e.backtrace.first(15).join("\n") if e.backtrace + if created + begin + processors.get(name).drop + warn "(cleaned up processor #{name})" + rescue StandardError + # best-effort cleanup + end + end + exit 1 +ensure + client.close +end diff --git a/lib/mongo.rb b/lib/mongo.rb index a5a43fb0d9..72a5d9bcfc 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -73,6 +73,7 @@ require 'mongo/session' require 'mongo/socket' require 'mongo/srv' +require 'mongo/stream_processing' require 'mongo/timeout' require 'mongo/tracing' require 'mongo/uri' diff --git a/lib/mongo/stream_processing.rb b/lib/mongo/stream_processing.rb new file mode 100644 index 0000000000..7d152764ba --- /dev/null +++ b/lib/mongo/stream_processing.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + # Atlas Stream Processing (ASP) workspace client and helpers. + # + # See {Mongo::StreamProcessing::Client} for the entry point. + # + # @since 2.25.0 + module StreamProcessing + end +end + +require 'mongo/stream_processing/processor_info' +require 'mongo/stream_processing/samples_result' +require 'mongo/stream_processing/processor' +require 'mongo/stream_processing/processors' +require 'mongo/stream_processing/client' diff --git a/lib/mongo/stream_processing/client.rb b/lib/mongo/stream_processing/client.rb new file mode 100644 index 0000000000..feebc02b3f --- /dev/null +++ b/lib/mongo/stream_processing/client.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module StreamProcessing + # Client for an Atlas Stream Processing workspace. + # + # Distinct from {Mongo::Client} so that connection intent is explicit and + # ASP commands cannot be accidentally routed to a standard `mongod`. The + # underlying {Mongo::Client} is still available via {#client} for advanced + # uses such as `run_command` against admin. + # + # Workspace endpoints share the `mongodb://` URI scheme with standard + # MongoDB clusters but follow a distinct hostname pattern: + # + # mongodb://atlas-stream--..a.query.mongodb.net/ + # + # Atlas staging endpoints use `.a.query.mongodb-stage.net` instead; both + # are accepted. + # + # Per the ASP spec, TLS is required and `authSource` defaults to `admin`. + # + # @since 2.25.0 + class Client + # @return [ Mongo::Client ] The underlying client. + attr_reader :client + + # @return [ String ] The workspace URI as given to the constructor. + attr_reader :uri + + # @param uri [ String ] Workspace connection string. + # @param options [ Hash ] Additional client options (passed through to + # {Mongo::Client}). + def initialize(uri, options = {}) + unless self.class.workspace_uri?(uri) + raise ArgumentError, + 'StreamProcessing::Client requires a workspace endpoint URI ' \ + '(atlas-stream-*.a.query.mongodb.net or .mongodb-.net). ' \ + 'For standard MongoDB clusters, use Mongo::Client instead.' + end + + if uri.to_s.downcase.start_with?('mongodb+srv://') + raise ArgumentError, 'mongodb+srv:// is not supported for workspace endpoints; use mongodb://' + end + + options = options.dup + # TLS is required and MUST NOT be disabled. The Ruby driver uses the + # `:ssl` option (rather than `:tls`) to enable transport security. + if options.key?(:ssl) && options[:ssl] == false + raise ArgumentError, 'TLS cannot be disabled for an Atlas Stream Processing workspace connection' + end + + options[:ssl] = true unless options.key?(:ssl) + options[:auth_source] = 'admin' unless options.key?(:auth_source) + + @uri = uri + @client = Mongo::Client.new(uri, options) + end + + # Returns a handle for managing stream processors in this workspace. + # + # @return [ Processors ] + def stream_processors + Processors.new(@client) + end + + # Closes the underlying client. + def close + @client.close + end + + # Returns `true` when the supplied URI targets an Atlas Stream Processing + # workspace endpoint. + # + # Matches hostnames that begin with `atlas-stream-` and end with + # `.a.query.mongodb.net` (production) or `.a.query.mongodb-.net` + # (e.g. `mongodb-stage.net` for Atlas staging). + # + # @param uri [ String ] + # @return [ Boolean ] + def self.workspace_uri?(uri) + return false unless uri.is_a?(String) + + lower = uri.downcase + return false unless lower.start_with?('mongodb://') + + after_scheme = lower[10..] + # Strip userinfo (if @ appears before the first /, ?). + path_or_query = after_scheme.index(%r{[/?]}) || after_scheme.length + at_idx = after_scheme.rindex('@', path_or_query - 1) + host_section = at_idx ? after_scheme[(at_idx + 1)..] : after_scheme + + # Strip path/query/port. + end_idx = host_section.index(%r{[/?:]}) || host_section.length + host = host_section[0, end_idx] + + return false unless host.start_with?('atlas-stream-') + return true if host.end_with?('.a.query.mongodb.net') + + # Accept .a.query.mongodb-.net + m = host.match(/\.a\.query\.mongodb-([a-z0-9-]+)\.net\z/) + !m.nil? + end + end + end +end diff --git a/lib/mongo/stream_processing/processor.rb b/lib/mongo/stream_processing/processor.rb new file mode 100644 index 0000000000..31f80519a0 --- /dev/null +++ b/lib/mongo/stream_processing/processor.rb @@ -0,0 +1,163 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module StreamProcessing + # Handle for a specific named stream processor. + # + # Holding a handle does not imply the processor currently exists on the + # server. Obtained via {Processors#get}. + # + # @since 2.25.0 + class Processor + VALID_FAILOVER_MODES = %w[GRACEFUL FORCED].freeze + + # @return [ String ] The processor name. + attr_reader :name + + # @param client [ Mongo::Client ] Workspace-bound client. + # @param name [ String ] + def initialize(client, name) + raise ArgumentError, 'name must be non-empty' if name.nil? || name.empty? + + @client = client + @admin = Mongo::Database.new(client, 'admin') + @name = name + end + + # Starts the processor. The processor MUST be in STOPPED or FAILED; + # starting a STARTED processor returns an error from the server. + # + # @param opts [ Hash ] Options + # @option opts [ Integer ] :workers Number of workers. + # @option opts [ Boolean ] :clear_checkpoints Clear checkpoints before starting. + # @option opts [ BSON::Timestamp ] :start_at_operation_time Resume from + # a specific operation time. + # @option opts [ String ] :tier Compute tier. One of `"SP2"`, `"SP5"`, + # `"SP10"`, `"SP30"`, `"SP50"`. + # @option opts [ Boolean ] :enable_auto_scaling Enable auto-scaling. + # @option opts [ Hash ] :failover Failover configuration. Requires `:region`. + # Optional `:mode` ("GRACEFUL" / "FORCED") and `:dry_run`. + def start(**opts) + # NOTE: the spec's `startAfter` option is RESERVED for future use and is + # not yet accepted by the server; this driver MUST NOT send it. + cmd = { startStreamProcessor: @name } + cmd[:workers] = opts[:workers] if opts.key?(:workers) + + sub = {} + sub[:clearCheckpoints] = opts[:clear_checkpoints] if opts.key?(:clear_checkpoints) + sub[:startAtOperationTime] = opts[:start_at_operation_time] if opts.key?(:start_at_operation_time) + sub[:tier] = opts[:tier] if opts.key?(:tier) + sub[:enableAutoScaling] = opts[:enable_auto_scaling] if opts.key?(:enable_auto_scaling) + cmd[:options] = sub unless sub.empty? + + if opts.key?(:failover) + failover = opts[:failover] || {} + unless failover[:region].is_a?(String) && !failover[:region].empty? + raise ArgumentError, ':failover requires a :region string' + end + + if failover.key?(:mode) && !VALID_FAILOVER_MODES.include?(failover[:mode]) + raise ArgumentError, + "invalid :failover mode #{failover[:mode].inspect}; expected one of: " \ + "#{VALID_FAILOVER_MODES.join(', ')}" + end + + f = { region: failover[:region] } + f[:mode] = failover[:mode] if failover.key?(:mode) + f[:dryRun] = failover[:dry_run] if failover.key?(:dry_run) + cmd[:failover] = f + end + + run_command(cmd) + nil + end + + # Stops the processor. The processor remains in STOPPED and can be + # restarted. + def stop + run_command(stopStreamProcessor: @name) + nil + end + + # Drops the processor permanently. A dropped processor cannot be + # recovered. + def drop + run_command(dropStreamProcessor: @name) + nil + end + + # Returns runtime statistics for the processor. Returns an error from the + # server if the processor is not in the STARTED state. + # + # @param opts [ Hash ] + # @option opts [ Boolean ] :verbose Include per-operator statistics. + # @return [ Hash ] The full stats response document. + def stats(**opts) + cmd = { getStreamProcessorStats: @name } + cmd[:options] = { verbose: opts[:verbose] } if opts.key?(:verbose) + run_command(cmd).documents.first + end + + # Retrieves a batch of sampled documents. + # + # Routes to `startSampleStreamProcessor` when no `:cursor_id` is supplied + # (or it is 0); otherwise routes to `getMoreSampleStreamProcessor` with + # the supplied cursor id. The caller MUST stop iterating when the + # returned {SamplesResult#cursor_id} is 0. + # + # @param opts [ Hash ] + # @option opts [ Integer ] :cursor_id Cursor id from a prior call. Absent + # or 0 opens a new sample cursor. + # @option opts [ Integer ] :limit Maximum docs to sample. Only sent on + # the initial call. + # @option opts [ Integer ] :batch_size Documents per batch. Only sent on + # subsequent calls. + # @return [ SamplesResult ] + def samples(**opts) + cursor_id = opts[:cursor_id].to_i + + if cursor_id.zero? + cmd = { startSampleStreamProcessor: @name } + cmd[:limit] = opts[:limit] if opts.key?(:limit) + doc = run_command(cmd).documents.first || {} + new_cursor_id = (doc['cursorId'] || 0).to_i + if new_cursor_id.zero? + raise Error::OperationFailure, 'startSampleStreamProcessor did not return a cursorId' + end + + return SamplesResult.new(new_cursor_id, []) + end + + cmd = { getMoreSampleStreamProcessor: @name, cursorId: cursor_id } + cmd[:batchSize] = opts[:batch_size] if opts.key?(:batch_size) + doc = run_command(cmd).documents.first || {} + next_cursor_id = (doc['cursorId'] || 0).to_i + # Dev-server deviation: some server builds use "messages" instead of + # "nextBatch". Prefer the spec-defined "nextBatch" but fall back to + # "messages" if present. + batch = doc['nextBatch'] || doc['messages'] || [] + SamplesResult.new(next_cursor_id, batch) + end + + private + + def run_command(cmd) + @admin.command(cmd) + end + end + end +end diff --git a/lib/mongo/stream_processing/processor_info.rb b/lib/mongo/stream_processing/processor_info.rb new file mode 100644 index 0000000000..188dda92e5 --- /dev/null +++ b/lib/mongo/stream_processing/processor_info.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module StreamProcessing + # Information about a single stream processor, returned by the + # `getStreamProcessor` command. + # + # Fields the spec marks as Optional may be absent depending on server + # version; the corresponding accessors return `nil` in that case. + # + # @since 2.25.0 + class ProcessorInfo + # @return [ Hash ] The raw response document. + attr_reader :raw + + # @param raw [ Hash ] The response document from `getStreamProcessor`. + def initialize(raw) + @raw = raw + end + + # Processor id. Optional: not returned by all server versions. + # @return [ String, nil ] + def id + @raw['id'] + end + + # @return [ String ] + def name + @raw['name'] + end + + # Current state. Per the ASP spec, drivers MUST surface unknown state + # values as-is rather than mapping to a closed set, so this is returned + # as a plain string. + # @return [ String ] + def state + @raw['state'] + end + + # @return [ Array ] + def pipeline + @raw['pipeline'] || [] + end + + # @return [ Integer, nil ] Optional: not returned by all server versions. + def pipeline_version + @raw['pipelineVersion'] + end + + # @return [ String, nil ] + def tier + @raw['tier'] + end + + # @return [ Hash, nil ] + def dlq + @raw['dlq'] + end + + # @return [ String, nil ] + def stream_meta_field_name + @raw['streamMetaFieldName'] + end + + # @return [ Boolean ] + def auto_scaling_enabled? + !!@raw['enableAutoScaling'] + end + + # @return [ Boolean ] + def failover_enabled? + !!@raw['failoverEnabled'] + end + + # @return [ String, nil ] + def active_region + @raw['activeRegion'] + end + + # @return [ String, nil ] + def workspace_default_region + @raw['workspaceDefaultRegion'] + end + + # @return [ BSON::Timestamp, Time, nil ] + def last_state_change + @raw['lastStateChange'] + end + + # @return [ BSON::Timestamp, Time, nil ] + def last_modified_at + @raw['lastModifiedAt'] + end + + # @return [ String, nil ] + def modified_by + @raw['modifiedBy'] + end + + # @return [ Boolean ] + def started? + !!@raw['hasStarted'] + end + + # Error message. Per spec this is always present; empty string indicates + # no error has occurred. + # @return [ String ] + def error_msg + @raw['errorMsg'] || '' + end + + # @return [ Boolean ] + def error_retryable? + !!@raw['errorRetryable'] + end + + # @return [ Integer, nil ] + def error_code + @raw['errorCode'] + end + + # @return [ Object ] Field's value from the underlying document, or nil. + def [](key) + @raw[key.to_s] + end + end + end +end diff --git a/lib/mongo/stream_processing/processors.rb b/lib/mongo/stream_processing/processors.rb new file mode 100644 index 0000000000..403b9fcc18 --- /dev/null +++ b/lib/mongo/stream_processing/processors.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module StreamProcessing + # Handle for managing stream processors in a workspace. + # + # Obtained from {Mongo::StreamProcessing::Client#stream_processors}. + # + # @since 2.25.0 + class Processors + # @param client [ Mongo::Client ] Workspace-bound client. + def initialize(client) + @client = client + @admin = Mongo::Database.new(client, 'admin') + end + + # Creates a new stream processor. + # + # @param name [ String ] Stream processor name. + # @param pipeline [ Array ] Aggregation pipeline. + # @param opts [ Hash ] Options + # @option opts [ Hash ] :dlq Dead letter queue configuration. + # @option opts [ String ] :stream_meta_field_name Field name used for + # stream metadata. + # @option opts [ String ] :tier Compute tier. + # @option opts [ Boolean ] :failover Whether failover is enabled. + def create(name, pipeline, **opts) + raise ArgumentError, 'name must be non-empty' if name.nil? || name.empty? + + cmd = { createStreamProcessor: name, pipeline: pipeline } + sub = {} + sub[:dlq] = opts[:dlq] if opts.key?(:dlq) + sub[:streamMetaFieldName] = opts[:stream_meta_field_name] if opts.key?(:stream_meta_field_name) + sub[:tier] = opts[:tier] if opts.key?(:tier) + sub[:failover] = opts[:failover] if opts.key?(:failover) + cmd[:options] = sub unless sub.empty? + + @admin.command(cmd) + nil + end + + # Returns a handle for the named processor. Does not imply that the + # processor currently exists on the server. + # + # @param name [ String ] + # @return [ Processor ] + def get(name) + Processor.new(@client, name) + end + + # Returns information about a single stream processor. + # + # Sends the `getStreamProcessor` command. Dev-server deviation: some + # server builds wrap the processor document in a top-level `result` key. + # This is unwrapped transparently. + # + # @param name [ String ] + # @return [ ProcessorInfo ] + def get_info(name) + raise ArgumentError, 'name must be non-empty' if name.nil? || name.empty? + + doc = @admin.command(getStreamProcessor: name).documents.first || {} + doc = doc['result'] if doc.is_a?(Hash) && doc['result'].is_a?(Hash) + ProcessorInfo.new(doc) + end + end + end +end diff --git a/lib/mongo/stream_processing/samples_result.rb b/lib/mongo/stream_processing/samples_result.rb new file mode 100644 index 0000000000..25777205e8 --- /dev/null +++ b/lib/mongo/stream_processing/samples_result.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +# Copyright (C) 2026-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module StreamProcessing + # The result of a call to {Processor#samples}. + # + # Callers MUST stop iterating when {#cursor_id} is 0 — the cursor is + # exhausted and no further calls should be made. + # + # @since 2.25.0 + class SamplesResult + # @return [ Integer ] The cursor id to pass to the next call. + # A value of 0 means the cursor is exhausted. + attr_reader :cursor_id + + # @return [ Array ] The batch of sampled documents. + attr_reader :documents + + # @param cursor_id [ Integer ] + # @param documents [ Array ] + def initialize(cursor_id, documents) + @cursor_id = cursor_id + @documents = documents || [] + end + + # @return [ Boolean ] Whether the cursor is exhausted (cursor_id == 0). + def exhausted? + @cursor_id.zero? + end + end + end +end diff --git a/spec/integration/stream_processing_spec.rb b/spec/integration/stream_processing_spec.rb new file mode 100644 index 0000000000..7031b70e14 --- /dev/null +++ b/spec/integration/stream_processing_spec.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +require 'lite_spec_helper' + +# Functional smoke test for Atlas Stream Processing. +# +# Skipped unless the `MONGODB_STREAM_PROCESSING_URI` env var is set to a +# workspace endpoint (atlas-stream-*..a.query.mongodb{,-stage}.net) +# with valid credentials. +# +# Exercises the full lifecycle: create -> start -> stats -> sample -> stop -> drop. +describe 'Atlas Stream Processing lifecycle' do + before do + uri = ENV['MONGODB_STREAM_PROCESSING_URI'] + skip 'MONGODB_STREAM_PROCESSING_URI is not configured' if uri.nil? || uri.empty? + + unless Mongo::StreamProcessing::Client.workspace_uri?(uri) + skip "MONGODB_STREAM_PROCESSING_URI=#{uri.inspect} is not a workspace endpoint" + end + end + + let(:uri) { ENV['MONGODB_STREAM_PROCESSING_URI'] } + let(:client) { Mongo::StreamProcessing::Client.new(uri) } + let(:processors) { client.stream_processors } + let(:name) { "rubydriver_test_#{BSON::ObjectId.new}" } + + after do + # Best-effort cleanup. The drop will fail silently if the processor was + # never created or has already been dropped, which is the safe behavior. + begin + processors.get(name).drop + rescue Mongo::Error + # ignored + end + client.close + end + + it 'runs the full lifecycle' do + pipeline = [ + { '$source' => { 'connectionName' => 'sample_stream_solar' } }, + { '$emit' => { 'connectionName' => '__testLog', 'topic' => 'ruby-driver-demo' } } + ] + + # create + processors.create(name, pipeline) + info = processors.get_info(name) + expect(info.name).to eq(name) + expect(info.state).to eq('CREATED').or(eq('VALIDATING')).or(eq('CREATING')) + + # start + processor = processors.get(name) + processor.start + + # Wait for STARTED + deadline = Time.now + 30 + while Time.now < deadline + state = processors.get_info(name).state + break if state == 'STARTED' + + sleep 0.5 + end + expect(processors.get_info(name).state).to eq('STARTED') + + # stats + stats = processor.stats + expect(stats).to be_a(Hash) + + # sample: open + fetch one batch + opened = processor.samples(limit: 5) + expect(opened.cursor_id).to be > 0 + expect(opened.documents).to eq([]) + + batch = processor.samples(cursor_id: opened.cursor_id, batch_size: 5) + # cursor_id may be 0 if the stream has nothing yet — both are valid. + expect(batch.cursor_id).to be >= 0 + + # stop + drop + processor.stop + processor.drop + end +end diff --git a/spec/mongo/stream_processing/client_spec.rb b/spec/mongo/stream_processing/client_spec.rb new file mode 100644 index 0000000000..7f0c611035 --- /dev/null +++ b/spec/mongo/stream_processing/client_spec.rb @@ -0,0 +1,97 @@ +# frozen_string_literal: true + +require 'lite_spec_helper' + +describe Mongo::StreamProcessing::Client do + describe '.workspace_uri?' do + context 'with a production workspace URI' do + it 'returns true' do + uri = 'mongodb://atlas-stream-699c842ef433fe6001480b17-etif1.virginia-usa.a.query.mongodb.net/' + expect(described_class.workspace_uri?(uri)).to be true + end + end + + context 'with a workspace URI that includes credentials and port' do + it 'returns true' do + uri = 'mongodb://user:pass@atlas-stream-xyz.us-east-1.a.query.mongodb.net:27017/?retryWrites=true' + expect(described_class.workspace_uri?(uri)).to be true + end + end + + context 'with a staging workspace URI (mongodb-stage.net)' do + it 'returns true' do + uri = 'mongodb://user:pass@atlas-stream-699c842ef433fe6001480b17-etif1.virginia-usa.a.query.mongodb-stage.net' + expect(described_class.workspace_uri?(uri)).to be true + end + end + + context 'with a URI using uppercase scheme' do + it 'returns true' do + uri = 'MONGODB://atlas-stream-xyz.us-east-1.a.query.mongodb.net/' + expect(described_class.workspace_uri?(uri)).to be true + end + end + + context 'with a standard cluster URI' do + it 'returns false' do + expect(described_class.workspace_uri?('mongodb://localhost:27017/')).to be false + end + end + + context 'with an SRV URI' do + it 'returns false' do + expect(described_class.workspace_uri?('mongodb+srv://cluster0.example.mongodb.net/')).to be false + end + end + + context 'with a URI missing the atlas-stream- prefix' do + it 'returns false' do + expect(described_class.workspace_uri?('mongodb://abc.virginia-usa.a.query.mongodb.net/')).to be false + end + end + + context 'with a hostname that contains atlas-stream- but the wrong TLD' do + it 'returns false' do + expect(described_class.workspace_uri?('mongodb://atlas-stream-x.example.com/')).to be false + end + end + + context 'with a non-string argument' do + it 'returns false' do + expect(described_class.workspace_uri?(nil)).to be false + expect(described_class.workspace_uri?(123)).to be false + end + end + end + + describe '#initialize' do + let(:workspace_uri) { 'mongodb://atlas-stream-x.us-east-1.a.query.mongodb.net/' } + + context 'with a non-workspace URI' do + it 'raises ArgumentError' do + expect do + described_class.new('mongodb://localhost:27017/') + end.to raise_error(ArgumentError, /workspace endpoint URI/) + end + end + + context 'with an SRV URI' do + it 'raises ArgumentError' do + # NOTE: `workspace_uri?` already rejects the SRV scheme because the + # scheme check is `mongodb://` only; this still surfaces with the + # workspace-URI error message. + expect do + described_class.new('mongodb+srv://atlas-stream-x.us-east-1.a.query.mongodb.net/') + end.to raise_error(ArgumentError, /workspace endpoint URI/) + end + end + + context 'with ssl: false' do + it 'raises ArgumentError' do + expect do + described_class.new(workspace_uri, ssl: false) + end.to raise_error(ArgumentError, /TLS cannot be disabled/) + end + end + end +end diff --git a/spec/mongo/stream_processing/processor_info_spec.rb b/spec/mongo/stream_processing/processor_info_spec.rb new file mode 100644 index 0000000000..c3a084ec51 --- /dev/null +++ b/spec/mongo/stream_processing/processor_info_spec.rb @@ -0,0 +1,122 @@ +# frozen_string_literal: true + +require 'lite_spec_helper' + +describe Mongo::StreamProcessing::ProcessorInfo do + describe 'getters exposed on the populated fixture' do + let(:info) do + described_class.new( + 'id' => 'proc-1', + 'name' => 'smokeTestProcessor', + 'state' => 'CREATED', + 'pipeline' => [ { '$source' => { 'connectionName' => 'sample_stream_solar' } } ], + 'pipelineVersion' => 2, + 'tier' => 'SP2', + 'streamMetaFieldName' => '_stream_meta', + 'enableAutoScaling' => true, + 'failoverEnabled' => false, + 'activeRegion' => 'us-east-1', + 'workspaceDefaultRegion' => 'us-east-1', + 'modifiedBy' => 'user-1', + 'hasStarted' => false, + 'errorMsg' => '', + 'errorRetryable' => false + ) + end + + it 'exposes scalar fields' do + expect(info.id).to eq('proc-1') + expect(info.name).to eq('smokeTestProcessor') + expect(info.state).to eq('CREATED') + expect(info.pipeline_version).to eq(2) + expect(info.tier).to eq('SP2') + expect(info.stream_meta_field_name).to eq('_stream_meta') + expect(info.active_region).to eq('us-east-1') + expect(info.workspace_default_region).to eq('us-east-1') + expect(info.modified_by).to eq('user-1') + end + + it 'returns boolean predicates' do + expect(info.auto_scaling_enabled?).to be true + expect(info.failover_enabled?).to be false + expect(info.started?).to be false + expect(info.error_retryable?).to be false + end + + it 'returns the pipeline as an array' do + expect(info.pipeline).to eq([ { '$source' => { 'connectionName' => 'sample_stream_solar' } } ]) + end + + it 'returns the empty error message' do + expect(info.error_msg).to eq('') + end + end + + describe 'getters when optional fields are missing' do + let(:info) do + described_class.new('name' => 'p', 'state' => 'CREATED') + end + + it 'returns nil or sensible defaults' do + expect(info.id).to be_nil + expect(info.pipeline_version).to be_nil + expect(info.tier).to be_nil + expect(info.dlq).to be_nil + expect(info.stream_meta_field_name).to be_nil + expect(info.active_region).to be_nil + expect(info.workspace_default_region).to be_nil + expect(info.last_state_change).to be_nil + expect(info.last_modified_at).to be_nil + expect(info.modified_by).to be_nil + expect(info.error_code).to be_nil + end + + it 'returns false for boolean predicates' do + expect(info.auto_scaling_enabled?).to be false + expect(info.failover_enabled?).to be false + expect(info.started?).to be false + expect(info.error_retryable?).to be false + end + + it 'returns empty string for error_msg' do + expect(info.error_msg).to eq('') + end + + it 'returns empty array for pipeline' do + expect(info.pipeline).to eq([]) + end + end + + describe 'getters when error fields are set' do + let(:info) do + described_class.new( + 'name' => 'p', + 'state' => 'FAILED', + 'errorMsg' => 'something went wrong', + 'errorRetryable' => true, + 'errorCode' => 125 + ) + end + + it 'exposes the error message' do + expect(info.error_msg).to eq('something went wrong') + end + + it 'exposes retryability' do + expect(info.error_retryable?).to be true + end + + it 'exposes the error code' do + expect(info.error_code).to eq(125) + end + end + + describe '#[]' do + it 'reads the raw document' do + info = described_class.new('name' => 'p', 'state' => 'CREATED') + expect(info[:name]).to eq('p') + expect(info['state']).to eq('CREATED') + expect(info[:nope]).to be_nil + end + end +end diff --git a/spec/mongo/stream_processing/samples_result_spec.rb b/spec/mongo/stream_processing/samples_result_spec.rb new file mode 100644 index 0000000000..f32e0c391d --- /dev/null +++ b/spec/mongo/stream_processing/samples_result_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require 'lite_spec_helper' + +describe Mongo::StreamProcessing::SamplesResult do + describe '#exhausted?' do + it 'returns true when cursor_id is zero' do + expect(described_class.new(0, []).exhausted?).to be true + end + + it 'returns false when cursor_id is non-zero' do + expect(described_class.new(42, []).exhausted?).to be false + end + end + + describe '#documents' do + it 'returns the documents array' do + docs = [ { 'x' => 1 }, { 'x' => 2 } ] + expect(described_class.new(7, docs).documents).to eq(docs) + end + + it 'defaults to an empty array when given nil' do + expect(described_class.new(7, nil).documents).to eq([]) + end + end +end