Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions examples/stream_processing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# frozen_string_literal: true

# Demonstrates the full lifecycle of an Atlas Stream Processing (ASP) stream
# processor using Mongo::StreamProcessing::Client. It creates, starts, samples,
# stops, and drops a processor.
#
# Requirements:
# - An Atlas Stream Processing workspace with a hostname matching the pattern
# atlas-stream-<workspaceId>-<suffix>.<region>.a.query.mongodb.net
# (or .mongodb-<env>.net for staging).
# - A user with the `atlasAdmin` role.
# - Two connections registered in the workspace:
# - `sample_stream_solar` (built-in sample source)
# - `__testLog` (built-in test sink)
#
# Run with:
#
# MONGODB_STREAM_PROCESSING_URI='mongodb://user:pass@atlas-stream-….a.query.mongodb.net/' \
# bundle exec ruby examples/stream_processing.rb

require 'mongo'

uri = ENV['MONGODB_STREAM_PROCESSING_URI']
if uri.nil? || uri.empty?
warn 'This example requires an Atlas Stream Processing workspace endpoint.'
warn 'Set MONGODB_STREAM_PROCESSING_URI to the workspace connection string.'
exit 1
end

unless Mongo::StreamProcessing::Client.workspace_uri?(uri)
warn 'MONGODB_STREAM_PROCESSING_URI does not look like a workspace endpoint.'
warn 'Expected: atlas-stream-*.<region>.a.query.mongodb.net (or .mongodb-stage.net for staging)'
exit 1
end

client = Mongo::StreamProcessing::Client.new(uri)
processors = client.stream_processors
name = "rubydriver_demo_#{BSON::ObjectId.new}"

puts "Workspace: #{uri}"
puts "Processor: #{name}"
puts

created = false
begin
pipeline = [
{ '$source' => { 'connectionName' => 'sample_stream_solar' } },
{ '$emit' => { 'connectionName' => '__testLog', 'topic' => 'ruby-driver-demo' } }
]

# 1. create
puts "[1/6] create(#{name})"
processors.create(name, pipeline)
created = true
info = processors.get_info(name)
puts " state=#{info.state}"
puts

# 2. start
puts '[2/6] start()'
processor = processors.get(name)
processor.start
deadline = Time.now + 30
state = processors.get_info(name).state
while state != 'STARTED' && Time.now < deadline
sleep 0.5
state = processors.get_info(name).state
end
puts " state=#{state}"
puts
raise "processor did not reach STARTED within 30s (got #{state})" if state != 'STARTED'

# 3. stats
puts '[3/6] stats()'
stats = processor.stats
puts " #{stats.inspect}"
puts

# 4. samples
puts '[4/6] samples()'
opened = processor.samples(limit: 5)
puts " open cursor_id=#{opened.cursor_id} docs=#{opened.documents.size}"

unless opened.exhausted?
sleep 2 # give the stream a moment to produce something
batch = processor.samples(cursor_id: opened.cursor_id, batch_size: 5)
puts " batch cursor_id=#{batch.cursor_id} docs=#{batch.documents.size}"
batch.documents.each_with_index do |doc, i|
puts " [#{i}] #{doc.inspect}"
end
end
puts

# 5. stop
puts '[5/6] stop()'
processor.stop
puts " state=#{processors.get_info(name).state}"
puts

# 6. drop
puts '[6/6] drop()'
processor.drop
puts ' dropped'
puts

puts 'OK.'
rescue StandardError => e
warn ''
warn "FAILED: #{e.class}: #{e.message}"
warn e.backtrace.first(15).join("\n") if e.backtrace
if created
begin
processors.get(name).drop
warn "(cleaned up processor #{name})"
rescue StandardError
# best-effort cleanup
end
end
exit 1
ensure
client.close
end
1 change: 1 addition & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
require 'mongo/session'
require 'mongo/socket'
require 'mongo/srv'
require 'mongo/stream_processing'
require 'mongo/timeout'
require 'mongo/tracing'
require 'mongo/uri'
Expand Down
31 changes: 31 additions & 0 deletions lib/mongo/stream_processing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Copyright (C) 2026-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
# Atlas Stream Processing (ASP) workspace client and helpers.
#
# See {Mongo::StreamProcessing::Client} for the entry point.
#
# @since 2.25.0
module StreamProcessing
end
end

