@@ -412,6 +412,16 @@ BEGIN
412412 false
413413 ) as _broadcast_result
414414 ),
415+ -- ---------- Archive queued/started task messages for skipped steps ----------
416+ archived_messages AS (
417+ SELECT pgmq .archive (v_flow_slug, ARRAY_AGG(st .message_id )) as result
418+ FROM pgflow .step_tasks st
419+ WHERE st .run_id = _cascade_force_skip_steps .run_id
420+ AND st .step_slug IN (SELECT sk .step_slug FROM skipped sk)
421+ AND st .status IN (' queued' , ' started' )
422+ AND st .message_id IS NOT NULL
423+ HAVING COUNT (st .message_id ) > 0
424+ ),
415425 -- ---------- Update run counters ----------
416426 run_updates AS (
417427 UPDATE pgflow .runs r
@@ -422,6 +432,18 @@ BEGIN
422432 )
423433 SELECT COUNT (* ) INTO v_total_skipped FROM skipped;
424434
435+ -- Archive queued/started task messages for all steps that were just skipped
436+ -- (query step_states since CTE state is no longer accessible)
437+ PERFORM pgmq .archive (v_flow_slug, ARRAY_AGG(st .message_id ))
438+ FROM pgflow .step_tasks st
439+ JOIN pgflow .step_states ss ON ss .run_id = st .run_id AND ss .step_slug = st .step_slug
440+ WHERE st .run_id = _cascade_force_skip_steps .run_id
441+ AND st .status IN (' queued' , ' started' )
442+ AND st .message_id IS NOT NULL
443+ AND ss .status = ' skipped'
444+ AND ss .skipped_at >= now() - interval ' 1 second' -- Only recently skipped
445+ HAVING COUNT (st .message_id ) > 0 ;
446+
425447 RETURN v_total_skipped;
426448END;
427449$$;
@@ -890,6 +912,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
890912 AND pgflow .step_states .step_slug = complete_task .step_slug
891913FOR UPDATE ;
892914
915+ -- ==========================================
916+ -- GUARD: Late callback - step not started
917+ -- ==========================================
918+ -- If the step is not in 'started' state, this is a late callback.
919+ -- Do not mutate step_states or runs, archive message, return task row.
920+ IF v_step_record .status != ' started' THEN
921+ -- Archive the task message if present (prevents stuck work)
922+ PERFORM pgmq .archive (
923+ v_run_record .flow_slug ,
924+ st .message_id
925+ )
926+ FROM pgflow .step_tasks st
927+ WHERE st .run_id = complete_task .run_id
928+ AND st .step_slug = complete_task .step_slug
929+ AND st .task_index = complete_task .task_index
930+ AND st .message_id IS NOT NULL ;
931+ -- Return the current task row without any mutations
932+ RETURN QUERY SELECT * FROM pgflow .step_tasks
933+ WHERE pgflow .step_tasks .run_id = complete_task .run_id
934+ AND pgflow .step_tasks .step_slug = complete_task .step_slug
935+ AND pgflow .step_tasks .task_index = complete_task .task_index ;
936+ RETURN;
937+ END IF;
938+
893939-- Check for type violations AFTER acquiring locks
894940SELECT child_step .step_slug INTO v_dependent_map_slug
895941FROM pgflow .deps dependency
@@ -1246,8 +1292,10 @@ DECLARE
12461292 v_step_failed boolean ;
12471293 v_step_skipped boolean ;
12481294 v_when_exhausted text ;
1249- v_task_exhausted boolean ; -- True if task has exhausted retries
1250- v_flow_slug_for_deps text ; -- Used for decrementing remaining_deps on plain skip
1295+ v_task_exhausted boolean ;
1296+ v_flow_slug_for_deps text ;
1297+ v_prev_step_status text ;
1298+ v_flow_slug text ;
12511299begin
12521300
12531301-- If run is already failed, no retries allowed
@@ -1279,6 +1327,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
12791327 RETURN;
12801328END IF;
12811329
1330+ -- Late callback guard: if step is not 'started', don't mutate step/run state
1331+ -- Capture previous status BEFORE any CTE updates (for transition-based decrement)
1332+ SELECT ss .status INTO v_prev_step_status
1333+ FROM pgflow .step_states ss
1334+ WHERE ss .run_id = fail_task .run_id
1335+ AND ss .step_slug = fail_task .step_slug ;
1336+
1337+ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != ' started' THEN
1338+ -- Archive the task message if present
1339+ SELECT r .flow_slug INTO v_flow_slug
1340+ FROM pgflow .runs r
1341+ WHERE r .run_id = fail_task .run_id ;
1342+
1343+ PERFORM pgmq .archive (v_flow_slug, ARRAY_AGG(st .message_id ))
1344+ FROM pgflow .step_tasks st
1345+ WHERE st .run_id = fail_task .run_id
1346+ AND st .step_slug = fail_task .step_slug
1347+ AND st .task_index = fail_task .task_index
1348+ AND st .message_id IS NOT NULL
1349+ HAVING COUNT (st .message_id ) > 0 ;
1350+
1351+ RETURN QUERY SELECT * FROM pgflow .step_tasks
1352+ WHERE pgflow .step_tasks .run_id = fail_task .run_id
1353+ AND pgflow .step_tasks .step_slug = fail_task .step_slug
1354+ AND pgflow .step_tasks .task_index = fail_task .task_index ;
1355+ RETURN;
1356+ END IF;
1357+
12821358WITH run_lock AS (
12831359 SELECT * FROM pgflow .runs
12841360 WHERE pgflow .runs .run_id = fail_task .run_id
@@ -1384,9 +1460,13 @@ run_update AS (
13841460 WHEN (select status from maybe_fail_step) = ' failed' THEN now()
13851461 ELSE NULL
13861462 END,
1387- -- Decrement remaining_steps when step was skipped (not failed, run continues)
1463+ -- Decrement remaining_steps only on FIRST transition to skipped
1464+ -- (not when step was already skipped and a second task fails)
1465+ -- Uses PL/pgSQL variable captured before CTE chain
13881466 remaining_steps = CASE
1389- WHEN (select status from maybe_fail_step) = ' skipped' THEN pgflow .runs .remaining_steps - 1
1467+ WHEN (select status from maybe_fail_step) = ' skipped'
1468+ AND v_prev_step_status != ' skipped'
1469+ THEN pgflow .runs .remaining_steps - 1
13901470 ELSE pgflow .runs .remaining_steps
13911471 END
13921472 WHERE pgflow .runs .run_id = fail_task .run_id
@@ -1425,6 +1505,17 @@ END IF;
14251505
14261506-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
14271507 IF v_task_exhausted AND v_step_skipped THEN
1508+ -- Archive all queued/started sibling task messages for this step
1509+ PERFORM pgmq .archive (r .flow_slug , ARRAY_AGG(st .message_id ))
1510+ FROM pgflow .step_tasks st
1511+ JOIN pgflow .runs r ON st .run_id = r .run_id
1512+ WHERE st .run_id = fail_task .run_id
1513+ AND st .step_slug = fail_task .step_slug
1514+ AND st .status IN (' queued' , ' started' )
1515+ AND st .message_id IS NOT NULL
1516+ GROUP BY r .flow_slug
1517+ HAVING COUNT (st .message_id ) > 0 ;
1518+
14281519 -- Send broadcast event for step skipped
14291520 PERFORM realtime .send (
14301521 jsonb_build_object(
@@ -1717,8 +1808,14 @@ with tasks as (
17171808 where task .flow_slug = start_tasks .flow_slug
17181809 and task .message_id = any(msg_ids)
17191810 and task .status = ' queued'
1720- -- MVP: Don't start tasks on failed runs
1721- and r .status != ' failed'
1811+ and r .status = ' started'
1812+ and exists (
1813+ select 1
1814+ from pgflow .step_states ss
1815+ where ss .run_id = task .run_id
1816+ and ss .step_slug = task .step_slug
1817+ and ss .status = ' started'
1818+ )
17221819 ),
17231820 start_tasks_update as (
17241821 update pgflow .step_tasks
0 commit comments