Skip to content

Commit f0a6c8d

Browse files
committed
feat: add infrastructure schema for step skipping and cascading
Introduce new columns and constraints to support skip logic in steps, including condition patterns, skip reasons, and cascade skip functionality. Extend runtime tables with skip-related fields and indexes. Update functions to handle new skip parameters and cascade logic, ensuring proper validation and data integrity.
1 parent 2da7657 commit f0a6c8d

16 files changed

Lines changed: 979 additions & 12 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ create table pgflow.steps (
2424
opt_base_delay int,
2525
opt_timeout int,
2626
opt_start_delay int,
27+
condition_pattern jsonb, -- JSON pattern for @> containment check
28+
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
29+
when_failed text not null default 'fail', -- What to do when handler fails after retries
2730
created_at timestamptz not null default now(),
2831
primary key (flow_slug, step_slug),
2932
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
@@ -32,7 +35,9 @@ create table pgflow.steps (
3235
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0),
3336
constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0),
3437
constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0),
35-
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0)
38+
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0),
39+
constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')),
40+
constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade'))
3641
);
3742

3843
-- Dependencies table - stores relationships between steps

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,20 @@ create table pgflow.step_states (
3131
remaining_deps int not null default 0 check (remaining_deps >= 0),
3232
output jsonb, -- Step output: stored atomically with status=completed transition
3333
error_message text,
34+
skip_reason text, -- Why step was skipped: condition_unmet, handler_failed, dependency_skipped
3435
created_at timestamptz not null default now(),
3536
started_at timestamptz,
3637
completed_at timestamptz,
3738
failed_at timestamptz,
39+
skipped_at timestamptz,
3840
primary key (run_id, step_slug),
3941
foreign key (flow_slug, step_slug)
4042
references pgflow.steps (flow_slug, step_slug),
41-
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')),
43+
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed', 'skipped')),
4244
constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0),
4345
-- Add constraint to ensure remaining_tasks is only set when step has started
4446
constraint remaining_tasks_state_consistency check (
45-
remaining_tasks is null or status != 'created'
47+
remaining_tasks is null or status not in ('created', 'skipped')
4648
),
4749
constraint initial_tasks_known_when_started check (
4850
status != 'started' or initial_tasks is not null
@@ -52,16 +54,29 @@ create table pgflow.step_states (
5254
constraint output_only_for_completed_or_null check (
5355
output is null or status = 'completed'
5456
),
55-
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
57+
-- skip_reason is required for skipped status and forbidden for other statuses
58+
constraint skip_reason_matches_status check (
59+
(status = 'skipped' and skip_reason is not null) or
60+
(status != 'skipped' and skip_reason is null)
61+
),
62+
constraint completed_at_or_failed_at_or_skipped_at check (
63+
(
64+
case when completed_at is not null then 1 else 0 end +
65+
case when failed_at is not null then 1 else 0 end +
66+
case when skipped_at is not null then 1 else 0 end
67+
) <= 1
68+
),
5669
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
5770
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
58-
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at)
71+
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at),
72+
constraint skipped_at_is_after_created_at check (skipped_at is null or skipped_at >= created_at)
5973
);
6074

6175
create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status
6276
= 'created'
6377
and remaining_deps = 0;
6478
create index if not exists idx_step_states_failed on pgflow.step_states (run_id, step_slug) where status = 'failed';
79+
create index if not exists idx_step_states_skipped on pgflow.step_states (run_id, step_slug) where status = 'skipped';
6580
create index if not exists idx_step_states_flow_slug on pgflow.step_states (flow_slug);
6681
create index if not exists idx_step_states_run_id on pgflow.step_states (run_id);
6782

