Skip to content

Commit c137f91

Browse files
committed
fix missing skip archival
1 parent a347e8c commit c137f91

9 files changed

+506
-4
lines changed

pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ BEGIN
9090
false
9191
) as _broadcast_result
9292
),
93+
-- ---------- Archive queued/started task messages for skipped steps ----------
94+
archived_messages AS (
95+
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
96+
FROM pgflow.step_tasks st
97+
WHERE st.run_id = _cascade_force_skip_steps.run_id
98+
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
99+
AND st.status IN ('queued', 'started')
100+
AND st.message_id IS NOT NULL
101+
HAVING COUNT(st.message_id) > 0
102+
),
93103
-- ---------- Update run counters ----------
94104
run_updates AS (
95105
UPDATE pgflow.runs r
@@ -100,6 +110,18 @@ BEGIN
100110
)
101111
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
102112

113+
-- Archive queued/started task messages for all steps that were just skipped
114+
-- (query step_states since CTE state is no longer accessible)
115+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
116+
FROM pgflow.step_tasks st
117+
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
118+
WHERE st.run_id = _cascade_force_skip_steps.run_id
119+
AND st.status IN ('queued', 'started')
120+
AND st.message_id IS NOT NULL
121+
AND ss.status = 'skipped'
122+
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
123+
HAVING COUNT(st.message_id) > 0;
124+
103125
RETURN v_total_skipped;
104126
END;
105127
$$;

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
4040
AND pgflow.step_states.step_slug = complete_task.step_slug
4141
FOR UPDATE;
4242

43+
-- ==========================================
44+
-- GUARD: Late callback - step not started
45+
-- ==========================================
46+
-- If the step is not in 'started' state, this is a late callback.
47+
-- Do not mutate step_states or runs, archive message, return task row.
48+
IF v_step_record.status != 'started' THEN
49+
-- Archive the task message if present (prevents stuck work)
50+
PERFORM pgmq.archive(
51+
v_run_record.flow_slug,
52+
st.message_id
53+
)
54+
FROM pgflow.step_tasks st
55+
WHERE st.run_id = complete_task.run_id
56+
AND st.step_slug = complete_task.step_slug
57+
AND st.task_index = complete_task.task_index
58+
AND st.message_id IS NOT NULL;
59+
-- Return the current task row without any mutations
60+
RETURN QUERY SELECT * FROM pgflow.step_tasks
61+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
62+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
63+
AND pgflow.step_tasks.task_index = complete_task.task_index;
64+
RETURN;
65+
END IF;
66+
4367
-- Check for type violations AFTER acquiring locks
4468
SELECT child_step.step_slug INTO v_dependent_map_slug
4569
FROM pgflow.deps dependency

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ DECLARE
1616
v_when_exhausted text;
1717
v_task_exhausted boolean; -- True if task has exhausted retries
1818
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
19+
v_prev_step_status text; -- Previous step status for transition-based decrement
1920
begin
2021

2122
-- If run is already failed, no retries allowed
@@ -47,6 +48,45 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
4748
RETURN;
4849
END IF;
4950

