Skip to content

Commit 6eea9d1

Browse files
committed
implement ifNot negative condition pattern
1 parent bb09b62 commit 6eea9d1

28 files changed

Lines changed: 1467 additions & 365 deletions

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ create table pgflow.steps (
2424
opt_base_delay int,
2525
opt_timeout int,
2626
opt_start_delay int,
27-
condition_pattern jsonb, -- JSON pattern for @> containment check
27+
condition_pattern jsonb, -- JSON pattern for @> containment check (if)
28+
condition_not_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
2829
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
2930
when_failed text not null default 'fail', -- What to do when handler fails after retries
3031
created_at timestamptz not null default now(),

pkgs/core/schemas/0100_function_add_step.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ create or replace function pgflow.add_step(
88
start_delay int default null,
99
step_type text default 'single',
1010
condition_pattern jsonb default null,
11+
condition_not_pattern jsonb default null,
1112
when_unmet text default 'skip',
1213
when_failed text default 'fail'
1314
)
@@ -40,7 +41,7 @@ BEGIN
4041
INSERT INTO pgflow.steps (
4142
flow_slug, step_slug, step_type, step_index, deps_count,
4243
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
43-
condition_pattern, when_unmet, when_failed
44+
condition_pattern, condition_not_pattern, when_unmet, when_failed
4445
)
4546
VALUES (
4647
add_step.flow_slug,
@@ -53,6 +54,7 @@ BEGIN
5354
add_step.timeout,
5455
add_step.start_delay,
5556
add_step.condition_pattern,
57+
add_step.condition_not_pattern,
5658
add_step.when_unmet,
5759
add_step.when_failed
5860
)

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,15 @@ BEGIN
4646
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
4747
-- ==========================================
4848
-- Find first step (by topological order) with unmet condition and 'fail' mode.
49+
-- Condition is unmet when:
50+
-- (condition_pattern is set AND input does NOT contain it) OR
51+
-- (condition_not_pattern is set AND input DOES contain it)
4952
WITH steps_with_conditions AS (
5053
SELECT
5154
step_state.flow_slug,
5255
step_state.step_slug,
5356
step.condition_pattern,
57+
step.condition_not_pattern,
5458
step.when_unmet,
5559
step.deps_count,
5660
step.step_index
@@ -61,7 +65,7 @@ BEGIN
6165
WHERE step_state.run_id = cascade_resolve_conditions.run_id
6266
AND step_state.status = 'created'
6367
AND step_state.remaining_deps = 0
64-
AND step.condition_pattern IS NOT NULL
68+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
6569
),
6670
step_deps_output AS (
6771
SELECT
@@ -79,26 +83,32 @@ BEGIN
7983
condition_evaluations AS (
8084
SELECT
8185
swc.*,
82-
CASE
83-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
84-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
85-
END AS condition_met
86+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END AS eval_input,
87+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
88+
(swc.condition_pattern IS NULL OR
89+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
90+
AND
91+
(swc.condition_not_pattern IS NULL OR
92+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
93+
AS condition_met
8694
FROM steps_with_conditions swc
8795
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
8896
)
89-
SELECT flow_slug, step_slug, condition_pattern
97+
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
9098
INTO v_first_fail
9199
FROM condition_evaluations
92100
WHERE NOT condition_met AND when_unmet = 'fail'
93101
ORDER BY step_index
94102
LIMIT 1;
95103

96104
-- Handle fail mode: fail step and run, return false
97-
IF v_first_fail IS NOT NULL THEN
105+
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
106+
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
107+
IF FOUND THEN
98108
UPDATE pgflow.step_states
99109
SET status = 'failed',
100110
failed_at = now(),
101-
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
111+
error_message = 'Condition not met'
102112
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
103113
AND pgflow.step_states.step_slug = v_first_fail.step_slug;
104114

@@ -114,12 +124,13 @@ BEGIN
114124
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
115125
-- ==========================================
116126
-- Skip steps with unmet conditions and whenUnmet='skip'.
117-
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
127+
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
118128
WITH steps_with_conditions AS (
119129
SELECT
120130
step_state.flow_slug,
121131
step_state.step_slug,
122132
step.condition_pattern,
133+
step.condition_not_pattern,
123134
step.when_unmet,
124135
step.deps_count,
125136
step.step_index
@@ -130,7 +141,7 @@ BEGIN
130141
WHERE step_state.run_id = cascade_resolve_conditions.run_id
131142
AND step_state.status = 'created'
132143
AND step_state.remaining_deps = 0
133-
AND step.condition_pattern IS NOT NULL
144+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
134145
),
135146
step_deps_output AS (
136147
SELECT
@@ -148,10 +159,13 @@ BEGIN
148159
condition_evaluations AS (
149160
SELECT
150161
swc.*,
151-
CASE
152-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
153-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
154-
END AS condition_met
162+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
163+
(swc.condition_pattern IS NULL OR
164+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
165+
AND
166+
(swc.condition_not_pattern IS NULL OR
167+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
168+
AS condition_met
155169
FROM steps_with_conditions swc
156170
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
157171
),
@@ -231,13 +245,15 @@ BEGIN
231245
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
232246
AND ready_step.status = 'created'
233247
AND ready_step.remaining_deps = 0
234-
AND step.condition_pattern IS NOT NULL
248+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
235249
AND step.when_unmet = 'skip-cascade'
250+
-- Condition is NOT met when: (if fails) OR (ifNot fails)
236251
AND NOT (
237-
CASE
238-
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
239-
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
240-
END
252+
(step.condition_pattern IS NULL OR
253+
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
254+
AND
255+
(step.condition_not_pattern IS NULL OR
256+
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
241257
)
242258
ORDER BY step.step_index;
243259

pkgs/core/src/database-types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ export type Database = {
278278
}
279279
steps: {
280280
Row: {
281+
condition_not_pattern: Json | null
281282
condition_pattern: Json | null
282283
created_at: string
283284
deps_count: number
@@ -293,6 +294,7 @@ export type Database = {
293294
when_unmet: string
294295
}
295296
Insert: {
297+
condition_not_pattern?: Json | null
296298
condition_pattern?: Json | null
297299
created_at?: string
298300
deps_count?: number
@@ -308,6 +310,7 @@ export type Database = {
308310
when_unmet?: string
309311
}
310312
Update: {
313+
condition_not_pattern?: Json | null
311314
condition_pattern?: Json | null
312315
created_at?: string
313316
deps_count?: number
@@ -410,6 +413,7 @@ export type Database = {
410413
add_step: {
411414
Args: {
412415
base_delay?: number
416+
condition_not_pattern?: Json
413417
condition_pattern?: Json
414418
deps_slugs?: string[]
415419
flow_slug: string
@@ -422,6 +426,7 @@ export type Database = {
422426
when_unmet?: string
423427
}
424428
Returns: {
429+
condition_not_pattern: Json | null
425430
condition_pattern: Json | null
426431
created_at: string
427432
deps_count: number

pkgs/core/supabase/migrations/20260105214940_pgflow_step_conditions.sql renamed to pkgs/core/supabase/migrations/20260108115842_pgflow_step_conditions.sql

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
1515
-- Create index "idx_step_states_skipped" to table: "step_states"
1616
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
1717
-- Modify "steps" table
18-
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
18+
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
1919
-- Create "_cascade_force_skip_steps" function
2020
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
2121
DECLARE
@@ -151,11 +151,15 @@ BEGIN
151151
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
152152
-- ==========================================
153153
-- Find first step (by topological order) with unmet condition and 'fail' mode.
154+
-- Condition is unmet when:
155+
-- (condition_pattern is set AND input does NOT contain it) OR
156+
-- (condition_not_pattern is set AND input DOES contain it)
154157
WITH steps_with_conditions AS (
155158
SELECT
156159
step_state.flow_slug,
157160
step_state.step_slug,
158161
step.condition_pattern,
162+
step.condition_not_pattern,
159163
step.when_unmet,
160164
step.deps_count,
161165
step.step_index
@@ -166,7 +170,7 @@ BEGIN
166170
WHERE step_state.run_id = cascade_resolve_conditions.run_id
167171
AND step_state.status = 'created'
168172
AND step_state.remaining_deps = 0
169-
AND step.condition_pattern IS NOT NULL
173+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
170174
),
171175
step_deps_output AS (
172176
SELECT
@@ -184,26 +188,32 @@ BEGIN
184188
condition_evaluations AS (
185189
SELECT
186190
swc.*,
187-
CASE
188-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
189-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
190-
END AS condition_met
191+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END AS eval_input,
192+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
193+
(swc.condition_pattern IS NULL OR
194+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
195+
AND
196+
(swc.condition_not_pattern IS NULL OR
197+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
198+
AS condition_met
191199
FROM steps_with_conditions swc
192200
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
193201
)
194-
SELECT flow_slug, step_slug, condition_pattern
202+
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
195203
INTO v_first_fail
196204
FROM condition_evaluations
197205
WHERE NOT condition_met AND when_unmet = 'fail'
198206
ORDER BY step_index
199207
LIMIT 1;
200208

201209
-- Handle fail mode: fail step and run, return false
202-
IF v_first_fail IS NOT NULL THEN
210+
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
211+
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
212+
IF FOUND THEN
203213
UPDATE pgflow.step_states
204214
SET status = 'failed',
205215
failed_at = now(),
206-
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
216+
error_message = 'Condition not met'
207217
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
208218
AND pgflow.step_states.step_slug = v_first_fail.step_slug;
209219

@@ -219,12 +229,13 @@ BEGIN
219229
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
220230
-- ==========================================
221231
-- Skip steps with unmet conditions and whenUnmet='skip'.
222-
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
232+
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
223233
WITH steps_with_conditions AS (
224234
SELECT
225235
step_state.flow_slug,
226236
step_state.step_slug,
227237
step.condition_pattern,
238+
step.condition_not_pattern,
228239
step.when_unmet,
229240
step.deps_count,
230241
step.step_index
@@ -235,7 +246,7 @@ BEGIN
235246
WHERE step_state.run_id = cascade_resolve_conditions.run_id
236247
AND step_state.status = 'created'
237248
AND step_state.remaining_deps = 0
238-
AND step.condition_pattern IS NOT NULL
249+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
239250
),
240251
step_deps_output AS (
241252
SELECT
@@ -253,10 +264,13 @@ BEGIN
253264
condition_evaluations AS (
254265
SELECT
255266
swc.*,
256-
CASE
257-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
258-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
259-
END AS condition_met
267+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
268+
(swc.condition_pattern IS NULL OR
269+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
270+
AND
271+
(swc.condition_not_pattern IS NULL OR
272+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
273+
AS condition_met
260274
FROM steps_with_conditions swc
261275
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
262276
),
@@ -336,13 +350,15 @@ BEGIN
336350
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
337351
AND ready_step.status = 'created'
338352
AND ready_step.remaining_deps = 0
339-
AND step.condition_pattern IS NOT NULL
353+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
340354
AND step.when_unmet = 'skip-cascade'
355+
-- Condition is NOT met when: (if fails) OR (ifNot fails)
341356
AND NOT (
342-
CASE
343-
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
344-
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
345-
END
357+
(step.condition_pattern IS NULL OR
358+
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
359+
AND
360+
(step.condition_not_pattern IS NULL OR
361+
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
346362
)
347363
ORDER BY step.step_index;
348364

@@ -1440,7 +1456,7 @@ with tasks as (
14401456
dep_out.step_slug = st.step_slug
14411457
$$;
14421458
-- Create "add_step" function
1443-
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
1459+
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
14441460
DECLARE
14451461
result_step pgflow.steps;
14461462
next_idx int;
@@ -1465,7 +1481,7 @@ BEGIN
14651481
INSERT INTO pgflow.steps (
14661482
flow_slug, step_slug, step_type, step_index, deps_count,
14671483
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
1468-
condition_pattern, when_unmet, when_failed
1484+
condition_pattern, condition_not_pattern, when_unmet, when_failed
14691485
)
14701486
VALUES (
14711487
add_step.flow_slug,
@@ -1478,6 +1494,7 @@ BEGIN
14781494
add_step.timeout,
14791495
add_step.start_delay,
14801496
add_step.condition_pattern,
1497+
add_step.condition_not_pattern,
14811498
add_step.when_unmet,
14821499
add_step.when_failed
14831500
)

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
1+
h1:HlB2SJOWJkUvO0KcH0pQcrMeKEvDDfS62oeaIa3JpJ4=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,4 +16,4 @@ h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
1717
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
19-
20260105214940_pgflow_step_conditions.sql h1:DIta8qrr+qRvA9aFCdWefk72qp27mcPvGGlAJswmitw=
19+
20260108115842_pgflow_step_conditions.sql h1:smH8NsUQ3NHzJUOCge4zYuRwbRhGeIJXUr9+09lXx3I=

0 commit comments

Comments
 (0)