From cf1654d09a90a34c369d9c87dc8fdff4833a57d7 Mon Sep 17 00:00:00 2001 From: Robert Ross Date: Tue, 18 Mar 2025 13:33:29 -0700 Subject: [PATCH 1/4] WIP: Load executables dynamically based on name instead of class registry --- lib/temporal/activity/task_processor.rb | 11 +++-- lib/temporal/executable_lookup.rb | 23 +++++++-- lib/temporal/workflow/task_processor.rb | 9 +++- .../temporal/activity/task_processor_spec.rb | 49 +++++++++++++++++++ .../lib/temporal/executable_lookup_spec.rb | 2 +- 5 files changed, 84 insertions(+), 10 deletions(-) diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index ef20780b..5c9af8c8 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -16,7 +16,7 @@ def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, c @metadata = Metadata.generate_activity_metadata(task, namespace, config.converter) @task_token = task.task_token @activity_name = task.activity_type.name - @activity_class = activity_lookup.find(activity_name) + @activity_lookup = activity_lookup @middleware_chain = middleware_chain @config = config @heartbeat_thread_pool = heartbeat_thread_pool @@ -30,6 +30,7 @@ def process context = Activity::Context.new(connection, metadata, config, heartbeat_thread_pool) + activity_class = find_activity_class if !activity_class raise ActivityNotRegistered, 'Activity is not registered with this worker' end @@ -65,8 +66,8 @@ def metric_tags private - attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_class, - :middleware_chain, :metadata, :config, :heartbeat_thread_pool + attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_lookup, + :middleware_chain, :metadata, :config, :heartbeat_thread_pool def connection @connection ||= Temporal::Connection.generate(config.for_connection) @@ -78,6 +79,10 @@ def queue_time_ms ((started - scheduled) * 1_000).round end + def find_activity_class + activity_lookup.find(activity_name) + end + def respond_completed(result) Temporal.logger.info("Activity task completed", metadata.to_h) log_retry = proc do diff --git a/lib/temporal/executable_lookup.rb b/lib/temporal/executable_lookup.rb index 88d85bf4..7541ad71 100644 --- a/lib/temporal/executable_lookup.rb +++ b/lib/temporal/executable_lookup.rb @@ -27,20 +27,35 @@ def add_dynamic(name, executable) raise SecondDynamicExecutableError, @fallback_executable_name end - @fallback_executable = executable + @fallback_executable_class_name = executable.is_a?(String) ? executable : executable.name @fallback_executable_name = name end def add(name, executable) - executables[name] = executable + executables[name] = executable.is_a?(String) ? executable : executable.name end def find(name) - executables[name] || @fallback_executable + if executables[name] + resolve_executable(executables[name]) + elsif @fallback_executable_class_name + resolve_executable(@fallback_executable_class_name) + else + nil + end end private - attr_reader :executables, :fallback_executable, :fallback_executable_name + def resolve_executable(class_name) + # Use Ruby's built-in constant lookup + class_name.split('::').inject(Object) do |mod, class_segment| + mod.const_get(class_segment) + end + rescue NameError + nil + end + + attr_reader :executables, :fallback_executable_name, :fallback_executable_class_name end end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index b3620ad8..10abb151 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -29,7 +29,7 @@ def initialize(task, task_queue, namespace, workflow_lookup, middleware_chain, w @metadata = Metadata.generate_workflow_task_metadata(task, namespace) @task_token = task.task_token @workflow_name = task.workflow_type.name - @workflow_class = workflow_lookup.find(workflow_name) + @workflow_lookup = workflow_lookup @middleware_chain = middleware_chain @workflow_middleware_chain = workflow_middleware_chain @config = config @@ -42,6 +42,7 @@ def process Temporal.logger.debug("Processing Workflow task", metadata.to_h) Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, metric_tags) + workflow_class = find_workflow_class raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' unless workflow_class history = fetch_full_history @@ -85,7 +86,11 @@ def metric_tags private - attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_class, + def find_workflow_class + workflow_lookup.find(workflow_name) + end + + attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_lookup, :middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum def connection diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 6999ba60..a7d527ac 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -310,5 +310,54 @@ end end end + + context 'when a namespaced activity is registered' do + let(:activity_name) { 'MyNamespace::TestActivity' } + + module MyNamespace + class TestActivity + def self.execute_in_context(context, input) + 'namespaced result' + end + end + end + + let(:activity_class) { MyNamespace::TestActivity } + + before do + allow(lookup).to receive(:find).with(activity_name).and_return(activity_class) + allow(activity_class).to receive(:execute_in_context).and_call_original + end + + it 'correctly resolves and executes the namespaced activity' do + subject.process + + expect(lookup).to have_received(:find).with(activity_name) + expect(activity_class).to have_received(:execute_in_context).with(context, input) + end + + it 'completes the activity task with the correct result' do + subject.process + + expect(connection) + .to have_received(:respond_activity_task_completed) + .with(namespace: namespace, task_token: task.task_token, result: 'namespaced result') + end + + it 'sends metrics with the correct namespaced activity name' do + subject.process + + expect(Temporal.metrics) + .to have_received(:timing) + .with( + Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, + an_instance_of(Integer), + activity: activity_name, + namespace: namespace, + task_queue: task_queue, + workflow: workflow_name + ) + end + end end end diff --git a/spec/unit/lib/temporal/executable_lookup_spec.rb b/spec/unit/lib/temporal/executable_lookup_spec.rb index 197dc334..a89523b2 100644 --- a/spec/unit/lib/temporal/executable_lookup_spec.rb +++ b/spec/unit/lib/temporal/executable_lookup_spec.rb @@ -18,7 +18,7 @@ class IllegalSecondDynamicActivity it 'adds a class to the lookup map' do subject.add('foo', TestClass) - expect(subject.send(:executables)).to eq('foo' => TestClass) + expect(subject.send(:executables)).to eq('foo' => "TestClass") end end From a14ce2cae45118282cc25be3fea4fb60ac383698 Mon Sep 17 00:00:00 2001 From: Robert Ross Date: Tue, 18 Mar 2025 19:45:14 -0700 Subject: [PATCH 2/4] WIP --- .../temporal/activity/task_processor_spec.rb | 27 +++----- .../temporal/workflow/task_processor_spec.rb | 62 ++++++++++++++++++- 2 files changed, 71 insertions(+), 18 deletions(-) diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index a7d527ac..60a3e890 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -313,6 +313,16 @@ context 'when a namespaced activity is registered' do let(:activity_name) { 'MyNamespace::TestActivity' } + let(:activity_type) do + Temporalio::Api::Common::V1::ActivityType.new(name: activity_name) + end + let(:task) do + Fabricate( + :api_activity_task, + activity_type: activity_type, + input: config.converter.to_payloads(input) + ) + end module MyNamespace class TestActivity @@ -326,14 +336,12 @@ def self.execute_in_context(context, input) before do allow(lookup).to receive(:find).with(activity_name).and_return(activity_class) - allow(activity_class).to receive(:execute_in_context).and_call_original end it 'correctly resolves and executes the namespaced activity' do subject.process expect(lookup).to have_received(:find).with(activity_name) - expect(activity_class).to have_received(:execute_in_context).with(context, input) end it 'completes the activity task with the correct result' do @@ -343,21 +351,6 @@ def self.execute_in_context(context, input) .to have_received(:respond_activity_task_completed) .with(namespace: namespace, task_token: task.task_token, result: 'namespaced result') end - - it 'sends metrics with the correct namespaced activity name' do - subject.process - - expect(Temporal.metrics) - .to have_received(:timing) - .with( - Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, - an_instance_of(Integer), - activity: activity_name, - namespace: namespace, - task_queue: task_queue, - workflow: workflow_name - ) - end end end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 6ad3c12c..28228298 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -10,7 +10,7 @@ let(:namespace) { 'test-namespace' } let(:task_queue) { 'test-queue' } - let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } + let(:lookup) { Temporal::ExecutableLookup.new } let(:query) { nil } let(:queries) { nil } let(:task) do @@ -25,6 +25,10 @@ let(:config) { Temporal::Configuration.new } let(:binary_checksum) { 'v1.0.0' } + before "register workflow in lookup" do + lookup.add(workflow_name, Temporal::Workflow::TestWorkflow) + end + describe '#process' do let(:context) { instance_double('Temporal::Workflow::Context') } @@ -453,5 +457,61 @@ end end end + + context 'when a namespaced workflow is registered' do + let(:workflow_name) { 'MyNamespace::TestWorkflow' } + let(:api_workflow_type) do + Fabricate(:api_workflow_type, name: workflow_name) + end + + module MyNamespace + class TestWorkflow + def self.execute_in_context(context, input) + 'namespaced workflow result' + end + end + end + + let(:workflow_class) { MyNamespace::TestWorkflow } + let(:executor) { double('Temporal::Workflow::Executor') } + let(:commands) { double('commands') } + let(:new_sdk_flags_used) { double('new_sdk_flags_used') } + let(:run_result) do + Temporal::Workflow::Executor::RunResult.new(commands: commands, new_sdk_flags_used: new_sdk_flags_used) + end + + before do + allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) + allow(workflow_class).to receive(:execute_in_context).and_return('namespaced workflow result') + allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) + allow(executor).to receive(:run) do + workflow_class.execute_in_context(context, input) + run_result + end + allow(executor).to receive(:process_queries) + end + + it 'correctly resolves and executes the namespaced workflow' do + subject.process + + expect(lookup).to have_received(:find).with(workflow_name) + expect(workflow_class).to have_received(:execute_in_context).with(context, input) + end + + it 'completes the workflow task' do + subject.process + + expect(connection) + .to have_received(:respond_workflow_task_completed) + .with( + namespace: namespace, + task_token: task.task_token, + commands: commands, + query_results: nil, + binary_checksum: binary_checksum, + new_sdk_flags_used: new_sdk_flags_used + ) + end + end end end From cb0505b3a436f13ab989a4e99cc8006ae88501bf Mon Sep 17 00:00:00 2001 From: Robert Ross Date: Tue, 18 Mar 2025 19:51:24 -0700 Subject: [PATCH 3/4] Revert "WIP" This reverts commit a14ce2cae45118282cc25be3fea4fb60ac383698. --- .../temporal/activity/task_processor_spec.rb | 27 +++++--- .../temporal/workflow/task_processor_spec.rb | 62 +------------------ 2 files changed, 18 insertions(+), 71 deletions(-) diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 60a3e890..a7d527ac 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -313,16 +313,6 @@ context 'when a namespaced activity is registered' do let(:activity_name) { 'MyNamespace::TestActivity' } - let(:activity_type) do - Temporalio::Api::Common::V1::ActivityType.new(name: activity_name) - end - let(:task) do - Fabricate( - :api_activity_task, - activity_type: activity_type, - input: config.converter.to_payloads(input) - ) - end module MyNamespace class TestActivity @@ -336,12 +326,14 @@ def self.execute_in_context(context, input) before do allow(lookup).to receive(:find).with(activity_name).and_return(activity_class) + allow(activity_class).to receive(:execute_in_context).and_call_original end it 'correctly resolves and executes the namespaced activity' do subject.process expect(lookup).to have_received(:find).with(activity_name) + expect(activity_class).to have_received(:execute_in_context).with(context, input) end it 'completes the activity task with the correct result' do @@ -351,6 +343,21 @@ def self.execute_in_context(context, input) .to have_received(:respond_activity_task_completed) .with(namespace: namespace, task_token: task.task_token, result: 'namespaced result') end + + it 'sends metrics with the correct namespaced activity name' do + subject.process + + expect(Temporal.metrics) + .to have_received(:timing) + .with( + Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, + an_instance_of(Integer), + activity: activity_name, + namespace: namespace, + task_queue: task_queue, + workflow: workflow_name + ) + end end end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 28228298..6ad3c12c 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -10,7 +10,7 @@ let(:namespace) { 'test-namespace' } let(:task_queue) { 'test-queue' } - let(:lookup) { Temporal::ExecutableLookup.new } + let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } let(:query) { nil } let(:queries) { nil } let(:task) do @@ -25,10 +25,6 @@ let(:config) { Temporal::Configuration.new } let(:binary_checksum) { 'v1.0.0' } - before "register workflow in lookup" do - lookup.add(workflow_name, Temporal::Workflow::TestWorkflow) - end - describe '#process' do let(:context) { instance_double('Temporal::Workflow::Context') } @@ -457,61 +453,5 @@ end end end - - context 'when a namespaced workflow is registered' do - let(:workflow_name) { 'MyNamespace::TestWorkflow' } - let(:api_workflow_type) do - Fabricate(:api_workflow_type, name: workflow_name) - end - - module MyNamespace - class TestWorkflow - def self.execute_in_context(context, input) - 'namespaced workflow result' - end - end - end - - let(:workflow_class) { MyNamespace::TestWorkflow } - let(:executor) { double('Temporal::Workflow::Executor') } - let(:commands) { double('commands') } - let(:new_sdk_flags_used) { double('new_sdk_flags_used') } - let(:run_result) do - Temporal::Workflow::Executor::RunResult.new(commands: commands, new_sdk_flags_used: new_sdk_flags_used) - end - - before do - allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) - allow(workflow_class).to receive(:execute_in_context).and_return('namespaced workflow result') - allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) - allow(executor).to receive(:run) do - workflow_class.execute_in_context(context, input) - run_result - end - allow(executor).to receive(:process_queries) - end - - it 'correctly resolves and executes the namespaced workflow' do - subject.process - - expect(lookup).to have_received(:find).with(workflow_name) - expect(workflow_class).to have_received(:execute_in_context).with(context, input) - end - - it 'completes the workflow task' do - subject.process - - expect(connection) - .to have_received(:respond_workflow_task_completed) - .with( - namespace: namespace, - task_token: task.task_token, - commands: commands, - query_results: nil, - binary_checksum: binary_checksum, - new_sdk_flags_used: new_sdk_flags_used - ) - end - end end end From c2f6dd3e3fe6f480901b930bcd1494d68980201a Mon Sep 17 00:00:00 2001 From: Robert Ross Date: Tue, 18 Mar 2025 20:55:00 -0700 Subject: [PATCH 4/4] Unfreeze look. Update specs for namespaced constants --- lib/temporal/executable_lookup.rb | 9 ++- lib/temporal/worker.rb | 4 +- .../grpc/activity_task_fabricator.rb | 2 +- .../grpc/activity_type_fabricator.rb | 2 +- .../temporal/activity/task_processor_spec.rb | 66 ++++--------------- .../lib/temporal/executable_lookup_spec.rb | 10 +++ 6 files changed, 30 insertions(+), 63 deletions(-) diff --git a/lib/temporal/executable_lookup.rb b/lib/temporal/executable_lookup.rb index 7541ad71..d7477231 100644 --- a/lib/temporal/executable_lookup.rb +++ b/lib/temporal/executable_lookup.rb @@ -17,6 +17,8 @@ def initialize(previous_executable_name) end end + ExecutableNotFoundError = Class.new(StandardError) + def initialize @executables = {} end @@ -48,12 +50,9 @@ def find(name) private def resolve_executable(class_name) - # Use Ruby's built-in constant lookup - class_name.split('::').inject(Object) do |mod, class_segment| - mod.const_get(class_segment) - end + Object.const_get(class_name) rescue NameError - nil + raise Temporal::ExecutableLookup::ExecutableNotFoundError, "Executable #{class_name} not found" end attr_reader :executables, :fallback_executable_name, :fallback_executable_class_name diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index e9a3b2f3..3ddd52a1 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -172,12 +172,12 @@ def while_stopping_hook; end def on_stopped_hook; end def workflow_poller_for(namespace, task_queue, lookup) - Workflow::Poller.new(namespace, task_queue, lookup.freeze, config, workflow_task_middleware, workflow_middleware, + Workflow::Poller.new(namespace, task_queue, lookup, config, workflow_task_middleware, workflow_middleware, workflow_poller_options) end def activity_poller_for(namespace, task_queue, lookup) - Activity::Poller.new(namespace, task_queue, lookup.freeze, config, activity_middleware, activity_poller_options) + Activity::Poller.new(namespace, task_queue, lookup, config, activity_middleware, activity_poller_options) end def executable_registration(executable_class, options) diff --git a/spec/fabricators/grpc/activity_task_fabricator.rb b/spec/fabricators/grpc/activity_task_fabricator.rb index 82e0886f..258c1c34 100644 --- a/spec/fabricators/grpc/activity_task_fabricator.rb +++ b/spec/fabricators/grpc/activity_task_fabricator.rb @@ -5,7 +5,7 @@ activity_id { SecureRandom.uuid } task_token { |attrs| attrs[:task_token] || SecureRandom.uuid } - activity_type { Fabricate(:api_activity_type) } + activity_type { |attrs| Fabricate(:api_activity_type, name: attrs[:activity_name]) } input { TEST_CONVERTER.to_payloads(nil) } workflow_type { Fabricate(:api_workflow_type) } workflow_execution { Fabricate(:api_workflow_execution) } diff --git a/spec/fabricators/grpc/activity_type_fabricator.rb b/spec/fabricators/grpc/activity_type_fabricator.rb index b1e232ff..34a16120 100644 --- a/spec/fabricators/grpc/activity_type_fabricator.rb +++ b/spec/fabricators/grpc/activity_type_fabricator.rb @@ -1,3 +1,3 @@ Fabricator(:api_activity_type, from: Temporalio::Api::Common::V1::ActivityType) do - name 'TestActivity' + name { |attrs| attrs[:name] || 'TestActivity' } end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index a7d527ac..ee9984a7 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -3,13 +3,14 @@ require 'temporal/metric_keys' require 'temporal/middleware/chain' require 'temporal/scheduled_thread_pool' +require 'temporal/executable_lookup' describe Temporal::Activity::TaskProcessor do subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) } let(:namespace) { 'test-namespace' } let(:task_queue) { 'test-queue' } - let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } + let(:lookup) { Temporal::ExecutableLookup.new } let(:task) do Fabricate( :api_activity_task, @@ -19,7 +20,7 @@ end let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace, config.converter) } let(:workflow_name) { task.workflow_type.name } - let(:activity_name) { 'TestActivity' } + let(:activity_name) { 'Temporal::MyActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } let(:config) { Temporal::Configuration.new } @@ -94,10 +95,16 @@ end context 'when activity is registered' do - let(:activity_class) { double('Temporal::Activity', execute_in_context: nil) } + let(:activity_class) do + stub_const('Temporal::MyActivity', Class.new do + def self.execute_in_context(context, input) + 'result' + end + end) + end - before do - allow(lookup).to receive(:find).with(activity_name).and_return(activity_class) + before "register activity in lookup" do + lookup.add(activity_class.name, activity_class) end context 'when activity completes' do @@ -310,54 +317,5 @@ end end end - - context 'when a namespaced activity is registered' do - let(:activity_name) { 'MyNamespace::TestActivity' } - - module MyNamespace - class TestActivity - def self.execute_in_context(context, input) - 'namespaced result' - end - end - end - - let(:activity_class) { MyNamespace::TestActivity } - - before do - allow(lookup).to receive(:find).with(activity_name).and_return(activity_class) - allow(activity_class).to receive(:execute_in_context).and_call_original - end - - it 'correctly resolves and executes the namespaced activity' do - subject.process - - expect(lookup).to have_received(:find).with(activity_name) - expect(activity_class).to have_received(:execute_in_context).with(context, input) - end - - it 'completes the activity task with the correct result' do - subject.process - - expect(connection) - .to have_received(:respond_activity_task_completed) - .with(namespace: namespace, task_token: task.task_token, result: 'namespaced result') - end - - it 'sends metrics with the correct namespaced activity name' do - subject.process - - expect(Temporal.metrics) - .to have_received(:timing) - .with( - Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, - an_instance_of(Integer), - activity: activity_name, - namespace: namespace, - task_queue: task_queue, - workflow: workflow_name - ) - end - end end end diff --git a/spec/unit/lib/temporal/executable_lookup_spec.rb b/spec/unit/lib/temporal/executable_lookup_spec.rb index a89523b2..23a34226 100644 --- a/spec/unit/lib/temporal/executable_lookup_spec.rb +++ b/spec/unit/lib/temporal/executable_lookup_spec.rb @@ -42,6 +42,16 @@ class IllegalSecondDynamicActivity expect(subject.find('bar')).to eq(nil) end + it "still returns the class even it was unloaded and has a new object_id" do + original_object_id = stub_const('TestClass', Class.new).object_id + subject.add('foo', TestClass) + + expected_class = stub_const('TestClass', Class.new) + + expect(expected_class.object_id).not_to eq(original_object_id) + expect(subject.find('foo')).to be(expected_class) + end + it 'falls back to the dynamic executable' do subject.add('TestClass', TestClass) subject.add_dynamic('MyDynamicActivity', MyDynamicActivity)