diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 091824ea..8eff7dc3 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -30,6 +30,7 @@ use temporalio_common::protos::coresdk::{ NexusSlotInfo, WorkflowSlotInfo, }; use temporalio_common::protos::temporal::api::history::v1::History; +use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use temporalio_common::{ errors::{PollError, WorkflowErrorType}, worker::{ @@ -555,6 +556,16 @@ fn build_config(options: Struct, runtime_handle: &RuntimeHandle) -> Result>>(), ) + .plugins( + options + .member::>(id!("plugins"))? + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() .map_err(|err| error!("Invalid worker options: {}", err)) } diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 54694233..9f9a2f14 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -6,6 +6,7 @@ require 'temporalio/client/async_activity_handle' require 'temporalio/client/connection' require 'temporalio/client/interceptor' +require 'temporalio/client/plugin' require 'temporalio/client/schedule' require 'temporalio/client/schedule_handle' require 'temporalio/client/with_start_workflow_operation' @@ -42,6 +43,7 @@ class Client :connection, :namespace, :data_converter, + :plugins, :interceptors, :logger, :default_workflow_query_reject_condition @@ -70,6 +72,9 @@ class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass # @param tls [Boolean, Connection::TLSOptions] If false, do not use TLS. If true, use system default TLS options. If # TLS options are present, those TLS options will be used. # @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads. + # @param plugins [Array] Plugins to use for configuring clients and intercepting connection. Any plugins + # that also include {Worker::Plugin} will automatically be applied to the worker and should not be configured + # explicitly on the worker. WARNING: Plugins are experimental. # @param interceptors [Array] Set of interceptors that are chained together to allow intercepting of # client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker # interceptor will be used as worker interceptors too so they should not be given separately when creating a @@ -101,6 +106,7 @@ def self.connect( api_key: nil, tls: nil, data_converter: Converters::DataConverter.default, + plugins: [], interceptors: [], logger: Logger.new($stdout, level: Logger::WARN), default_workflow_query_reject_condition: nil, @@ -112,27 +118,78 @@ def self.connect( runtime: Runtime.default, lazy_connect: false ) + # Prepare connection. The connection var is needed here so it can be used in callback for plugin. + base_connection = nil + final_connection = nil + around_connect = if plugins.any? + _validate_plugins!(plugins) + # For plugins, we have to do an around_connect approach with Connection where we provide a + # no-return-value proc that is invoked with the built options and yields newly built options. + # The connection will have been created before, but we allow plugins to return a + # different/extended connection, possibly avoiding actual connection altogether. + proc do |options, &block| + # Steep simply can't comprehend these advanced inline procs + # steep:ignore:start + + # Root next call + next_call_called = false + next_call = proc do |options| + raise 'next_call called more than once' if next_call_called + + next_call_called = true + block&.call(options) + base_connection + end + # Go backwards, building up new next_call invocations on plugins + next_call = plugins.reverse_each.reduce(next_call) do |next_call, plugin| + proc { |options| plugin.connect_client(options, next_call) } + end + # Do call + final_connection = next_call.call(options) + + # steep:ignore:end + end + end + # Now create connection + base_connection = Connection.new( + target_host:, + api_key:, + tls:, + rpc_metadata:, + rpc_retry:, + identity:, + keep_alive:, + http_connect_proxy:, + runtime:, + lazy_connect:, + around_connect: # steep:ignore + ) + + # Create client Client.new( - connection: Connection.new( - target_host:, - api_key:, - tls:, - rpc_metadata:, - rpc_retry:, - identity:, - keep_alive:, - http_connect_proxy:, - runtime:, - lazy_connect: - ), + connection: final_connection || base_connection, namespace:, data_converter:, + plugins:, interceptors:, logger:, default_workflow_query_reject_condition: ) end + # @!visibility private + def self._validate_plugins!(plugins) + plugins.each do |plugin| + raise ArgumentError, "#{plugin.class} does not implement Client::Plugin" unless plugin.is_a?(Plugin) + + # Validate plugin has implemented expected methods + missing = Plugin.instance_methods(false).select { |m| plugin.method(m).owner == Plugin } + unless missing.empty? + raise ArgumentError, "#{plugin.class} missing the following client plugin method(s): #{missing.join(', ')}" + end + end + end + # @return [Options] Frozen options for this client which has the same attributes as {initialize}. attr_reader :options @@ -143,6 +200,9 @@ def self.connect( # @param connection [Connection] Existing connection to create a client from. # @param namespace [String] Namespace to use for client calls. # @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads. + # @param plugins [Array] Plugins to use for configuring clients. Any plugins that also include + # {Worker::Plugin} will automatically be applied to the worker and should not be configured explicitly on the + # worker. WARNING: Plugins are experimental. # @param interceptors [Array] Set of interceptors that are chained together to allow intercepting of # client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker # interceptor will be used as worker interceptors too so they should not be given separately when creating a @@ -157,6 +217,7 @@ def initialize( connection:, namespace:, data_converter: DataConverter.default, + plugins: [], interceptors: [], logger: Logger.new($stdout, level: Logger::WARN), default_workflow_query_reject_condition: nil @@ -165,12 +226,20 @@ def initialize( connection:, namespace:, data_converter:, + plugins:, interceptors:, logger:, default_workflow_query_reject_condition: ).freeze + + # Apply plugins + Client._validate_plugins!(plugins) + @options = plugins.reduce(@options) { |options, plugin| plugin.configure_client(options) } + # Initialize interceptors - @impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore + @impl = @options.interceptors.reverse_each.reduce( + Internal::Client::Implementation.new(self) + ) do |acc, int| # steep:ignore int.intercept_client(acc) end end diff --git a/temporalio/lib/temporalio/client/connection.rb b/temporalio/lib/temporalio/client/connection.rb index 21bb8e59..5fc550a8 100644 --- a/temporalio/lib/temporalio/client/connection.rb +++ b/temporalio/lib/temporalio/client/connection.rb @@ -169,6 +169,9 @@ class HTTPConnectProxyOptions; end # rubocop:disable Lint/EmptyClass # @param lazy_connect [Boolean] If true, there is no connection until the first call is attempted or a worker # is created with it. Clients from lazy connections cannot be used for workers if they have not performed a # connection. + # @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block is a + # must be yielded to only once with the options. The block does not return a meaningful value, nor should + # around_connect. # # @see Client.connect def initialize( @@ -181,7 +184,8 @@ def initialize( keep_alive: KeepAliveOptions.new, http_connect_proxy: nil, runtime: Runtime.default, - lazy_connect: false + lazy_connect: false, + around_connect: nil ) @options = Options.new( target_host:, @@ -195,9 +199,19 @@ def initialize( runtime:, lazy_connect: ).freeze - # Create core client now if not lazy @core_client_mutex = Mutex.new - _core_client unless lazy_connect + # Create core client now if not lazy, applying around_connect if present + if around_connect + # Technically around_connect can never run the block for whatever reason (i.e. plugin returning a mock + # connection), so we don't enforce it + around_connect.call(@options) do |options| + @options = options + _core_client unless lazy_connect + nil + end + else + _core_client unless lazy_connect + end # Create service instances @workflow_service = WorkflowService.new(self) @operator_service = OperatorService.new(self) diff --git a/temporalio/lib/temporalio/client/plugin.rb b/temporalio/lib/temporalio/client/plugin.rb new file mode 100644 index 00000000..c35484c3 --- /dev/null +++ b/temporalio/lib/temporalio/client/plugin.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Temporalio + class Client + # Plugin mixin to include for configuring clients and/or intercepting connect calls. + # + # This is a low-level implementation that requires abstract methods herein to be implemented. Many implementers may + # prefer {SimplePlugin} which includes this. + # + # WARNING: Plugins are experimental. + module Plugin + # @abstract + # @return [String] Name of the plugin. + def name + raise NotImplementedError + end + + # Configure a client. + # + # @abstract + # @param options [Options] Current immutable options set. + # @return [Options] Options to use, possibly updated from original. + def configure_client(options) + raise NotImplementedError + end + + # Connect a client. + # + # Implementers are expected to delegate to next_call to perform the connection. Note, this does not apply to users + # explicitly creating connections via {Connection} constructor. + # + # @abstract + # @param options [Connection::Options] Current immutable options set. + # @param next_call [Proc] Proc for the next plugin in the chain to call. It accepts the options and returns a + # {Connection}. + # @return [Connection] Connected connection. + def connect_client(options, next_call) + raise NotImplementedError + end + end + end +end diff --git a/temporalio/lib/temporalio/internal/bridge/worker.rb b/temporalio/lib/temporalio/internal/bridge/worker.rb index a23a4e6f..d542f9e1 100644 --- a/temporalio/lib/temporalio/internal/bridge/worker.rb +++ b/temporalio/lib/temporalio/internal/bridge/worker.rb @@ -28,6 +28,7 @@ class Worker :nondeterminism_as_workflow_fail, :nondeterminism_as_workflow_fail_for_types, :deployment_options, + :plugins, keyword_init: true ) diff --git a/temporalio/lib/temporalio/simple_plugin.rb b/temporalio/lib/temporalio/simple_plugin.rb new file mode 100644 index 00000000..4164c250 --- /dev/null +++ b/temporalio/lib/temporalio/simple_plugin.rb @@ -0,0 +1,192 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require 'temporalio/worker' + +module Temporalio + # Plugin that implements both {Client::Plugin} and {Worker::Plugin} and provides a simplified common set of settings + # for configuring both. + # + # WARNING: Plugins are experimental. + class SimplePlugin + include Client::Plugin + include Worker::Plugin + + Options = Data.define( + :name, + :data_converter, + :client_interceptors, + :activities, + :workflows, + :worker_interceptors, + :workflow_failure_exception_types, + :run_context + ) + + # Options as returned from {options} representing the options passed to the constructor. + class Options; end # rubocop:disable Lint/EmptyClass + + # @return [Options] Frozen options for this plugin which has the same attributes as {initialize}. + attr_reader :options + + # Create a simple plugin. + # + # @param name [String] Required string name for this plugin. + # @param data_converter [Converters::DataConverter, Proc, nil] Data converter to apply to clients and workflow + # replayers. This can be a proc that accepts the existing data converter and returns a new one. + # @param client_interceptors [Array, Proc, nil] Client interceptors that are appended to the + # existing client set (which means if they implement worker interceptors they are applied for the workers too). A + # proc can be provided that accepts the existing array and returns a new one. + # @param activities [Array, Activity::Definition::Info>, Proc, + # nil] Activities to append to each worker activity set. A proc can be provided that accepts the existing array + # and returns a new one. + # @param workflows [Array>, Proc, nil] Workflows to append to each worker workflow set. + # A proc can be provided that accepts the existing array and returns a new one. + # @param worker_interceptors [Array, Proc, nil] Worker interceptors + # that are appended to the existing worker or workflow replayer set. A proc can be provided that accepts the + # existing array and returns a new one. + # @param workflow_failure_exception_types [Array>] Workflow failure exception types that are + # appended to the existing worker or workflow replayer set. A proc can be provided that accepts the existing array + # and returns a new one. + # @param run_context [Proc, nil] A proc that intercepts both {run_worker} or {with_workflow_replay_worker}. The proc + # should accept two positional parameters: options and next_call. The options are either + # {Worker::Plugin::RunWorkerOptions} or {Worker::Plugin::WithWorkflowReplayWorkerOptions}. The next_call is a proc + # itself that accepts the options and returns a value. This run_context proc should return the result of the + # next_call. + def initialize( + name:, + data_converter: nil, + client_interceptors: nil, + activities: nil, + workflows: nil, + worker_interceptors: nil, + workflow_failure_exception_types: nil, + run_context: nil + ) + @options = Options.new( + name:, + data_converter:, + client_interceptors:, + activities:, + workflows:, + worker_interceptors:, + workflow_failure_exception_types:, + run_context: + ).freeze + end + + # Implements {Client::Plugin#name} and {Worker::Plugin#name}. + def name + @options.name + end + + # Implements {Client::Plugin#configure_client}. + def configure_client(options) + if (data_converter = _single_option(new: @options.data_converter, existing: options.data_converter, + type: Converters::DataConverter, name: 'data converter')) + options = options.with(data_converter:) + end + if (interceptors = _array_option(new: @options.client_interceptors, existing: options.interceptors, + name: 'client interceptor')) + options = options.with(interceptors:) + end + options + end + + # Implements {Client::Plugin#connect_client}. + def connect_client(options, next_call) + next_call.call(options) + end + + # Implements {Worker::Plugin#configure_worker}. + def configure_worker(options) + if (activities = _array_option(new: @options.activities, existing: options.activities, name: 'activity')) + options = options.with(activities:) + end + if (workflows = _array_option(new: @options.workflows, existing: options.workflows, name: 'workflow')) + options = options.with(workflows:) + end + if (interceptors = _array_option(new: @options.worker_interceptors, existing: options.interceptors, + name: 'worker interceptor')) + options = options.with(interceptors:) + end + if (workflow_failure_exception_types = _array_option(new: @options.workflow_failure_exception_types, + existing: options.workflow_failure_exception_types, + name: 'workflow failure exception types')) + options = options.with(workflow_failure_exception_types:) + end + options + end + + # Implements {Worker::Plugin#run_worker}. + def run_worker(options, next_call) + if @options.run_context + @options.run_context.call(options, next_call) # steep:ignore + else + next_call.call(options) + end + end + + # Implements {Worker::Plugin#configure_workflow_replayer}. + def configure_workflow_replayer(options) + if (data_converter = _single_option(new: @options.data_converter, existing: options.data_converter, + type: Converters::DataConverter, name: 'data converter')) + options = options.with(data_converter:) + end + if (workflows = _array_option(new: @options.workflows, existing: options.workflows, name: 'workflow')) + options = options.with(workflows:) + end + if (interceptors = _array_option(new: @options.worker_interceptors, existing: options.interceptors, + name: 'worker interceptor')) + options = options.with(interceptors:) + end + if (workflow_failure_exception_types = _array_option(new: @options.workflow_failure_exception_types, + existing: options.workflow_failure_exception_types, + name: 'workflow failure exception types')) + options = options.with(workflow_failure_exception_types:) + end + options + end + + # Implements {Worker::Plugin#with_workflow_replay_worker}. + def with_workflow_replay_worker(options, next_call) + if @options.run_context + @options.run_context.call(options, next_call) # steep:ignore + else + next_call.call(options) + end + end + + # @!visibility private + def _single_option(new:, existing:, type:, name:) + case new + when nil + nil + when Proc + new.call(existing).tap do |val| + raise "Instance of #{name} required" unless val.is_a?(type) + end + when type + new + else + raise "Unrecognized #{name} type #{new.class}" + end + end + + # @!visibility private + def _array_option(new:, existing:, name:) + case new + when nil + nil + when Proc + new.call(existing).tap do |conv| + raise "Array for #{name} required" unless conv.is_a?(Array) + end + when Array + existing + new # steep:ignore + else + raise "Unrecognized #{name} type #{new.class}" + end + end + end +end diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index dfeda68d..75a21158 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -15,6 +15,7 @@ require 'temporalio/worker/deployment_options' require 'temporalio/worker/illegal_workflow_call_validator' require 'temporalio/worker/interceptor' +require 'temporalio/worker/plugin' require 'temporalio/worker/poller_behavior' require 'temporalio/worker/thread_pool' require 'temporalio/worker/tuner' @@ -35,6 +36,7 @@ class Worker :tuner, :activity_executors, :workflow_executor, + :plugins, :interceptors, :identity, :logger, @@ -122,6 +124,42 @@ def self.run_all( raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true, &block + ) + # We have to apply plugins. However, every plugin has a different worker. So we provide them the worker along with + # other options, but we disregard any mutation of the worker on the next call. + run_worker = proc do |options| + # @type var options: Plugin::RunWorkerOptions + _run_all_root(*workers, + cancellation: options.cancellation, + shutdown_signals: options.shutdown_signals, + raise_in_block_on_shutdown: options.raise_in_block_on_shutdown, + wait_block_complete:, &block) + end + plugins_with_workers = workers.flat_map { |w| w._plugins.map { |p| [p, w] } } + run_worker = plugins_with_workers.reverse_each.reduce(run_worker) do |next_call, plugin_with_worker| + plugin, worker = plugin_with_worker + proc do |options| + plugin.run_worker(options.with(worker:), next_call) # steep:ignore + end + end + + run_worker.call(Plugin::RunWorkerOptions.new( + # Intentionally violating typing here because we set this on each call + worker: nil, # steep:ignore + cancellation:, + shutdown_signals:, + raise_in_block_on_shutdown: + )) + end + + # @!visibility private + def self._run_all_root( + *workers, + cancellation:, + shutdown_signals:, + raise_in_block_on_shutdown:, + wait_block_complete:, + &block ) # Confirm there is at least one and they are all workers raise ArgumentError, 'At least one worker required' if workers.empty? @@ -301,6 +339,19 @@ def self.default_illegal_workflow_calls end end + # @!visibility private + def self._validate_plugins!(plugins) + plugins.each do |plugin| + raise ArgumentError, "#{plugin.class} does not implement Worker::Plugin" unless plugin.is_a?(Plugin) + + # Validate plugin has implemented expected methods + missing = Plugin.instance_methods(false).select { |m| plugin.method(m).owner == Plugin } + unless missing.empty? + raise ArgumentError, "#{plugin.class} missing the following worker plugin method(s): #{missing.join(', ')}" + end + end + end + # @return [Options] Options for this worker which has the same attributes as {initialize}. attr_reader :options @@ -315,6 +366,9 @@ def self.default_illegal_workflow_calls # @param activity_executors [Hash] Executors that activities can run within. # @param workflow_executor [WorkflowExecutor] Workflow executor that workflow tasks run within. This must be a # {WorkflowExecutor::ThreadPool} currently. + # @param plugins [Array] Plugins to use for configuring workers and intercepting the running of workers. Any + # plugins that were set on the client that include {Plugin} will automatically be applied to the worker and + # should not be configured explicitly via this option. WARNING: Plugins are experimental. # @param interceptors [Array] Interceptors specific to this worker. # Note, interceptors set on the client that include the {Interceptor::Activity} or {Interceptor::Workflow} module # are automatically included here, so no need to specify them again. @@ -387,6 +441,7 @@ def initialize( tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, + plugins: [], interceptors: [], identity: nil, logger: client.options.logger, @@ -411,8 +466,6 @@ def initialize( activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase) ) - raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty? - Internal::ProtoUtils.assert_non_reserved_name(task_queue) @options = Options.new( @@ -423,6 +476,7 @@ def initialize( tuner:, activity_executors:, workflow_executor:, + plugins:, interceptors:, identity:, logger:, @@ -447,53 +501,68 @@ def initialize( activity_task_poller_behavior:, debug_mode: ).freeze + # Collect applicable client plugins and worker plugins, then validate and apply to options + @plugins = client.options.plugins.grep(Plugin) + plugins + Worker._validate_plugins!(@plugins) + @options = @plugins.reduce(@options) { |options, plugin| plugin.configure_worker(options) } + # Initialize the worker for the given options + _initialize_from_options + end + + # @!visibility private + def _initialize_from_options + if @options.activities.empty? && @options.workflows.empty? + raise ArgumentError, 'Must have at least one activity or workflow' + end should_enforce_versioning_behavior = - deployment_options.use_worker_versioning && - deployment_options.default_versioning_behavior == VersioningBehavior::UNSPECIFIED + @options.deployment_options.use_worker_versioning && + @options.deployment_options.default_versioning_behavior == VersioningBehavior::UNSPECIFIED # Preload workflow definitions and some workflow settings for the bridge workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions( - workflows, - should_enforce_versioning_behavior: should_enforce_versioning_behavior + @options.workflows, + should_enforce_versioning_behavior: ) nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types = Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options( - workflow_failure_exception_types:, workflow_definitions: + workflow_failure_exception_types: @options.workflow_failure_exception_types, + workflow_definitions: ) # Create the bridge worker @bridge_worker = Internal::Bridge::Worker.new( - client.connection._core_client, + @options.client.connection._core_client, Internal::Bridge::Worker::Options.new( - namespace: client.namespace, - task_queue:, - tuner: tuner._to_bridge_options, - identity_override: identity, - max_cached_workflows:, - workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options, - nonsticky_to_sticky_poll_ratio:, - activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options, - enable_workflows: !workflows.empty?, - enable_local_activities: !workflows.empty? && !activities.empty?, - enable_remote_activities: !activities.empty? && !no_remote_activities, + namespace: @options.client.namespace, + task_queue: @options.task_queue, + tuner: @options.tuner._to_bridge_options, + identity_override: @options.identity, + max_cached_workflows: @options.max_cached_workflows, + workflow_task_poller_behavior: @options.workflow_task_poller_behavior._to_bridge_options, + nonsticky_to_sticky_poll_ratio: @options.nonsticky_to_sticky_poll_ratio, + activity_task_poller_behavior: @options.activity_task_poller_behavior._to_bridge_options, + enable_workflows: !@options.workflows.empty?, + enable_local_activities: !@options.workflows.empty? && !@options.activities.empty?, + enable_remote_activities: !@options.activities.empty? && !@options.no_remote_activities, enable_nexus: false, - sticky_queue_schedule_to_start_timeout:, - max_heartbeat_throttle_interval:, - default_heartbeat_throttle_interval:, - max_worker_activities_per_second: max_activities_per_second, - max_task_queue_activities_per_second:, - graceful_shutdown_period:, + sticky_queue_schedule_to_start_timeout: @options.sticky_queue_schedule_to_start_timeout, + max_heartbeat_throttle_interval: @options.max_heartbeat_throttle_interval, + default_heartbeat_throttle_interval: @options.default_heartbeat_throttle_interval, + max_worker_activities_per_second: @options.max_activities_per_second, + max_task_queue_activities_per_second: @options.max_task_queue_activities_per_second, + graceful_shutdown_period: @options.graceful_shutdown_period, nondeterminism_as_workflow_fail:, nondeterminism_as_workflow_fail_for_types:, - deployment_options: deployment_options._to_bridge_options + deployment_options: @options.deployment_options._to_bridge_options, + plugins: (@options.client.options.plugins + @options.plugins).map(&:name).uniq.sort ) ) # Collect interceptors from client and params - @activity_interceptors = (client.options.interceptors + interceptors).select do |i| + @activity_interceptors = (@options.client.options.interceptors + @options.interceptors).select do |i| i.is_a?(Interceptor::Activity) end - @workflow_interceptors = (client.options.interceptors + interceptors).select do |i| + @workflow_interceptors = (@options.client.options.interceptors + @options.interceptors).select do |i| i.is_a?(Interceptor::Workflow) end @@ -501,27 +570,27 @@ def initialize( @worker_shutdown_cancellation = Cancellation.new # Create workers - unless activities.empty? + unless @options.activities.empty? @activity_worker = Internal::Worker::ActivityWorker.new(worker: self, bridge_worker: @bridge_worker) end - unless workflows.empty? + unless @options.workflows.empty? @workflow_worker = Internal::Worker::WorkflowWorker.new( bridge_worker: @bridge_worker, - namespace: client.namespace, - task_queue:, + namespace: @options.client.namespace, + task_queue: @options.task_queue, workflow_definitions:, - workflow_executor:, - logger:, - data_converter: client.data_converter, - metric_meter: client.connection.options.runtime.metric_meter, + workflow_executor: @options.workflow_executor, + logger: @options.logger, + data_converter: @options.client.data_converter, + metric_meter: @options.client.connection.options.runtime.metric_meter, workflow_interceptors: @workflow_interceptors, - disable_eager_activity_execution:, - illegal_workflow_calls:, - workflow_failure_exception_types:, - workflow_payload_codec_thread_pool:, - unsafe_workflow_io_enabled:, - debug_mode:, + disable_eager_activity_execution: @options.disable_eager_activity_execution, + illegal_workflow_calls: @options.illegal_workflow_calls, + workflow_failure_exception_types: @options.workflow_failure_exception_types, + workflow_payload_codec_thread_pool: @options.workflow_payload_codec_thread_pool, + unsafe_workflow_io_enabled: @options.unsafe_workflow_io_enabled, + debug_mode: @options.debug_mode, assert_valid_local_activity: ->(activity) { _assert_valid_local_activity(activity) } ) end @@ -643,5 +712,10 @@ def _assert_valid_local_activity(activity) "Activity #{activity} " \ 'is not registered on this worker, no available activities.' end + + # @!visibility private + def _plugins + @plugins + end end end diff --git a/temporalio/lib/temporalio/worker/plugin.rb b/temporalio/lib/temporalio/worker/plugin.rb new file mode 100644 index 00000000..44f909e8 --- /dev/null +++ b/temporalio/lib/temporalio/worker/plugin.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +module Temporalio + class Worker + # Plugin mixin to include for configuring workers and workflow replayers, and intercepting the running of them. + # + # This is a low-level implementation that requires abstract methods herein to be implemented. Many implementers may + # prefer {SimplePlugin} which includes this. + # + # WARNING: Plugins are experimental. + module Plugin + RunWorkerOptions = Data.define( + :worker, + :cancellation, + :shutdown_signals, + :raise_in_block_on_shutdown + ) + + # Options for {run_worker}. + # + # The options contain the worker and some other options from {Worker#run}/{Worker.run_all}. Unlike other memebers + # in this class, mutating the worker member before invoking the next call in the chain has no effect. + # + # @note Additional required attributes of this class may be added in the future. Users should never instantiate + # this class, but instead use `with` on it in {run_worker}. + class RunWorkerOptions; end # rubocop:disable Lint/EmptyClass + + WithWorkflowReplayWorkerOptions = Data.define( + :worker + ) + + # Options for {with_workflow_replay_worker}. + # + # @note Additional required attributes of this class may be added in the future. Users should never instantiate + # this class, but instead use `with` on it in {with_workflow_replay_worker}. + # + # @!attribute worker + # @return [WorkflowReplayer::ReplayWorker] Replay worker. + class WithWorkflowReplayWorkerOptions; end # rubocop:disable Lint/EmptyClass + + # @abstract + # @return [String] Name of the plugin. + def name + raise NotImplementedError + end + + # Configure a worker. + # + # @abstract + # @param options [Options] Current immutable options set. + # @return [Options] Options to use, possibly updated from original. + def configure_worker(options) + raise NotImplementedError + end + + # Run a worker. + # + # @abstract + # @param options [RunWorkerOptions] Current immutable options set. + # @param next_call [Proc] Proc for the next plugin in the chain to call. It accepts the options and returns an + # arbitrary object that should also be returned from this method. + # @return [Object] Result of next_call. + def run_worker(options, next_call) + raise NotImplementedError + end + + # Configure a workflow replayer. + # + # @abstract + # @param options [WorkflowReplayer::Options] Current immutable options set. + # @return [WorkflowReplayer::Options] Options to use, possibly updated from original. + def configure_workflow_replayer(options) + raise NotImplementedError + end + + # Run a replay worker. + # + # @abstract + # @param options [WithWorkflowReplayWorkerOptions] Current immutable options set. + # @param next_call [Proc] Proc for the next plugin in the chain to call. It accepts the options and returns an + # arbitrary object that should also be returned from this method. + # @return [Object] Result of next_call. + def with_workflow_replay_worker(options, next_call) + raise NotImplementedError + end + end + end +end diff --git a/temporalio/lib/temporalio/worker/workflow_replayer.rb b/temporalio/lib/temporalio/worker/workflow_replayer.rb index 4aeadf94..cff07e9d 100644 --- a/temporalio/lib/temporalio/worker/workflow_replayer.rb +++ b/temporalio/lib/temporalio/worker/workflow_replayer.rb @@ -7,6 +7,7 @@ require 'temporalio/internal/worker/multi_runner' require 'temporalio/internal/worker/workflow_worker' require 'temporalio/worker/interceptor' +require 'temporalio/worker/plugin' require 'temporalio/worker/poller_behavior' require 'temporalio/worker/thread_pool' require 'temporalio/worker/tuner' @@ -24,6 +25,7 @@ class WorkflowReplayer :task_queue, :data_converter, :workflow_executor, + :plugins, :interceptors, :identity, :logger, @@ -50,6 +52,8 @@ class Options; end # rubocop:disable Lint/EmptyClass # payloads. # @param workflow_executor [WorkflowExecutor] Workflow executor that workflow tasks run within. This must be a # {WorkflowExecutor::ThreadPool} currently. + # @param plugins [Array] Plugins to use for configuring replayer and intercepting replay. WARNING: Plugins + # are experimental. # @param interceptors [Array] Workflow interceptors. # @param identity [String, nil] Override the identity for this replater. # @param logger [Logger] Logger to use. Defaults to stdout with warn level. Callers setting this logger are @@ -83,6 +87,7 @@ def initialize( task_queue: 'ReplayTaskQueue', data_converter: Converters::DataConverter.default, workflow_executor: WorkflowExecutor::ThreadPool.default, + plugins: [], interceptors: [], identity: nil, logger: Logger.new($stdout, level: Logger::WARN), @@ -100,6 +105,7 @@ def initialize( task_queue:, data_converter:, workflow_executor:, + plugins:, interceptors:, identity:, logger:, @@ -110,13 +116,18 @@ def initialize( debug_mode:, runtime: ).freeze + # Apply plugins + Worker._validate_plugins!(plugins) + @options = plugins.reduce(@options) { |options, plugin| plugin.configure_workflow_replayer(options) } + # Preload definitions and other settings @workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions( - workflows, should_enforce_versioning_behavior: false + @options.workflows, should_enforce_versioning_behavior: false ) @nondeterminism_as_workflow_fail, @nondeterminism_as_workflow_fail_for_types = Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options( - workflow_failure_exception_types:, workflow_definitions: @workflow_definitions + workflow_failure_exception_types: @options.workflow_failure_exception_types, + workflow_definitions: @workflow_definitions ) # If there is a block, we'll go ahead and assume it's for with_replay_worker with_replay_worker(&) if block_given? # steep:ignore @@ -154,7 +165,18 @@ def replay_workflows(histories, raise_on_replay_failure: false) # @yield Block of code to run with a replay worker. # @yieldparam [ReplayWorker] Worker to run replays on. Note, only one workflow can replay at a time. # @yieldreturn [Object] Result of the block. - def with_replay_worker(&) + def with_replay_worker(&block) + # Apply plugins + run_block = proc do |options| + # @type var options: Plugin::WithWorkflowReplayWorkerOptions + block.call(options.worker) + end + run_block = options.plugins.reverse_each.reduce(run_block) do |next_call, plugin| + proc do |options| + plugin.with_workflow_replay_worker(options, next_call) # steep:ignore + end + end + worker = ReplayWorker.new( options:, workflow_definitions: @workflow_definitions, @@ -162,7 +184,7 @@ def with_replay_worker(&) nondeterminism_as_workflow_fail_for_types: @nondeterminism_as_workflow_fail_for_types ) begin - yield worker + run_block.call(Plugin::WithWorkflowReplayWorkerOptions.new(worker:)) ensure worker._shutdown end @@ -221,7 +243,8 @@ def initialize( graceful_shutdown_period: 0.0, nondeterminism_as_workflow_fail:, nondeterminism_as_workflow_fail_for_types:, - deployment_options: Worker.default_deployment_options._to_bridge_options + deployment_options: Worker.default_deployment_options._to_bridge_options, + plugins: options.plugins.map(&:name).uniq.sort ) ) diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index e40e9892..fd7e6761 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -4,6 +4,7 @@ module Temporalio attr_reader connection: Connection attr_reader namespace: String attr_reader data_converter: Converters::DataConverter + attr_reader plugins: Array[Plugin] attr_reader interceptors: Array[Interceptor] attr_reader logger: Logger attr_reader default_workflow_query_reject_condition: WorkflowQueryRejectCondition::enum? @@ -12,6 +13,7 @@ module Temporalio connection: Connection, namespace: String, data_converter: Converters::DataConverter, + plugins: Array[Plugin], interceptors: Array[Interceptor], logger: Logger, default_workflow_query_reject_condition: WorkflowQueryRejectCondition::enum? @@ -38,6 +40,7 @@ module Temporalio ?api_key: String?, ?tls: bool | Connection::TLSOptions | nil, ?data_converter: Converters::DataConverter, + ?plugins: Array[Plugin], ?interceptors: Array[Interceptor], ?logger: Logger, ?default_workflow_query_reject_condition: WorkflowQueryRejectCondition::enum?, @@ -50,12 +53,15 @@ module Temporalio ?lazy_connect: bool ) -> Client + def self._validate_plugins!: (Array[Plugin] plugins) -> void + attr_reader options: Options def initialize: ( connection: Connection, namespace: String, ?data_converter: Converters::DataConverter, + ?plugins: Array[Plugin], ?interceptors: Array[Interceptor], ?logger: Logger, ?default_workflow_query_reject_condition: WorkflowQueryRejectCondition::enum? diff --git a/temporalio/sig/temporalio/client/connection.rbs b/temporalio/sig/temporalio/client/connection.rbs index 1c43016a..9f53b749 100644 --- a/temporalio/sig/temporalio/client/connection.rbs +++ b/temporalio/sig/temporalio/client/connection.rbs @@ -105,7 +105,8 @@ module Temporalio ?keep_alive: KeepAliveOptions, ?http_connect_proxy: HTTPConnectProxyOptions?, ?runtime: Runtime, - ?lazy_connect: bool + ?lazy_connect: bool, + ?around_connect: nil | ^(Options) { (Options) -> void } -> void ) -> void def target_host: -> String diff --git a/temporalio/sig/temporalio/client/plugin.rbs b/temporalio/sig/temporalio/client/plugin.rbs new file mode 100644 index 00000000..febc1799 --- /dev/null +++ b/temporalio/sig/temporalio/client/plugin.rbs @@ -0,0 +1,14 @@ +module Temporalio + class Client + module Plugin + def name: -> String + + def configure_client: (Options options) -> Options + + def connect_client: ( + Connection::Options options, + ^(Connection::Options) -> Connection next_call + ) -> Connection + end + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/internal/bridge/worker.rbs b/temporalio/sig/temporalio/internal/bridge/worker.rbs index ec5a299a..51272e33 100644 --- a/temporalio/sig/temporalio/internal/bridge/worker.rbs +++ b/temporalio/sig/temporalio/internal/bridge/worker.rbs @@ -24,6 +24,7 @@ module Temporalio attr_accessor nondeterminism_as_workflow_fail: bool attr_accessor nondeterminism_as_workflow_fail_for_types: Array[String] attr_accessor deployment_options: DeploymentOptions? + attr_accessor plugins: Array[String] def initialize: ( namespace: String, @@ -46,7 +47,8 @@ module Temporalio graceful_shutdown_period: Float, nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: Array[String], - deployment_options: DeploymentOptions? + deployment_options: DeploymentOptions?, + plugins: Array[String] ) -> void end diff --git a/temporalio/sig/temporalio/simple_plugin.rbs b/temporalio/sig/temporalio/simple_plugin.rbs new file mode 100644 index 00000000..c15fe6e3 --- /dev/null +++ b/temporalio/sig/temporalio/simple_plugin.rbs @@ -0,0 +1,65 @@ +module Temporalio + class SimplePlugin + include Client::Plugin + include Worker::Plugin + + type option[T] = nil | T | ^(T) -> T + + class Options + attr_reader name: String + attr_reader data_converter: option[Converters::DataConverter] + attr_reader client_interceptors: option[Array[Client::Interceptor]] + attr_reader activities: option[Array[Activity::Definition | singleton(Activity::Definition) | Activity::Definition::Info]] + attr_reader workflows: option[Array[singleton(Workflow::Definition) | Workflow::Definition::Info]] + attr_reader worker_interceptors: option[Array[Worker::Interceptor::Activity | Worker::Interceptor::Workflow]] + attr_reader workflow_failure_exception_types: option[Array[String]] + attr_reader run_context: nil | ^( + Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions options, + ^(Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions) -> untyped next_call + ) -> untyped + + def initialize: ( + name: String, + data_converter: option[Converters::DataConverter], + client_interceptors: option[Array[Client::Interceptor]], + activities: option[Array[Activity::Definition | singleton(Activity::Definition) | Activity::Definition::Info]], + workflows: option[Array[singleton(Workflow::Definition) | Workflow::Definition::Info]], + worker_interceptors: option[Array[Worker::Interceptor::Activity | Worker::Interceptor::Workflow]], + workflow_failure_exception_types: option[Array[String]], + run_context: nil | ^( + Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions options, + ^(Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions) -> untyped next_call + ) -> untyped + ) -> void + end + + attr_reader options: Options + + def initialize: ( + name: String, + ?data_converter: option[Converters::DataConverter], + ?client_interceptors: option[Array[Client::Interceptor]], + ?activities: option[Array[Activity::Definition | singleton(Activity::Definition) | Activity::Definition::Info]], + ?workflows: option[Array[singleton(Workflow::Definition) | Workflow::Definition::Info]], + ?worker_interceptors: option[Array[Worker::Interceptor::Activity | Worker::Interceptor::Workflow]], + ?workflow_failure_exception_types: option[Array[String]], + ?run_context: nil | ^( + Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions options, + ^(Worker::Plugin::RunWorkerOptions | Worker::Plugin::WithWorkflowReplayWorkerOptions) -> untyped next_call + ) -> untyped + ) -> void + + def _single_option: [T < ::Object] ( + new: option[T], + existing: T, + type: Class, + name: String + ) -> T? + + def _array_option: [T < ::Object] ( + new: option[T], + existing: T, + name: String + ) -> T? + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/worker.rbs b/temporalio/sig/temporalio/worker.rbs index d8386598..5a3a2b5f 100644 --- a/temporalio/sig/temporalio/worker.rbs +++ b/temporalio/sig/temporalio/worker.rbs @@ -9,6 +9,7 @@ module Temporalio attr_reader tuner: Tuner attr_reader activity_executors: Hash[Symbol, Worker::ActivityExecutor] attr_reader workflow_executor: Worker::WorkflowExecutor + attr_reader plugins: Array[Plugin] attr_reader interceptors: Array[Interceptor::Activity | Interceptor::Workflow] attr_reader identity: String attr_reader logger: Logger @@ -30,6 +31,7 @@ module Temporalio attr_reader unsafe_workflow_io_enabled: bool attr_reader workflow_task_poller_behavior: PollerBehavior attr_reader activity_task_poller_behavior: PollerBehavior + attr_reader deployment_options: Worker::DeploymentOptions attr_reader debug_mode: bool def initialize: ( @@ -40,6 +42,7 @@ module Temporalio tuner: Tuner, activity_executors: Hash[Symbol, Worker::ActivityExecutor], workflow_executor: Worker::WorkflowExecutor, + plugins: Array[Plugin], interceptors: Array[Interceptor::Activity | Interceptor::Workflow], identity: String?, logger: Logger, @@ -80,8 +83,18 @@ module Temporalio ?wait_block_complete: bool ) ?{ -> T } -> T + def self._run_all_root: [T] ( + *Worker workers, + cancellation: Cancellation, + shutdown_signals: Array[String | Integer], + raise_in_block_on_shutdown: Exception?, + wait_block_complete: bool + ) ?{ -> T } -> T + def self.default_illegal_workflow_calls: -> Hash[String, :all | Array[Symbol | IllegalWorkflowCallValidator] | IllegalWorkflowCallValidator] + def self._validate_plugins!: (Array[Plugin] plugins) -> void + attr_reader options: Options def initialize: ( @@ -92,6 +105,7 @@ module Temporalio ?tuner: Tuner, ?activity_executors: Hash[Symbol, Worker::ActivityExecutor], ?workflow_executor: Worker::WorkflowExecutor, + ?plugins: Array[Plugin], ?interceptors: Array[Interceptor::Activity | Interceptor::Workflow], ?identity: String?, ?logger: Logger, @@ -117,6 +131,8 @@ module Temporalio ?debug_mode: bool ) -> void + def _initialize_from_options: -> void + def task_queue: -> String def client: -> Client @@ -137,5 +153,6 @@ module Temporalio def _on_poll_bytes: (Internal::Worker::MultiRunner runner, Symbol worker_type, String bytes) -> void def _on_shutdown_complete: -> void def _assert_valid_local_activity: (String) -> void + def _plugins: -> Array[Plugin] end end diff --git a/temporalio/sig/temporalio/worker/plugin.rbs b/temporalio/sig/temporalio/worker/plugin.rbs new file mode 100644 index 00000000..1b07683f --- /dev/null +++ b/temporalio/sig/temporalio/worker/plugin.rbs @@ -0,0 +1,47 @@ +module Temporalio + class Worker + module Plugin + class RunWorkerOptions + attr_reader worker: Worker + attr_reader cancellation: Cancellation + attr_reader shutdown_signals: Array[String | Integer] + attr_reader raise_in_block_on_shutdown: Exception? + + def initialize: ( + worker: Worker, + cancellation: Cancellation, + shutdown_signals: Array[String | Integer], + raise_in_block_on_shutdown: Exception? + ) -> void + + def with: (**untyped) -> RunWorkerOptions + end + + class WithWorkflowReplayWorkerOptions + attr_reader worker: WorkflowReplayer::ReplayWorker + + def initialize: ( + worker: WorkflowReplayer::ReplayWorker + ) -> void + + def with: (**untyped) -> WithWorkflowReplayWorkerOptions + end + + def name: -> String + + def configure_worker: (Options options) -> Options + + def run_worker: ( + RunWorkerOptions options, + ^(RunWorkerOptions) -> untyped next_call + ) -> untyped + + def configure_workflow_replayer: (WorkflowReplayer::Options options) -> WorkflowReplayer::Options + + def with_workflow_replay_worker: ( + WithWorkflowReplayWorkerOptions options, + ^(WithWorkflowReplayWorkerOptions) -> untyped next_call + ) -> untyped + end + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/worker/workflow_replayer.rbs b/temporalio/sig/temporalio/worker/workflow_replayer.rbs index 0f457cb7..9a1088f8 100644 --- a/temporalio/sig/temporalio/worker/workflow_replayer.rbs +++ b/temporalio/sig/temporalio/worker/workflow_replayer.rbs @@ -7,6 +7,7 @@ module Temporalio attr_reader task_queue: String attr_reader data_converter: Converters::DataConverter attr_reader workflow_executor: Worker::WorkflowExecutor + attr_reader plugins: Array[Plugin] attr_reader interceptors: Array[Interceptor::Workflow] attr_reader identity: String? attr_reader logger: Logger @@ -23,6 +24,7 @@ module Temporalio task_queue: String, data_converter: Converters::DataConverter, workflow_executor: Worker::WorkflowExecutor, + plugins: Array[Plugin], interceptors: Array[Interceptor::Workflow], identity: String?, logger: Logger, @@ -33,6 +35,8 @@ module Temporalio debug_mode: bool, runtime: Runtime ) -> void + + def with: (**Object kwargs) -> Options end attr_reader options: Options @@ -43,6 +47,7 @@ module Temporalio ?task_queue: String, ?data_converter: Converters::DataConverter, ?workflow_executor: Worker::WorkflowExecutor, + ?plugins: Array[Plugin], ?interceptors: Array[Interceptor::Workflow], ?identity: String?, ?logger: Logger, diff --git a/temporalio/test/plugin_test.rb b/temporalio/test/plugin_test.rb new file mode 100644 index 00000000..fef265ef --- /dev/null +++ b/temporalio/test/plugin_test.rb @@ -0,0 +1,236 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'temporalio/simple_plugin' + +class PluginTest < Test + class ClientPluginForTest + include Temporalio::Client::Plugin + include Temporalio::Client::Interceptor + + attr_reader :name + attr_accessor :start_workflow_called + + def initialize(target_host:, fail_connect: false) + @name = 'some-plugin' + @target_host = target_host + @fail_connect = fail_connect + end + + def configure_client(options) + # Set interceptor + options.with(interceptors: [self]) + end + + def connect_client(options, next_call) + raise 'Intentional client failure' if @fail_connect + + # Set target host + next_call.call(options.with(target_host: @target_host)) + end + + def intercept_client(next_interceptor) + Interceptor.new(self, next_interceptor) + end + + class Interceptor < Temporalio::Client::Interceptor::Outbound + def initialize(plugin, next_interceptor) + super(next_interceptor) + @plugin = plugin + end + + def start_workflow(input) + @plugin.start_workflow_called = true + super + end + end + end + + class SimpleWorkflow < Temporalio::Workflow::Definition + def execute(name) + "Hello, #{name}!" + end + end + + def test_client_plugin + # Connect with a plugin that fails on connect + err = assert_raises do + Temporalio::Client.connect( + 'bad', + env.client.namespace, + plugins: [ClientPluginForTest.new(target_host: env.client.connection.target_host, fail_connect: true)] + ) + end + assert_equal 'Intentional client failure', err.message + + # Connect with a plugin that sets address, run a workflow, confirm plugin properly configured interceptor + plugin = ClientPluginForTest.new(target_host: env.client.connection.target_host) + client = Temporalio::Client.connect('bad-address', env.client.namespace, plugins: [plugin]) + refute plugin.start_workflow_called + client.start_workflow(SimpleWorkflow, 'some-name', + id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}") + assert plugin.start_workflow_called + end + + class WorkerPluginForTest + include Temporalio::Worker::Plugin + include Temporalio::Worker::Interceptor::Workflow + + attr_reader :name + attr_accessor :execute_workflow_called, :replay_workflow_called + + def initialize(fail_run: false) + @name = 'some-plugin' + @fail_run = fail_run + end + + def configure_worker(options) + options.with(workflows: [SimpleWorkflow], interceptors: [self]) + end + + def run_worker(options, next_call) + raise 'Intentional worker failure' if @fail_run + + next_call.call(options) + end + + def configure_workflow_replayer(options) + options.with(workflows: [SimpleWorkflow], interceptors: [self]) + end + + def with_workflow_replay_worker(options, next_call) + # Replace with our own replayer that will mark it called when called + next_call.call(options.with(worker: OverrideReplayWorker.new(options.worker, self))) + end + + def intercept_workflow(next_interceptor) + Interceptor.new(self, next_interceptor) + end + + class OverrideReplayWorker < SimpleDelegator + def initialize(underlying, plugin) + super(underlying) + @plugin = plugin + end + + def replay_workflow(*args, **kwargs) + @plugin.replay_workflow_called = true + super + end + end + + class Interceptor < Temporalio::Worker::Interceptor::Workflow::Inbound + def initialize(plugin, next_interceptor) + super(next_interceptor) + @plugin = plugin + end + + def execute(input) + @plugin.execute_workflow_called = true + super + end + end + end + + def test_worker_plugin + # Fail run + err = assert_raises do + Temporalio::Worker.new(client: env.client, + task_queue: "tq-#{SecureRandom.uuid}", + plugins: [WorkerPluginForTest.new(fail_run: true)]) + .run { flunk } + end + assert_equal 'Intentional worker failure', err.message + + # Run workflow in worker, confirm interceptor hit + plugin = WorkerPluginForTest.new + worker = Temporalio::Worker.new(client: env.client, task_queue: "tq-#{SecureRandom.uuid}", plugins: [plugin]) + handle = worker.run do + refute plugin.execute_workflow_called + env.client.start_workflow(SimpleWorkflow, 'some-name', + id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue).tap do |handle| + assert_equal 'Hello, some-name!', handle.result + assert plugin.execute_workflow_called + end + end + + # Run replayer with a new version of the plugin + plugin = WorkerPluginForTest.new + replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [], plugins: [plugin]) + refute plugin.execute_workflow_called + refute plugin.replay_workflow_called + replayer.replay_workflow(handle.fetch_history) + assert plugin.execute_workflow_called + assert plugin.replay_workflow_called + end + + class MethodsNotImplementedPlugin + include Temporalio::Client::Plugin + include Temporalio::Worker::Plugin + end + + def test_plugin_methods_not_implemented + err = assert_raises(ArgumentError) do + Temporalio::Client.connect('does-not-matter', 'does-not-matter', plugins: [MethodsNotImplementedPlugin.new]) + end + assert err.message.include?('missing') && err.message.include?('connect_client') + err = assert_raises(ArgumentError) do + Temporalio::Worker.new(client: env.client, task_queue: 'does-not-matter', + plugins: [MethodsNotImplementedPlugin.new]) + end + assert err.message.include?('missing') && err.message.include?('run_worker') + end + + class ToPayloadTrackingPayloadConverter < SimpleDelegator + attr_accessor :to_payload_values + + def to_payload(value, **kwargs) + (@to_payload_values ||= []) << value + super + end + + def to_payloads(values, **kwargs) + (@to_payload_values ||= []).concat(values) + super + end + end + + def test_simple_plugin + # Create a simple plugin that just confirms some things are properly set + payload_converter = ToPayloadTrackingPayloadConverter.new(Temporalio::Converters::PayloadConverter.default) + run_context_calls = [] + plugin = Temporalio::SimplePlugin.new( + name: 'simple-plugin', + data_converter: Temporalio::Converters::DataConverter.new(payload_converter:), + workflows: [SimpleWorkflow], + run_context: proc do |options, next_call| # steep:ignore + run_context_calls << options + next_call.call(options) # steep:ignore + end + ) + + # Create a client and worker with the plugin, run workflow, confirm success + client = Temporalio::Client.connect(env.client.connection.target_host, env.client.namespace, plugins: [plugin]) + worker = Temporalio::Worker.new(client:, task_queue: "tq-#{SecureRandom.uuid}") + handle = worker.run do + client.start_workflow(SimpleWorkflow, 'some-name', + id: "wf-#{SecureRandom.uuid}", + task_queue: worker.task_queue).tap(&:result) + end + assert_equal 'Hello, some-name!', handle.result + + # Confirm custom payload converter called + assert_equal ['some-name', 'Hello, some-name!'], payload_converter.to_payload_values + + # Run in replayer with same plugin + replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [], plugins: [plugin]) + replayer.replay_workflow(handle.fetch_history) + # Confirm payload converter called for this too + assert_equal ['some-name', 'Hello, some-name!', 'Hello, some-name!'], payload_converter.to_payload_values + # Confirm run context called for both + assert_equal( + [Temporalio::Worker::Plugin::RunWorkerOptions, Temporalio::Worker::Plugin::WithWorkflowReplayWorkerOptions], + run_context_calls.map(&:class) + ) + end +end