Core application - Ruby SDK
This page shows how to do the following:
- Develop a basic Workflow Definition
- Develop a basic Activity Definition
- Start an Activity from a Workflow
- Run a Worker Process
- Set a Dynamic Workflow
- Set a Dynamic Activity
Develop a Workflow
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
In the Temporal Ruby SDK programming model, Workflows are defined as classes.
Have the Workflow class extend Temporalio::Workflow::Definition
to define a Workflow.
The entrypoint is the execute
method.
class MyWorkflow < Temporalio::Workflow::Definition
def execute(name)
Temporalio::Workflow.execute_activity(
MyActivity,
{ greeting: 'Hello', name: },
start_to_close_timeout: 100
)
end
end
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that hashes or objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow.
Workflow Logic Requirements
Temporal Workflows must be deterministic, which includes Ruby workflows. This means there are several things workflows cannot do such as:
- Perform IO (network, disk, stdio, etc)
- Access/alter external mutable state
- Do any threading
- Do anything using the system clock (e.g.
Time.Now
) - Make any random calls
- Make any not-guaranteed-deterministic calls
To prevent illegal workflow calls, a call tracer is put on the workflow thread that raises an exception if any illegal calls are made. Which calls are illegal is configurable in the worker options.
Customize Workflow Type
Workflows have a Type that are referred to as the Workflow name.
The following examples demonstrate how to set a custom name for your Workflow Type.
You can customize the Workflow name with a custom name in a workflow_name
class method call on the class.
The Workflow name defaults to the unqualified class name.
class MyWorkflow < Temporalio::Workflow::Definition
# Customize the name
workflow_name :MyDifferentWorkflowName
def execute(name)
Temporalio::Workflow.execute_activity(
MyActivity,
{ greeting: 'Hello', name: },
start_to_close_timeout: 100
)
end
end
Workflow Futures
Temporalio::Workflow::Future
can be used for running things in the background or concurrently.
Temporal provides Workflow-safe wrappers around some core language features in cases like these.
Temporalio::Workflow::Future
is a safe wrapper around Fiber.schedule
for running multiple Activities at once.
The Ruby SDK also provides Workflow.wait_condition
for awaiting a result.
Futures are never used implicitly, but they work with all Workflow code and constructs. For instance, to run 3 activities and wait for them all to complete, something like this can be written:
# Start 3 activities in background
fut1 = Temporalio::Workflow::Future.new do
Temporalio::Workflow.execute_activity(MyActivity1, schedule_to_close_timeout: 300)
end
fut2 = Temporalio::Workflow::Future.new do
Temporalio::Workflow.execute_activity(MyActivity2, schedule_to_close_timeout: 300)
end
fut3 = Temporalio::Workflow::Future.new do
Temporalio::Workflow.execute_activity(MyActivity3, schedule_to_close_timeout: 300)
end
# Wait for them all to complete
Temporalio::Workflow::Future.all_of(fut1, fut2, fut3).wait
Temporalio::Workflow.logger.info("Got: #{fut1.result}, #{fut2.result}, #{fut3.result}")
Or, say, to wait on the first of 5 activities or a timeout to complete:
# Start 5 activities
act_futs = 5.times.map do |i|
Temporalio::Workflow::Future.new do
Temporalio::Workflow.execute_activity(MyActivity, "my-arg-#{i}", schedule_to_close_timeout: 300)
end
end
# Start a timer
sleep_fut = Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(30) }
# Wait for first act result or sleep fut
act_result = Temporalio::Workflow::Future.any_of(sleep_fut, *act_futs).wait
# Fail if timer done first
raise Temporalio::Error::ApplicationError, 'Timer expired' if sleep_fut.done?
# Print act result otherwise
Temporalio::Workflow.logger.info("Act result: #{act_result}")
There are several other details not covered here about futures, such as how exceptions are handled, how to use a setter proc instead of a block, etc. See the API documentation for details.
Develop an Activity
One of the primary things that Workflows do is orchestrate the execution of Activities. An Activity is a normal method execution that's intended to execute a single, well-defined action (either short or long-running), such as querying a database, calling a third-party API, or transcoding a media file. An Activity can interact with world outside the Temporal Platform or use a Temporal Client to interact with a Temporal Service. For the Workflow to be able to execute the Activity, we must define the Activity Definition.
You can develop an Activity Definition by creating a class that extends Temporalio::Activity::Definition
.
To register a class as an Activity with a custom name, use the activity_name
class method in the class definition.
Otherwise, the activity name is the unqualified class name.
class MyActivity < Temporalio::Activity::Definition
def execute(input)
"#{input['greeting']}, #{input['name']}!"
end
end
There is no explicit limit to the total number of parameters that an Activity Definition may support. However, there is a limit to the total size of the data that ends up encoded into a gRPC message Payload.
A single argument is limited to a maximum size of 2 MB. And the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.
Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History and large Event Histories can affect Worker performance. This is because the entire Event History could be transferred to a Worker Process with a Workflow Task.
Some SDKs require that you pass context objects, others do not. When it comes to your application data—that is, data that is serialized and encoded into a Payload—we recommend that you use a single hash or object as an argument that wraps the application data passed to Activities. This is so that you can change what data is passed to the Activity without breaking a method signature.
Activity parameters are the method parameters of the execute
method.
These can be any data type Temporal can convert.
Technically this can be multiple parameters, but Temporal strongly encourages a single parameter containing all input fields.
Activity Concurrency and Executors
By default, activities run in the "thread pool executor" (i.e. Temporalio::Worker::ActivityExecutor::ThreadPool
).
This default is shared across all workers and is a naive thread pool that continually makes threads as needed when none are
idle/available to handle incoming work.
If a thread sits idle long enough, it will be killed.
The maximum number of concurrent activities a worker will run at a time is configured via its tuner
option.
The default is Temporalio::Worker::Tuner.create_fixed
which defaults to 100 activities at a time for that worker.
When this value is reached, the worker will stop asking for work from the server until there are slots available again.
In addition to the thread pool executor, there is also a fiber executor in the default executor set.
To use fibers, call activity_executor :fiber
class method at the top of the activity class (the default of this value is :default
which is the thread pool executor).
Activities can only choose the fiber executor if the worker has been created and run in a fiber, but thread pool executor is always available.
Currently due to an issue, workers can only run in a fiber on Ruby versions 3.3 and newer.
Start Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed])in your Workflow Execution Event History.
Activity implementation code should be idempotent.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
To spawn an Activity Execution, use the execute_activity
operation from within your Workflow Definition.
class MyWorkflow < Temporalio::Workflow::Definition
# Customize the name
workflow_name :MyDifferentWorkflowName
def execute(name)
Temporalio::Workflow.execute_activity(
MyActivity,
{ greeting: 'Hello', name: },
start_to_close_timeout: 100
)
end
end
Activity Execution semantics rely on several parameters. The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. These values are set as keyword parameters.
Get Activity Execution results
The Activity result is the returned from the execute_activity
call.
Run Worker Process
The Worker Process is where Workflow Functions and Activity Functions are actually executed. In a Temporal application deployment, you ship and scale as many Workers as you need to handle the load of your Workflows and Activities.
- Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute.
- Each Worker Entity must also associate itself with exactly one Task Queue.
- Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the component within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a single Worker Entity Worker Process may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains a Workflow Worker and/or an Activity Worker, which makes progress on Workflow Executions and Activity Executions, respectively.
To develop a Worker, create a new Temporalio::Worker
providing the Client and worker options which include Task Queue, Workflows, and Activities and more.
The following code example creates a Worker that polls for tasks from the Task Queue and executes the Workflow.
When a Worker is created, it accepts a list of Workflows, a list of Activities, or both.
# Create a client to localhost on default namespace
client = Temporalio::Client.connect('localhost:7233', 'default')
# Create a worker with the client, activities, and workflows
worker = Temporalio::Worker.new(
client:,
task_queue: 'my-task-queue',
workflows: [MyWorkflow],
# This provides the activity instance which means it is reused for each attempt, but
# just the class can be provided to instantiate for each attempt
activities: [MyActivity.new]
)
# Run the worker until SIGINT. There are other ways to wait for shutdown, or a block can
# be provided that will shutdown when the block completes
worker.run(shutdown_signals: ['SIGINT'])
To run multiple workers, Temporalio::Worker.run_all
may be used instead.
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflows Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
Set a Dynamic Workflow
A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered.
A Workflow can be made dynamic by invoking workflow_dynamic
class method at the top of the definition.
You must register the Workflow with the Worker before it can be invoked.
Only one Dynamic Workflow can be present on a Worker.
Often, dynamic is used in conjunction with workflow_raw_args
which does not convert arguments but instead passes them
through as a splatted array of Temporalio::Converters::RawValue
instances.
class MyDynamicWorkflow < Temporalio::Workflow::Definition
# Make this the dynamic workflow and accept raw args
workflow_dynamic
workflow_raw_args
def execute(*raw_args)
# Require a single arg for our workflow
raise Temporalio::Error::ApplicationError, 'One arg expected' unless raw_args.size == 1
# Use payload converter to convert it
name = Temporalio::Workflow.payload_converter.from_payload(raw_args.first.payload)
Temporalio::Workflow.execute_activity(
MyActivity,
{ greeting: 'Hello', name: },
start_to_close_timeout: 100
)
end
end
Set a Dynamic Activity
A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered.
An Activity can be made dynamic by invoking activity_dynamic
class method at the top of the definition.
You must register the Activity with the Worker before it can be invoked.
Only one Dynamic Activity can be present on a Worker.
Often, dynamic is used in conjunction with activity_raw_args
which does not convert arguments but instead passes them
through as a splatted array of Temporalio::Converters::RawValue
instances.
class MyDynamicActivity < Temporalio::Activity::Definition
# Make this the dynamic activity and accept raw args
activity_dynamic
activity_raw_args
def execute(*raw_args)
raise Temporalio::Error::ApplicationError, 'One arg expected' unless raw_args.size == 1
# Use payload converter to convert it
input = Temporalio::Activity::Context.current.payload_converter.from_payload(raw_args.first.payload)
"#{input['greeting']}, #{input['name']}!"
end
end