require 'mongo/stream_processing/processor_info'
require 'mongo/stream_processing/samples_result'
require 'mongo/stream_processing/processor'
require 'mongo/stream_processing/processors'
require 'mongo/stream_processing/client'
119 changes: 119 additions & 0 deletions lib/mongo/stream_processing/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# frozen_string_literal: true

# Copyright (C) 2026-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
module StreamProcessing
# Client for an Atlas Stream Processing workspace.
#
# Distinct from {Mongo::Client} so that connection intent is explicit and
# ASP commands cannot be accidentally routed to a standard `mongod`. The
# underlying {Mongo::Client} is still available via {#client} for advanced
# uses such as `run_command` against admin.
#
# Workspace endpoints share the `mongodb://` URI scheme with standard
# MongoDB clusters but follow a distinct hostname pattern:
#
# mongodb://atlas-stream-<workspaceId>-<suffix>.<region>.a.query.mongodb.net/
#
# Atlas staging endpoints use `.a.query.mongodb-stage.net` instead; both
# are accepted.
#
# Per the ASP spec, TLS is required and `authSource` defaults to `admin`.
#
# @since 2.25.0
class Client
# @return [ Mongo::Client ] The underlying client.
attr_reader :client

# @return [ String ] The workspace URI as given to the constructor.
attr_reader :uri

# @param uri [ String ] Workspace connection string.
# @param options [ Hash ] Additional client options (passed through to
# {Mongo::Client}).
def initialize(uri, options = {})
unless self.class.workspace_uri?(uri)
raise ArgumentError,
'StreamProcessing::Client requires a workspace endpoint URI ' \
'(atlas-stream-*.a.query.mongodb.net or .mongodb-<env>.net). ' \
'For standard MongoDB clusters, use Mongo::Client instead.'
end

if uri.to_s.downcase.start_with?('mongodb+srv://')
raise ArgumentError, 'mongodb+srv:// is not supported for workspace endpoints; use mongodb://'
end

options = options.dup
# TLS is required and MUST NOT be disabled. The Ruby driver uses the
# `:ssl` option (rather than `:tls`) to enable transport security.
if options.key?(:ssl) && options[:ssl] == false
raise ArgumentError, 'TLS cannot be disabled for an Atlas Stream Processing workspace connection'
end

options[:ssl] = true unless options.key?(:ssl)
options[:auth_source] = 'admin' unless options.key?(:auth_source)

@uri = uri
@client = Mongo::Client.new(uri, options)
end

# Returns a handle for managing stream processors in this workspace.
#
# @return [ Processors ]
def stream_processors
Processors.new(@client)
end

# Closes the underlying client.
def close
@client.close
end

# Returns `true` when the supplied URI targets an Atlas Stream Processing
# workspace endpoint.
#
# Matches hostnames that begin with `atlas-stream-` and end with
# `.a.query.mongodb.net` (production) or `.a.query.mongodb-<env>.net`
# (e.g. `mongodb-stage.net` for Atlas staging).
#
# @param uri [ String ]
# @return [ Boolean ]
def self.workspace_uri?(uri)
return false unless uri.is_a?(String)

lower = uri.downcase
return false unless lower.start_with?('mongodb://')

after_scheme = lower[10..]
# Strip userinfo (if @ appears before the first /, ?).
path_or_query = after_scheme.index(%r{[/?]}) || after_scheme.length
at_idx = after_scheme.rindex('@', path_or_query - 1)
host_section = at_idx ? after_scheme[(at_idx + 1)..] : after_scheme

# Strip path/query/port.
end_idx = host_section.index(%r{[/?:]}) || host_section.length
host = host_section[0, end_idx]

return false unless host.start_with?('atlas-stream-')
return true if host.end_with?('.a.query.mongodb.net')

# Accept .a.query.mongodb-<env>.net
m = host.match(/\.a\.query\.mongodb-([a-z0-9-]+)\.net\z/)
!m.nil?
end
end
end
end
Loading
Loading