pkgs/core/schemas/0100_function_add_step.sql

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ create or replace function pgflow.add_step(
66
base_delay int default null,
77
timeout int default null,
88
start_delay int default null,
9-
step_type text default 'single'
9+
step_type text default 'single',
10+
condition_pattern jsonb default null,
11+
when_unmet text default 'skip',
12+
when_failed text default 'fail'
1013
)
1114
returns pgflow.steps
1215
language plpgsql
@@ -22,7 +25,7 @@ BEGIN
2225
-- 0 dependencies (root map - maps over flow input array)
2326
-- 1 dependency (dependent map - maps over dependency output array)
2427
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
25-
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
28+
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
2629
add_step.step_slug,
2730
COALESCE(array_length(add_step.deps_slugs, 1), 0),
2831
array_to_string(add_step.deps_slugs, ', ');
@@ -36,18 +39,22 @@ BEGIN
3639
-- Create the step
3740
INSERT INTO pgflow.steps (
3841
flow_slug, step_slug, step_type, step_index, deps_count,
39-
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
42+
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
43+
condition_pattern, when_unmet, when_failed
4044
)
4145
VALUES (
4246
add_step.flow_slug,
4347
add_step.step_slug,
4448
COALESCE(add_step.step_type, 'single'),
45-
next_idx,
49+
next_idx,
4650
COALESCE(array_length(add_step.deps_slugs, 1), 0),
4751
add_step.max_attempts,
4852
add_step.base_delay,
4953
add_step.timeout,
50-
add_step.start_delay
54+
add_step.start_delay,
55+
add_step.condition_pattern,
56+
add_step.when_unmet,
57+
add_step.when_failed
5158
)
5259
ON CONFLICT ON CONSTRAINT steps_pkey
5360
DO UPDATE SET step_slug = EXCLUDED.step_slug
@@ -59,7 +66,7 @@ BEGIN
5966
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
6067
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
6168
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
62-
69+
6370
RETURN result_step;
6471
END;
6572
$$;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
-- cascade_skip_steps: Skip a step and cascade to all downstream dependents
2+
-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade)
3+
create or replace function pgflow.cascade_skip_steps(
4+
run_id uuid,
5+
step_slug text,
6+
skip_reason text
7+
)
8+
returns int
9+
language plpgsql
10+
as $$
11+
DECLARE
12+
v_flow_slug text;
13+
v_total_skipped int := 0;
14+
BEGIN
15+
-- Get flow_slug for this run
16+
SELECT r.flow_slug INTO v_flow_slug
17+
FROM pgflow.runs r
18+
WHERE r.run_id = cascade_skip_steps.run_id;
19+
20+
IF v_flow_slug IS NULL THEN
21+
RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id;
22+
END IF;
23+
24+
-- ==========================================
25+
-- SKIP STEPS IN TOPOLOGICAL ORDER
26+
-- ==========================================
27+
-- Use recursive CTE to find all downstream dependents,
28+
-- then skip them in topological order (by step_index)
29+
WITH RECURSIVE
30+
-- ---------- Find all downstream steps ----------
31+
downstream_steps AS (
32+
-- Base case: the trigger step
33+
SELECT
34+
s.flow_slug,
35+
s.step_slug,
36+
s.step_index,
37+
cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step
38+
FROM pgflow.steps s
39+
WHERE s.flow_slug = v_flow_slug
40+
AND s.step_slug = cascade_skip_steps.step_slug
41+
42+
UNION ALL
43+
44+
-- Recursive case: steps that depend on already-found steps
45+
SELECT
46+
s.flow_slug,
47+
s.step_slug,
48+
s.step_index,
49+
'dependency_skipped'::text AS reason -- Downstream steps get this reason
50+
FROM pgflow.steps s
51+
JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
52+
JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
53+
),
54+
-- ---------- Deduplicate and order by step_index ----------
55+
steps_to_skip AS (
56+
SELECT DISTINCT ON (ds.step_slug)
57+
ds.flow_slug,
58+
ds.step_slug,
59+
ds.step_index,
60+
ds.reason
61+
FROM downstream_steps ds
62+
ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
63+
),
64+
-- ---------- Skip the steps ----------
65+
skipped AS (
66+
UPDATE pgflow.step_states ss
67+
SET status = 'skipped',
68+
skip_reason = sts.reason,
69+
skipped_at = now(),
70+
remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
71+
FROM steps_to_skip sts
72+
WHERE ss.run_id = cascade_skip_steps.run_id
73+
AND ss.step_slug = sts.step_slug
74+
AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
75+
RETURNING
76+
ss.*,
77+
-- Broadcast step:skipped event
78+
realtime.send(
79+
jsonb_build_object(
80+
'event_type', 'step:skipped',
81+
'run_id', ss.run_id,
82+
'flow_slug', ss.flow_slug,
83+
'step_slug', ss.step_slug,
84+
'status', 'skipped',
85+
'skip_reason', ss.skip_reason,
86+
'skipped_at', ss.skipped_at
87+
),
88+
concat('step:', ss.step_slug, ':skipped'),
89+
concat('pgflow:run:', ss.run_id),
90+
false
91+
) as _broadcast_result
92+
),
93+
-- ---------- Update run counters ----------
94+
run_updates AS (
95+
UPDATE pgflow.runs r
96+
SET remaining_steps = r.remaining_steps - skipped_count.count
97+
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
98+
WHERE r.run_id = cascade_skip_steps.run_id
99+
AND skipped_count.count > 0
100+
)
101+
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
102+
103+
RETURN v_total_skipped;
104+
END;
105+
$$;

