Complete API documentation for the Singularity.Workflow library.
- Workflow Execution
- Workflow Lifecycle Management
- Real-Time Messaging
- Goal-Driven Orchestration (HTDAG)
- Dynamic Workflow Creation
- Execution Strategies
Execute workflows defined as Elixir modules with automatic dependency resolution and parallel execution.
What it does: Executes a workflow module with given input, managing task dependencies, parallel execution, and state persistence.
What it solves: Manual task coordination, dependency tracking, and retry logic. You define WHAT tasks to run and their dependencies; the executor handles HOW and WHEN.
@spec Singularity.Workflow.Executor.execute(
workflow_module :: module(),
input :: map(),
repo :: module(),
opts :: keyword()
) :: {:ok, result :: any(), run_id :: String.t()} | {:error, reason :: term()}Example:
defmodule MyWorkflow do
def __workflow_steps__ do
[
{:fetch, &__MODULE__.fetch/1, depends_on: []},
{:process, &__MODULE__.process/1, depends_on: [:fetch]},
{:save, &__MODULE__.save/1, depends_on: [:process]}
]
end
def fetch(input), do: {:ok, %{data: "..."}}
def process(input), do: {:ok, %{result: "..."}}
def save(input), do: {:ok, %{saved: true}}
end
{:ok, result, run_id} = Singularity.Workflow.Executor.execute(
MyWorkflow,
%{user_id: 123},
MyApp.Repo
)Options:
:timeout- Maximum execution time in milliseconds (default: 300000)
Control running workflows programmatically during execution.
What it does: Retrieves current status of a workflow execution.
What it solves: Real-time monitoring and progress tracking without polling the database manually.
@spec Singularity.Workflow.get_run_status(
run_id :: String.t(),
repo :: module()
) :: {:ok, :completed | :failed | :in_progress, details :: term()} | {:error, :not_found}Example:
{:ok, :in_progress, %{total_steps: 5, completed_steps: 2, percentage: 40.0}} =
Singularity.Workflow.get_run_status(run_id, MyApp.Repo)What it does: Query workflow runs with filtering and pagination.
What it solves: Dashboard creation, monitoring interfaces, and operational visibility without writing custom queries.
@spec Singularity.Workflow.list_workflow_runs(
repo :: module(),
filters :: keyword()
) :: {:ok, [WorkflowRun.t()]} | {:error, term()}Filters:
:status- "started", "completed", or "failed":workflow_slug- Filter by workflow module name:limit- Maximum results (default: 100):offset- Pagination offset (default: 0):order_by- Tuple like{:desc, :inserted_at}
Example:
# List all running workflows
{:ok, runs} = Singularity.Workflow.list_workflow_runs(
MyApp.Repo,
status: "started",
limit: 20
)
# List failed workflows for specific module
{:ok, failed} = Singularity.Workflow.list_workflow_runs(
MyApp.Repo,
status: "failed",
workflow_slug: "MyApp.Workflows.ProcessOrder"
)What it does: Cancels a running workflow, stopping pending tasks and marking the run as failed.
What it solves: User-initiated cancellation, timeout handling, and resource cleanup without manual database updates.
@spec Singularity.Workflow.cancel_workflow_run(
run_id :: String.t(),
repo :: module(),
opts :: keyword()
) :: :ok | {:error, term()}Options:
:reason- Cancellation reason (default: "User requested cancellation"):force- Force cancel even if already completed (default: false)
Example:
:ok = Singularity.Workflow.cancel_workflow_run(
run_id,
MyApp.Repo,
reason: "Timeout exceeded"
)What it does: Pauses workflow execution, preventing new tasks from starting while allowing currently running tasks to complete.
What it solves: Temporary suspension for maintenance windows, rate limiting, or manual intervention scenarios.
@spec Singularity.Workflow.pause_workflow_run(
run_id :: String.t(),
repo :: module()
) :: :ok | {:error, term()}Example:
# Pause for maintenance
:ok = Singularity.Workflow.pause_workflow_run(run_id, MyApp.Repo)
# Perform maintenance...
# Resume
:ok = Singularity.Workflow.resume_workflow_run(run_id, MyApp.Repo)What it does: Resumes a paused workflow, allowing queued tasks to continue execution.
What it solves: Workflow continuation after maintenance or manual review without restarting from scratch.
@spec Singularity.Workflow.resume_workflow_run(
run_id :: String.t(),
repo :: module()
) :: :ok | {:error, term()}What it does: Creates a new workflow execution from a failed run, optionally skipping completed steps.
What it solves: Transient failure recovery and partial re-execution without losing previous progress.
@spec Singularity.Workflow.retry_failed_workflow(
run_id :: String.t(),
repo :: module(),
opts :: keyword()
) :: {:ok, new_run_id :: String.t()} | {:error, term()}Options:
:skip_completed- Skip previously completed steps (default: true):reset_all- Restart entire workflow from beginning (default: false)
Example:
# Retry from point of failure
{:ok, new_run_id} = Singularity.Workflow.retry_failed_workflow(
failed_run_id,
MyApp.Repo
)
# Restart completely
{:ok, new_run_id} = Singularity.Workflow.retry_failed_workflow(
failed_run_id,
MyApp.Repo,
reset_all: true
)PostgreSQL NOTIFY-based messaging for event-driven communication between system components.
What it does: Sends a message to a channel and triggers PostgreSQL NOTIFY for real-time delivery.
What it solves: Instant event propagation without polling, enabling reactive architectures and real-time UIs.
@spec Singularity.Workflow.send_with_notify(
channel :: String.t(),
message :: map(),
repo :: module()
) :: {:ok, message_id :: String.t()} | {:error, term()}Example:
{:ok, message_id} = Singularity.Workflow.send_with_notify(
"workflow_events",
%{
type: "task_completed",
workflow_id: "wf_123",
task_id: "task_456",
duration_ms: 1500
},
MyApp.Repo
)What it does: Subscribes to a PostgreSQL NOTIFY channel for real-time message delivery.
What it solves: Event-driven architectures, real-time dashboards, and inter-service communication without external message brokers.
@spec Singularity.Workflow.listen(
channel :: String.t(),
repo :: module()
) :: {:ok, pid()} | {:error, term()}Example:
{:ok, listener_pid} = Singularity.Workflow.listen("workflow_events", MyApp.Repo)
receive do
{:notification, ^listener_pid, channel, message_id} ->
IO.puts("Received message on #{channel}: #{message_id}")
endWhat it does: Stops listening to a channel and cleans up the listener process.
What it solves: Resource cleanup and graceful shutdown of event listeners.
@spec Singularity.Workflow.unlisten(
listener_pid :: pid(),
repo :: module()
) :: :ok | {:error, term()}What it does: Sends a PostgreSQL NOTIFY without persisting to pgmq (fire-and-forget).
What it solves: Ephemeral notifications where message persistence isn't needed (e.g., UI updates).
@spec Singularity.Workflow.notify_only(
channel :: String.t(),
payload :: String.t(),
repo :: module()
) :: :ok | {:error, term()}Hierarchical Task Directed Acyclic Graph for AI/LLM-powered workflow generation.
Problem: AI systems need to break down high-level goals into executable task graphs dynamically. Traditional workflow systems require predefined steps, making them unsuitable for agent-based architectures.
Solution: HTDAG provides goal decomposition → task graph generation → workflow execution in a single pipeline, perfect for LLM-powered agents that plan their own work.
What it does: Takes a natural language goal, decomposes it into tasks via a decomposer function, creates a workflow, and executes it.
What it solves: The gap between high-level intentions and executable workflows. Enables AI agents to autonomously plan and execute complex multi-step tasks.
@spec Singularity.Workflow.Orchestrator.execute_goal(
goal :: String.t(),
decomposer :: (String.t() -> {:ok, [task_map()]} | {:error, term()}),
step_functions :: %{String.t() => function()},
repo :: module(),
opts :: keyword()
) :: {:ok, result :: any()} | {:error, term()}Example:
# Define how to decompose goals (could use LLM)
defmodule MyApp.GoalDecomposer do
def decompose(goal) do
# Call LLM or use rules to break down goal
tasks = [
%{id: "analyze", description: "Analyze requirements", depends_on: []},
%{id: "design", description: "Design solution", depends_on: ["analyze"]},
%{id: "implement", description: "Implement", depends_on: ["design"]}
]
{:ok, tasks}
end
end
# Define task implementations
step_functions = %{
"analyze" => fn input -> {:ok, %{requirements: "..."}} end,
"design" => fn input -> {:ok, %{architecture: "..."}} end,
"implement" => fn input -> {:ok, %{code: "..."}} end
}
# Execute goal
{:ok, result} = Singularity.Workflow.Orchestrator.execute_goal(
"Build user authentication system",
&MyApp.GoalDecomposer.decompose/1,
step_functions,
MyApp.Repo
)Use Cases:
- AI agents that plan their own execution
- LLM-powered task automation
- Dynamic workflow generation from natural language
- Autonomous systems that adapt workflows based on context
What it does: Decomposes a goal into a hierarchical task graph without executing it.
What it solves: Separation of planning from execution, allowing preview/approval of task graphs before execution.
@spec Singularity.Workflow.Orchestrator.decompose_goal(
goal :: String.t(),
decomposer :: function(),
repo :: module()
) :: {:ok, task_graph :: map()} | {:error, term()}Example:
{:ok, task_graph} = Singularity.Workflow.Orchestrator.decompose_goal(
"Deploy microservice to production",
&MyApp.GoalDecomposer.decompose/1,
MyApp.Repo
)
# task_graph contains:
# %{
# tasks: [
# %{id: "task1", description: "...", depends_on: []},
# %{id: "task2", description: "...", depends_on: ["task1"]}
# ],
# id: "htdag_123",
# decomposed_at: ~U[2025-11-09 ...]
# }What it does: High-level convenience wrapper combining decomposition and execution.
What it solves: Single-function API for goal → execution without managing intermediate steps.
@spec Singularity.Workflow.WorkflowComposer.compose_from_goal(
goal :: String.t(),
decomposer :: function(),
step_functions :: map(),
repo :: module()
) :: {:ok, result :: any()} | {:error, term()}Runtime workflow generation for AI/LLM systems that don't know task structure ahead of time.
What it does: Creates a new dynamic workflow definition in the database.
What it solves: Workflow creation when structure is determined at runtime (e.g., generated by AI).
@spec Singularity.Workflow.FlowBuilder.create_flow(
name :: String.t(),
repo :: module()
) :: {:ok, workflow_id :: String.t()} | {:error, term()}Example:
{:ok, workflow_id} = Singularity.Workflow.FlowBuilder.create_flow(
"ai_generated_workflow",
MyApp.Repo
)What it does: Adds a step to a dynamic workflow with dependencies.
What it solves: Incremental workflow construction as tasks are discovered/generated.
@spec Singularity.Workflow.FlowBuilder.add_step(
workflow_id :: String.t(),
step_name :: String.t(),
depends_on :: [String.t()],
repo :: module()
) :: {:ok, step :: map()} | {:error, term()}Example:
# Build workflow incrementally
{:ok, _} = FlowBuilder.add_step(workflow_id, "step1", [], MyApp.Repo)
{:ok, _} = FlowBuilder.add_step(workflow_id, "step2", ["step1"], MyApp.Repo)
{:ok, _} = FlowBuilder.add_step(workflow_id, "step3", ["step2"], MyApp.Repo)
# Execute with step function map
step_functions = %{
"step1" => fn input -> {:ok, %{data: "..."}} end,
"step2" => fn input -> {:ok, %{processed: "..."}} end,
"step3" => fn input -> {:ok, %{saved: true}} end
}
{:ok, result} = Singularity.Workflow.Executor.execute_dynamic(
workflow_id,
%{user_id: 123},
step_functions,
MyApp.Repo
)Control WHERE and HOW workflow tasks execute.
What it does: Executes tasks in the current process sequentially or in parallel based on dependencies.
What it solves: Simple workflows that don't need distributed coordination or can run entirely on one node.
def __workflow_steps__ do
[
{:step1, &__MODULE__.step1/1, depends_on: [], execution: :local},
{:step2, &__MODULE__.step2/1, depends_on: [:step1], execution: :local}
]
endUse when:
- Single-node deployments
- Fast-running tasks (<30 seconds)
- No resource-specific requirements
What it does: Distributes tasks across multiple worker nodes via PostgreSQL message queuing.
What it solves: Horizontal scaling, resource allocation (GPU/CPU queues), and workload distribution without manual coordination.
def __workflow_steps__ do
[
{:analyze, &__MODULE__.analyze/1,
depends_on: [],
execution: :distributed,
queue: :cpu_workers},
{:train_model, &__MODULE__.train/1,
depends_on: [:analyze],
execution: :distributed,
queue: :gpu_workers,
resources: [gpu: true]}
]
endUse when:
- Multi-node deployments
- Long-running tasks
- Resource-specific tasks (GPU/high-memory)
- Need for fault tolerance across nodes
Architecture:
- Tasks are enqueued to PostgreSQL via pgmq
- Workers poll queues and claim tasks
- PostgreSQL provides coordination (no leader election needed)
- Automatic retry and fault tolerance
| Category | APIs | Solves |
|---|---|---|
| Workflow Execution | Executor.execute/3 |
Task orchestration, dependency management, parallel execution |
| Lifecycle Control | cancel/pause/resume/retry/list |
Operational control, monitoring, failure recovery |
| Messaging | send_with_notify/listen/unlisten |
Real-time communication, event-driven architectures |
| HTDAG | Orchestrator.execute_goal |
AI/LLM goal → task graph → execution |
| Dynamic Workflows | FlowBuilder.create_flow/add_step |
Runtime workflow generation |
| Execution Strategies | :local / :distributed |
Local vs distributed execution |
- PostgreSQL-Centric: All coordination via database (no external brokers)
- Simple API: Complex distributed systems with simple function calls
- AI-Ready: HTDAG enables autonomous agent workflows
- Production-Grade: Lifecycle management, monitoring, fault tolerance built-in
Phoenix LiveView and Channels can use Singularity.Workflow messaging directly - no Phoenix.PubSub needed.
What it solves: Real-time UI updates for workflow progress without polling or separate pub/sub infrastructure.
defmodule MyAppWeb.WorkflowLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
# Start listening to workflow events
{:ok, listener_pid} = Singularity.Workflow.listen("workflow_events", MyApp.Repo)
{:ok,
socket
|> assign(:listener_pid, listener_pid)
|> assign(:workflows, [])}
end
def handle_info({:notification, _pid, "workflow_events", message_id}, socket) do
# Fetch message details and update UI
workflow_updated = fetch_workflow_by_message(message_id)
{:noreply,
socket
|> update(:workflows, fn workflows ->
update_workflow_list(workflows, workflow_updated)
end)}
end
def terminate(_reason, socket) do
# Cleanup listener
Singularity.Workflow.unlisten(socket.assigns.listener_pid, MyApp.Repo)
:ok
end
enddefmodule MyAppWeb.WorkflowChannel do
use MyAppWeb, :channel
def join("workflow:lobby", _payload, socket) do
# Subscribe to workflow messages
{:ok, listener_pid} = Singularity.Workflow.listen("workflow_events", MyApp.Repo)
{:ok, assign(socket, :listener_pid, listener_pid)}
end
def handle_info({:notification, _pid, channel, message_id}, socket) do
# Forward to connected clients
push(socket, "workflow_update", %{
channel: channel,
message_id: message_id,
timestamp: DateTime.utc_now()
})
{:noreply, socket}
end
def terminate(_reason, socket) do
Singularity.Workflow.unlisten(socket.assigns.listener_pid, MyApp.Repo)
:ok
end
end# In your workflow step
def process_data(input) do
result = do_processing(input)
# Broadcast to all connected LiveViews/Channels
Singularity.Workflow.send_with_notify(
"workflow_events",
%{
type: "processing_complete",
workflow_id: input.workflow_id,
result: result
},
MyApp.Repo
)
{:ok, result}
end| Feature | Singularity.Workflow | Phoenix.PubSub |
|---|---|---|
| Persistence | Messages stored in PostgreSQL | Ephemeral (memory only) |
| Multi-node | PostgreSQL handles distribution | Requires node clustering |
| Message History | Queryable via pgmq | Not available |
| Reliability | ACID guarantees | Best-effort delivery |
| Setup | Uses existing database | Separate infrastructure |
| Workflow Integration | Native | Requires manual bridging |
Use Singularity.Workflow Messaging:
- Workflow status updates
- Critical notifications that need persistence
- Cross-service communication
- Multi-datacenter deployments
- When message history is needed
Use Phoenix.PubSub:
- Presence tracking
- Temporary UI state sync
- High-frequency ephemeral updates
- When already using Phoenix PubSub for other features
Use Both Together:
# Critical workflow events → PostgreSQL NOTIFY
Singularity.Workflow.send_with_notify("workflow_critical", event, repo)
# Ephemeral UI updates → Phoenix.PubSub
Phoenix.PubSub.broadcast(MyApp.PubSub, "ui:updates", {:cursor_moved, data})