diff --git a/README.md b/README.md index b128408..356ffba 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,9 @@ Prerequisites: * [env_config](env_config) - Load client configuration from TOML files with programmatic overrides. * [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates. * [patching](patching) - Demonstrates how to safely alter a workflow. +* [polling/frequent](polling/frequent) - Implement a frequent polling mechanism inside an Activity. * [polling/infrequent](polling/infrequent) - Implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature. +* [polling/periodic_sequence](polling/periodic_sequence) - Implement a periodic polling mechanism using a Child Workflow. * [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities. * [saga](saga) - Using undo/compensation using a very simplistic Saga pattern. * [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK. diff --git a/polling/frequent/README.md b/polling/frequent/README.md new file mode 100644 index 0000000..5b029fa --- /dev/null +++ b/polling/frequent/README.md @@ -0,0 +1,35 @@ +# Frequent Polling Activity + +This sample demonstrates how to implement a frequent polling mechanism +(1 second or faster) inside an Activity. +The implementation is a loop that polls a service and then sleeps for the poll +interval (1 second in the sample). + +## How to Run + +1. **Start the Worker:** + + Open a terminal and run the following command to start the worker process. + The worker will listen for tasks on the `frequent-polling-sample` task queue. + + ```bash + bundle exec ruby worker.rb + ``` + + You will see the worker log messages indicating it is calling the service. + It will try several times, with a short delay between each attempt. + +2. **Start the Workflow:** + + In a separate terminal, run this command to start the workflow. + This script will start the workflow and wait for its completion, + printing the final result. + + ```bash + bundle exec ruby starter.rb + ``` + + After a few seconds, the service will succeed. + You will see the final result printed in the starter's terminal, + and the worker will log the successful completion. + diff --git a/polling/frequent/compose_greeting_activity.rb b/polling/frequent/compose_greeting_activity.rb new file mode 100644 index 0000000..2825c02 --- /dev/null +++ b/polling/frequent/compose_greeting_activity.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require 'temporalio/activity' +require_relative '../test_service' + +module Polling + module Frequent + class ComposeGreetingActivity < Temporalio::Activity::Definition + def execute(input) + loop do + activity_info = Temporalio::Activity::Context.current.info + begin + return TestService.get_service_result(input, activity_info) + rescue TestService::TestServiceError + Temporalio::Activity::Context.current.logger.info('Test service was down') + end + Temporalio::Activity::Context.current.heartbeat + sleep 1 + end + end + end + end +end diff --git a/polling/frequent/greeting_workflow.rb b/polling/frequent/greeting_workflow.rb new file mode 100644 index 0000000..9fe5452 --- /dev/null +++ b/polling/frequent/greeting_workflow.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'compose_greeting_activity' + +module Polling + module Frequent + class GreetingWorkflow < Temporalio::Workflow::Definition + def execute(name) + Temporalio::Workflow.execute_activity( + ComposeGreetingActivity, + { greeting: 'Hello', name: name }, + start_to_close_timeout: 60, + heartbeat_timeout: 2 + ) + end + end + end +end diff --git a/polling/frequent/starter.rb b/polling/frequent/starter.rb new file mode 100644 index 0000000..506b398 --- /dev/null +++ b/polling/frequent/starter.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require_relative 'greeting_workflow' + +# Create a client +client = Temporalio::Client.connect('localhost:7233', 'default') + +# Run workflow +puts 'Executing workflow' +result = client.execute_workflow( + Polling::Frequent::GreetingWorkflow, + 'World', + id: "frequent-polling-sample-workflow-id-#{Time.now.to_i}", + task_queue: 'frequent-polling-sample' +) +puts "Workflow result: #{result}" diff --git a/polling/frequent/worker.rb b/polling/frequent/worker.rb new file mode 100644 index 0000000..26e7054 --- /dev/null +++ b/polling/frequent/worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require_relative 'greeting_workflow' +require_relative 'compose_greeting_activity' +require 'temporalio/client' +require 'temporalio/worker' + +# Create a client +client = Temporalio::Client.connect('localhost:7233', 'default') + +worker = Temporalio::Worker.new( + client:, + task_queue: 'frequent-polling-sample', + workflows: [Polling::Frequent::GreetingWorkflow], + activities: [Polling::Frequent::ComposeGreetingActivity] +) + +puts 'Starting worker (ctrl+c to exit)' +worker.run(shutdown_signals: ['SIGINT']) diff --git a/polling/infrequent/compose_greeting_activity.rb b/polling/infrequent/compose_greeting_activity.rb index 7979ee2..b271992 100644 --- a/polling/infrequent/compose_greeting_activity.rb +++ b/polling/infrequent/compose_greeting_activity.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true require 'temporalio/activity' -require_relative 'test_service' +require_relative '../test_service' module Polling module Infrequent diff --git a/polling/infrequent/test_service.rb b/polling/infrequent/test_service.rb deleted file mode 100644 index 1ec5b3c..0000000 --- a/polling/infrequent/test_service.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -module Polling - module Infrequent - # A mock external service with simulated errors. - module TestService - class TestServiceError < StandardError; end - - @attempts = Hash.new(0) - ERROR_ATTEMPTS = 5 - - def get_service_result(input, activity_info) - workflow_id = activity_info.workflow_id - @attempts[workflow_id] ||= 0 - @attempts[workflow_id] += 1 - - puts "Attempt #{@attempts[workflow_id]} of #{ERROR_ATTEMPTS} to invoke service" - - raise TestServiceError, 'service is down' unless @attempts[workflow_id] == ERROR_ATTEMPTS - - "#{input['greeting']}, #{input['name']}!" - end - module_function :get_service_result - end - end -end diff --git a/polling/periodic_sequence/README.md b/polling/periodic_sequence/README.md new file mode 100644 index 0000000..9e97c64 --- /dev/null +++ b/polling/periodic_sequence/README.md @@ -0,0 +1,36 @@ +# Periodic Polling of a Sequence of Activities + +This sample demonstrates how to use a Child Workflow for periodic Activity polling. + +This is a rare scenario where polling requires execution of a Sequence of Activities, or Activity arguments need to change between polling retries. For this case we use a Child Workflow to call polling activities a set number of times in a loop and then periodically call Continue-As-New. + +## How to Run + +To run, first see [README.md](../README.md) for prerequisites. + +1. **Start the Worker:** + + Open a terminal and run the following command to start the worker process. + The worker will listen for tasks on the `frequent-polling-sample` task queue. + + ```bash + bundle exec ruby worker.rb + ``` + + You will see the worker log messages indicating it is calling the service. + It will try several times, with a short delay between each attempt. + +2. **Start the Workflow:** + + In a separate terminal, run this command to start the workflow. + This script will start the workflow and wait for its completion, + printing the final result. + + ```bash + bundle exec ruby starter.rb + ``` + + After a few seconds, the service will succeed. + You will see the final result printed in the starter's terminal, + and the worker will log the successful completion. + diff --git a/polling/periodic_sequence/child_workflow.rb b/polling/periodic_sequence/child_workflow.rb new file mode 100644 index 0000000..0705f7a --- /dev/null +++ b/polling/periodic_sequence/child_workflow.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'compose_greeting_activity' +require_relative '../test_service' + +module Polling + module PeriodicSequence + class ChildWorkflow < Temporalio::Workflow::Definition + def execute(name) + 4.times do + begin + return Temporalio::Workflow.execute_activity( + ComposeGreetingActivity, + { greeting: 'Hello', name: name }, + retry_policy: Temporalio::RetryPolicy.new( + max_attempts: 1 + ), + start_to_close_timeout: 4 + ) + rescue Temporalio::Error::ActivityError + Temporalio::Workflow.logger.info('Activity failed') + end + Temporalio::Workflow.sleep(1) + end + raise Temporalio::Workflow::ContinueAsNewError, name + end + end + end +end diff --git a/polling/periodic_sequence/compose_greeting_activity.rb b/polling/periodic_sequence/compose_greeting_activity.rb new file mode 100644 index 0000000..d7cd6dc --- /dev/null +++ b/polling/periodic_sequence/compose_greeting_activity.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require 'temporalio/activity' +require_relative '../test_service' + +module Polling + module PeriodicSequence + class ComposeGreetingActivity < Temporalio::Activity::Definition + def execute(input) + activity_info = Temporalio::Activity::Context.current.info + TestService.get_service_result(input, activity_info) + end + end + end +end diff --git a/polling/periodic_sequence/greeting_workflow.rb b/polling/periodic_sequence/greeting_workflow.rb new file mode 100644 index 0000000..29a0c51 --- /dev/null +++ b/polling/periodic_sequence/greeting_workflow.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +require 'temporalio/workflow' +require_relative 'child_workflow' + +module Polling + module PeriodicSequence + class GreetingWorkflow < Temporalio::Workflow::Definition + def execute(name) + Temporalio::Workflow.execute_child_workflow(ChildWorkflow, name) + end + end + end +end diff --git a/polling/periodic_sequence/starter.rb b/polling/periodic_sequence/starter.rb new file mode 100644 index 0000000..a890cd8 --- /dev/null +++ b/polling/periodic_sequence/starter.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require 'temporalio/client' +require_relative 'greeting_workflow' + +# Create a client +client = Temporalio::Client.connect('localhost:7233', 'default') + +# Run workflow +puts 'Executing workflow' +result = client.execute_workflow( + Polling::PeriodicSequence::GreetingWorkflow, + 'World', + id: "periodic-sequence-polling-sample-workflow-id-#{Time.now.to_i}", + task_queue: 'periodic-sequence-polling-sample' +) +puts "Workflow result: #{result}" diff --git a/polling/periodic_sequence/worker.rb b/polling/periodic_sequence/worker.rb new file mode 100644 index 0000000..af5ab27 --- /dev/null +++ b/polling/periodic_sequence/worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require_relative 'greeting_workflow' +require_relative 'child_workflow' +require_relative 'compose_greeting_activity' +require 'temporalio/client' +require 'temporalio/worker' + +# Create a client +client = Temporalio::Client.connect('localhost:7233', 'default') + +worker = Temporalio::Worker.new( + client:, + task_queue: 'periodic-sequence-polling-sample', + workflows: [Polling::PeriodicSequence::GreetingWorkflow, Polling::PeriodicSequence::ChildWorkflow], + activities: [Polling::PeriodicSequence::ComposeGreetingActivity] +) + +puts 'Starting worker (ctrl+c to exit)' +worker.run(shutdown_signals: ['SIGINT']) diff --git a/polling/test_service.rb b/polling/test_service.rb new file mode 100644 index 0000000..f82bf34 --- /dev/null +++ b/polling/test_service.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Polling + # A mock external service with simulated errors. + module TestService + class TestServiceError < StandardError; end + + @attempts = Hash.new(0) + ERROR_ATTEMPTS = 5 + + def get_service_result(input, activity_info) + workflow_id = activity_info.workflow_id + @attempts[workflow_id] ||= 0 + @attempts[workflow_id] += 1 + + puts "Attempt #{@attempts[workflow_id]} of #{ERROR_ATTEMPTS} to invoke service" + + raise TestServiceError, 'service is down' unless @attempts[workflow_id] == ERROR_ATTEMPTS + + "#{input['greeting']}, #{input['name']}!" + end + module_function :get_service_result + end +end diff --git a/test/polling/frequent/greeting_workflow_test.rb b/test/polling/frequent/greeting_workflow_test.rb new file mode 100644 index 0000000..9de178e --- /dev/null +++ b/test/polling/frequent/greeting_workflow_test.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +require 'test' +require 'securerandom' +require 'temporalio/testing' +require 'temporalio/worker' +require 'polling/frequent/greeting_workflow' +require 'polling/frequent/compose_greeting_activity' +require 'polling/test_service' + +module Polling + module Frequent + class GreetingWorkflowTest < Test + def test_workflow_completes_after_polling + task_queue = "tq-#{SecureRandom.uuid}" + + Temporalio::Testing::WorkflowEnvironment.start_local do |env| + worker = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + activities: [Polling::Frequent::ComposeGreetingActivity], + workflows: [Polling::Frequent::GreetingWorkflow] + ) + + handle = env.client.start_workflow( + Polling::Frequent::GreetingWorkflow, + 'Temporal', + id: "wf-#{SecureRandom.uuid}", + task_queue: task_queue + ) + + worker.run do + # Wait for the workflow to complete and assert its result + result = handle.result + assert_equal('Hello, Temporal!', result) + end + end + end + end + end +end diff --git a/test/polling/infrequent/greeting_workflow_test.rb b/test/polling/infrequent/greeting_workflow_test.rb index 027cec9..b92bdee 100644 --- a/test/polling/infrequent/greeting_workflow_test.rb +++ b/test/polling/infrequent/greeting_workflow_test.rb @@ -6,7 +6,7 @@ require 'temporalio/worker' require 'polling/infrequent/greeting_workflow' require 'polling/infrequent/compose_greeting_activity' -require 'polling/infrequent/test_service' +require 'polling/test_service' module Polling module Infrequent diff --git a/test/polling/periodic_sequence/greeting_workflow_test.rb b/test/polling/periodic_sequence/greeting_workflow_test.rb new file mode 100644 index 0000000..a99c478 --- /dev/null +++ b/test/polling/periodic_sequence/greeting_workflow_test.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +require 'test' +require 'securerandom' +require 'temporalio/testing' +require 'temporalio/worker' +require 'polling/periodic_sequence/greeting_workflow' +require 'polling/periodic_sequence/compose_greeting_activity' +require 'polling/test_service' + +module Polling + module PeriodicSequence + class GreetingWorkflowTest < Test + def test_workflow_completes_after_polling + task_queue = "tq-#{SecureRandom.uuid}" + + Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env| + worker = Temporalio::Worker.new( + client: env.client, + task_queue: task_queue, + activities: [Polling::PeriodicSequence::ComposeGreetingActivity], + workflows: [Polling::PeriodicSequence::GreetingWorkflow, Polling::PeriodicSequence::ChildWorkflow] + ) + + handle = env.client.start_workflow( + Polling::PeriodicSequence::GreetingWorkflow, + 'Temporal', + id: "wf-#{SecureRandom.uuid}", + task_queue: task_queue + ) + + worker.run do + env.sleep(5) + # Wait for the workflow to complete and assert its result + result = handle.result + assert_equal 'Hello, Temporal!', result + end + + child_started_event = handle.fetch_history_events.filter_map do |e| + e.child_workflow_execution_started_event_attributes&.workflow_type&.name + end.first + assert_equal 'ChildWorkflow', child_started_event + end + end + end + end +end diff --git a/test/saga/saga_workflow_test.rb b/test/saga/saga_workflow_test.rb index c5bde74..66686dd 100644 --- a/test/saga/saga_workflow_test.rb +++ b/test/saga/saga_workflow_test.rb @@ -7,7 +7,7 @@ require 'temporalio/testing' require 'temporalio/worker' require 'polling/infrequent/compose_greeting_activity' -require 'polling/infrequent/test_service' +require 'polling/test_service' module Saga class SagaWorkflowTest < Test