pkgs/core/src/database-types.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ export type Database = {
132132
remaining_deps: number
133133
remaining_tasks: number | null
134134
run_id: string
135+
skip_reason: string | null
136+
skipped_at: string | null
135137
started_at: string | null
136138
status: string
137139
step_slug: string
@@ -147,6 +149,8 @@ export type Database = {
147149
remaining_deps?: number
148150
remaining_tasks?: number | null
149151
run_id: string
152+
skip_reason?: string | null
153+
skipped_at?: string | null
150154
started_at?: string | null
151155
status?: string
152156
step_slug: string
@@ -162,6 +166,8 @@ export type Database = {
162166
remaining_deps?: number
163167
remaining_tasks?: number | null
164168
run_id?: string
169+
skip_reason?: string | null
170+
skipped_at?: string | null
165171
started_at?: string | null
166172
status?: string
167173
step_slug?: string
@@ -272,6 +278,7 @@ export type Database = {
272278
}
273279
steps: {
274280
Row: {
281+
condition_pattern: Json | null
275282
created_at: string
276283
deps_count: number
277284
flow_slug: string
@@ -282,8 +289,11 @@ export type Database = {
282289
step_index: number
283290
step_slug: string
284291
step_type: string
292+
when_failed: string
293+
when_unmet: string
285294
}
286295
Insert: {
296+
condition_pattern?: Json | null
287297
created_at?: string
288298
deps_count?: number
289299
flow_slug: string
@@ -294,8 +304,11 @@ export type Database = {
294304
step_index?: number
295305
step_slug: string
296306
step_type?: string
307+
when_failed?: string
308+
when_unmet?: string
297309
}
298310
Update: {
311+
condition_pattern?: Json | null
299312
created_at?: string
300313
deps_count?: number
301314
flow_slug?: string
@@ -306,6 +319,8 @@ export type Database = {
306319
step_index?: number
307320
step_slug?: string
308321
step_type?: string
322+
when_failed?: string
323+
when_unmet?: string
309324
}
310325
Relationships: [
311326
{
@@ -391,15 +406,19 @@ export type Database = {
391406
add_step: {
392407
Args: {
393408
base_delay?: number
409+
condition_pattern?: Json
394410
deps_slugs?: string[]
395411
flow_slug: string
396412
max_attempts?: number
397413
start_delay?: number
398414
step_slug: string
399415
step_type?: string
400416
timeout?: number
417+
when_failed?: string
418+
when_unmet?: string
401419
}
402420
Returns: {
421+
condition_pattern: Json | null
403422
created_at: string
404423
deps_count: number
405424
flow_slug: string
@@ -410,6 +429,8 @@ export type Database = {
410429
step_index: number
411430
step_slug: string
412431
step_type: string
432+
when_failed: string
433+
when_unmet: string
413434
}
414435
SetofOptions: {
415436
from: "*"
@@ -426,6 +447,10 @@ export type Database = {
426447
Args: { run_id: string }
427448
Returns: number
428449
}
450+
cascade_skip_steps: {
451+
Args: { run_id: string; skip_reason: string; step_slug: string }
452+
Returns: number
453+
}
429454
cleanup_ensure_workers_logs: {
430455
Args: { retention_hours?: number }
431456
Returns: {

0 commit comments

Comments
 (0)