51+
-- Late callback guard: if step is not 'started', don't mutate step/run state
52+
-- This handles callbacks arriving after step was skipped/completed/failed
53+
-- Also capture previous status for transition-based decrement
54+
DECLARE
55+
v_step_status text;
56+
v_flow_slug text;
57+
v_prev_step_status text;
58+
BEGIN
59+
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
60+
SELECT ss.status INTO v_prev_step_status
61+
FROM pgflow.step_states ss
62+
WHERE ss.run_id = fail_task.run_id
63+
AND ss.step_slug = fail_task.step_slug;
64+
65+
v_step_status := v_prev_step_status;
66+
67+
IF v_step_status IS NOT NULL AND v_step_status != 'started' THEN
68+
-- Archive the task message if present
69+
SELECT r.flow_slug INTO v_flow_slug
70+
FROM pgflow.runs r
71+
WHERE r.run_id = fail_task.run_id;
72+
73+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
74+
FROM pgflow.step_tasks st
75+
WHERE st.run_id = fail_task.run_id
76+
AND st.step_slug = fail_task.step_slug
77+
AND st.task_index = fail_task.task_index
78+
AND st.message_id IS NOT NULL
79+
HAVING COUNT(st.message_id) > 0;
80+
81+
-- Return current task row without mutations
82+
RETURN QUERY SELECT * FROM pgflow.step_tasks
83+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
84+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
85+
AND pgflow.step_tasks.task_index = fail_task.task_index;
86+
RETURN;
87+
END IF;
88+
END;
89+
5090
WITH run_lock AS (
5191
SELECT * FROM pgflow.runs
5292
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -58,6 +98,10 @@ step_lock AS (
5898
AND pgflow.step_states.step_slug = fail_task.step_slug
5999
FOR UPDATE
60100
),
101+
prev_step_status AS (
102+
-- Capture previous status BEFORE any updates (must be separate CTE for correct visibility)
103+
SELECT status FROM step_lock
104+
),
61105
flow_info AS (
62106
SELECT r.flow_slug
63107
FROM pgflow.runs r
@@ -152,9 +196,12 @@ run_update AS (
152196
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
153197
ELSE NULL
154198
END,
155-
-- Decrement remaining_steps when step was skipped (not failed, run continues)
199+
-- Decrement remaining_steps only on FIRST transition to skipped
200+
-- (not when step was already skipped and a second task fails)
156201
remaining_steps = CASE
157-
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
202+
WHEN (select status from maybe_fail_step) = 'skipped'
203+
AND (select status from prev_step_status) != 'skipped'
204+
THEN pgflow.runs.remaining_steps - 1
158205
ELSE pgflow.runs.remaining_steps
159206
END
160207
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -193,6 +240,17 @@ END IF;
193240

194241
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
195242
IF v_task_exhausted AND v_step_skipped THEN
243+
-- Archive all queued/started sibling task messages for this step
244+
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
245+
FROM pgflow.step_tasks st
246+
JOIN pgflow.runs r ON st.run_id = r.run_id
247+
WHERE st.run_id = fail_task.run_id
248+
AND st.step_slug = fail_task.step_slug
249+
AND st.status IN ('queued', 'started')
250+
AND st.message_id IS NOT NULL
251+
GROUP BY r.flow_slug
252+
HAVING COUNT(st.message_id) > 0;
253+
196254
-- Send broadcast event for step skipped
197255
PERFORM realtime.send(
198256
jsonb_build_object(

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@ as $$
2020
where task.flow_slug = start_tasks.flow_slug
2121
and task.message_id = any(msg_ids)
2222
and task.status = 'queued'
23-
-- MVP: Don't start tasks on failed runs
24-
and r.status != 'failed'
23+
and r.status = 'started'
24+
and exists (
25+
select 1
26+
from pgflow.step_states ss
27+
where ss.run_id = task.run_id
28+
and ss.step_slug = task.step_slug
29+
and ss.status = 'started'
30+
)
2531
),
2632
start_tasks_update as (
2733
update pgflow.step_tasks
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
\set ON_ERROR_STOP on
2+
\set QUIET on
3+
4+
begin;
5+
select plan(5);
6+
7+
select pgflow_tests.reset_db();
8+
9+
select pgflow.create_flow('cascade_skip_archive');
10+
select pgflow.add_step('cascade_skip_archive', 'map_a', '{}', max_attempts=>0, step_type=>'map', when_exhausted=>'skip');
11+
select pgflow.add_step('cascade_skip_archive', 'other', '{}');
12+
13+
select pgflow.start_flow('cascade_skip_archive', '[1, 2, 3]'::jsonb);
14+
15+
with tasks as (
16+
select message_id, task_index
17+
from pgflow.step_tasks
18+
where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a'
19+
order by task_index
20+
)
21+
select pgflow.start_tasks('cascade_skip_archive', array[(select message_id from tasks where task_index = 0)::bigint], pgflow_tests.ensure_worker('cascade_skip_archive'));
22+
23+
select ok(
24+
(select count(*) = 3 from pgflow.step_tasks
25+
where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a'),
26+
'Setup: map_a should have 3 tasks'
27+
);
28+
29+
select ok(
30+
(select count(*) >= 1 from pgmq.q_cascade_skip_archive),
31+
'Setup: queue should have messages'
32+
);
33+
34+
select pgflow._cascade_force_skip_steps(
35+
(select run_id from pgflow.runs where flow_slug = 'cascade_skip_archive'),
36+
'map_a',
37+
'condition_unmet'
38+
);
39+
40+
select is(
41+
(select status from pgflow.step_states where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a'),
42+
'skipped'::text,
43+
'Step state should be skipped'
44+
);
45+
46+
select is_empty(
47+
$$
48+
select 1
49+
from pgmq.q_cascade_skip_archive q
50+
join pgflow.step_tasks st on st.message_id = q.msg_id
51+
where st.flow_slug = 'cascade_skip_archive'
52+
and st.step_slug = 'map_a'
53+
$$,
54+
'Queue should have 0 messages for skipped map_a step after _cascade_force_skip_steps'
55+
);
56+
57+
select ok(
58+
(select count(*) >= 3 from pgmq.a_cascade_skip_archive a
59+
join pgflow.step_tasks st on st.message_id = a.msg_id
60+
where st.flow_slug = 'cascade_skip_archive' and st.step_slug = 'map_a'),
61+
'Archive should contain all 3 map_a task messages'
62+
);
63+
64+
select * from finish();
65+
rollback;
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
-- Test: Late complete after step is skipped should not mutate step or run state
2+
-- Verifies defense-in-depth: callbacks cannot change state after step is no longer started
3+
begin;
4+
select plan(4);
5+
select pgflow_tests.reset_db();
6+
7+
-- Setup: Create flow with map_a (skip on exhaust) and independent 'other' step
8+
select pgflow.create_flow('late_complete_test');
9+
select pgflow.add_step(
10+
flow_slug => 'late_complete_test',
11+
step_slug => 'map_a',
12+
step_type => 'map',
13+
max_attempts => 0,
14+
when_exhausted => 'skip'
15+
);
16+
select pgflow.add_step(
17+
flow_slug => 'late_complete_test',
18+
step_slug => 'other'
19+
);
20+
21+
-- Start flow with 2 array elements for map_a (root map gets array directly)
22+
select run_id as test_run_id from pgflow.start_flow('late_complete_test', '[1, 2]'::jsonb) \gset
23+
24+
-- Ensure worker exists
25+
select pgflow_tests.ensure_worker('late_complete_test') as test_worker_id \gset
26+
27+
-- Start both map_a tasks
28+
select message_id as msg_0 from pgflow.step_tasks
29+
where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 0 \gset
30+
31+
select pgflow.start_tasks('late_complete_test', array[:'msg_0'::bigint], :'test_worker_id'::uuid);
32+
33+
select message_id as msg_1 from pgflow.step_tasks
34+
where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 1 \gset
35+
36+
select pgflow.start_tasks('late_complete_test', array[:'msg_1'::bigint], :'test_worker_id'::uuid);
37+
38+
-- Fail map_a[0] to trigger skip (max_attempts=0, when_exhausted='skip')
39+
-- This makes the step transition to 'skipped'
40+
select pgflow.fail_task(
41+
:'test_run_id'::uuid,
42+
'map_a',
43+
0,
44+
'Task 0 failed!'
45+
);
46+
47+
-- Verify step became skipped
48+
select is(
49+
(select status from pgflow.step_states
50+
where run_id = :'test_run_id'::uuid and step_slug = 'map_a'),
51+
'skipped',
52+
'Step should be skipped after fail with when_exhausted=skip'
53+
);
54+
55+
-- Capture remaining_steps after skip
56+
select remaining_steps as remaining_steps_after_skip
57+
from pgflow.runs
58+
where run_id = :'test_run_id'::uuid \gset
59+
60+
-- LATE COMPLETE: Try to complete map_a[1] after step is already skipped
61+
-- This should NOT mutate step or run state
62+
select lives_ok(
63+
format($$
64+
select pgflow.complete_task(
65+
'%s'::uuid,
66+
'map_a',
67+
1,
68+
'{"ok":true}'::jsonb
69+
)
70+
$$, :'test_run_id'),
71+
'Late complete should not error'
72+
);
73+
74+
-- Verify step state unchanged (remains skipped)
75+
select is(
76+
(select status from pgflow.step_states
77+
where run_id = :'test_run_id'::uuid and step_slug = 'map_a'),
78+
'skipped',
79+
'Step should remain skipped after late complete'
80+
);
81+
82+
-- Verify remaining_steps unchanged by late complete
83+
select is(
84+
(select remaining_steps from pgflow.runs where run_id = :'test_run_id'::uuid),
85+
:remaining_steps_after_skip,
86+
'remaining_steps should not be decremented by late complete'
87+
);
88+
89+
select * from finish();
90+
rollback;

0 commit comments

Comments
 (0)