Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/add-pgflow-auth-secret-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@pgflow/core': patch
'@pgflow/edge-worker': patch
---

Add PGFLOW_AUTH_SECRET support to bypass JWT format mismatch in ensure_workers authentication
8 changes: 6 additions & 2 deletions pkgs/core/schemas/0059_function_ensure_workers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ as $$

-- Get credentials: Local mode uses hardcoded URL, production uses vault secrets
-- Empty strings are treated as NULL using nullif()
-- pgflow_auth_secret takes priority over supabase_service_role_key for production auth
credentials as (
select
case
when (select is_local from env) then null
else nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
else coalesce(
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_auth_secret'), ''),
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
)
end as service_role_key,
case
when (select is_local from env) then 'http://kong:8000/functions/v1'
Expand Down Expand Up @@ -105,6 +109,6 @@ comment on function pgflow.ensure_workers() is
In local mode: pings ALL enabled functions (ignores debounce AND alive workers check).
In production mode: only pings functions that pass debounce AND have no alive workers.
Debounce: skips functions pinged within their debounce interval (production only).
Credentials: Uses Vault secrets (supabase_service_role_key, supabase_project_id) or local fallbacks.
Credentials: Uses Vault secrets (pgflow_auth_secret with fallback to supabase_service_role_key, supabase_project_id) or local fallbacks.
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
Returns request_id from pg_net for each HTTP request made.';
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
-- Modify "ensure_workers" function
CREATE OR REPLACE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$
with
-- Detect environment
env as (
select pgflow.is_local() as is_local
),

-- Get credentials: Local mode uses hardcoded URL, production uses vault secrets
-- Empty strings are treated as NULL using nullif()
-- pgflow_auth_secret takes priority over supabase_service_role_key for production auth
credentials as (
select
case
when (select is_local from env) then null
else coalesce(
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_auth_secret'), ''),
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
)
end as service_role_key,
case
when (select is_local from env) then 'http://kong:8000/functions/v1'
else (select 'https://' || nullif(decrypted_secret, '') || '.supabase.co/functions/v1' from vault.decrypted_secrets where name = 'supabase_project_id')
end as base_url
),

-- Find functions that pass the debounce check
debounce_passed as (
select wf.function_name, wf.debounce
from pgflow.worker_functions as wf
where wf.enabled = true
and (
wf.last_invoked_at is null
or wf.last_invoked_at < now() - wf.debounce
)
),

-- Find functions that have at least one alive worker
functions_with_alive_workers as (
select distinct w.function_name
from pgflow.workers as w
inner join debounce_passed as dp on w.function_name = dp.function_name
where w.stopped_at is null
and w.deprecated_at is null
and w.last_heartbeat_at > now() - dp.debounce
),

-- Determine which functions should be invoked
-- Local mode: all enabled functions (bypass debounce AND alive workers check)
-- Production mode: only functions that pass debounce AND have no alive workers
functions_to_invoke as (
select wf.function_name
from pgflow.worker_functions as wf
where wf.enabled = true
and (
pgflow.is_local() = true -- Local: all enabled functions
or (
-- Production: debounce + no alive workers
wf.function_name in (select dp.function_name from debounce_passed as dp)
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
)
)
),

-- Make HTTP requests and capture request_ids
http_requests as (
select
fti.function_name,
net.http_post(
url => c.base_url || '/' || fti.function_name,
headers => case
when e.is_local then '{}'::jsonb
else jsonb_build_object(
'Content-Type', 'application/json',
'Authorization', 'Bearer ' || c.service_role_key
)
end,
body => '{}'::jsonb
) as request_id
from functions_to_invoke as fti
cross join credentials as c
cross join env as e
where c.base_url is not null
and (e.is_local or c.service_role_key is not null)
),

-- Update last_invoked_at for invoked functions
updated as (
update pgflow.worker_functions as wf
set last_invoked_at = clock_timestamp()
from http_requests as hr
where wf.function_name = hr.function_name
returning wf.function_name
)

select u.function_name, true as invoked, hr.request_id
from updated as u
inner join http_requests as hr on u.function_name = hr.function_name
$$;
-- Set comment to function: "ensure_workers"
COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are running by pinging them via HTTP when needed.
In local mode: pings ALL enabled functions (ignores debounce AND alive workers check).
In production mode: only pings functions that pass debounce AND have no alive workers.
Debounce: skips functions pinged within their debounce interval (production only).
Credentials: Uses Vault secrets (pgflow_auth_secret with fallback to supabase_service_role_key, supabase_project_id) or local fallbacks.
URL is built from project_id: https://{project_id}.supabase.co/functions/v1
Returns request_id from pg_net for each HTTP request made.';
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:dzKOHL+hbunxWTZaGOIDWQG9THDva7Pk7VVDGASJkps=
h1:q21PG0IM91BaFRI7KM/qP5t76ER7If38sCU9TieHenI=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -17,3 +17,4 @@ h1:dzKOHL+hbunxWTZaGOIDWQG9THDva7Pk7VVDGASJkps=
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
-- Test: ensure_workers() uses pgflow_auth_secret with fallback to supabase_service_role_key
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Create project ID secret (needed for all tests)
select vault.create_secret('testproject123', 'supabase_project_id');

-- Setup: Register a worker function
select pgflow.track_worker_function('my-function');

-- Simulate production mode
set local app.settings.jwt_secret = 'production-secret-different-from-local';

-- =============================================================================
-- TEST 1: pgflow_auth_secret takes priority when both are set
-- =============================================================================
select vault.create_secret('pgflow-auth-secret-value', 'pgflow_auth_secret');
select vault.create_secret('legacy-service-role-key', 'supabase_service_role_key');

update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

select * into temporary test1_result from pgflow.ensure_workers();

select ok(
(select headers->>'Authorization' = 'Bearer pgflow-auth-secret-value'
from net.http_request_queue
where id = (select request_id from test1_result limit 1)),
'pgflow_auth_secret takes priority over supabase_service_role_key'
);

drop table test1_result;

-- Cleanup secrets for next test
delete from vault.secrets where name in ('pgflow_auth_secret', 'supabase_service_role_key');

-- =============================================================================
-- TEST 2: Falls back to supabase_service_role_key when pgflow_auth_secret not set
-- =============================================================================
select vault.create_secret('fallback-service-role-key', 'supabase_service_role_key');

update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

select * into temporary test2_result from pgflow.ensure_workers();

select ok(
(select headers->>'Authorization' = 'Bearer fallback-service-role-key'
from net.http_request_queue
where id = (select request_id from test2_result limit 1)),
'Falls back to supabase_service_role_key when pgflow_auth_secret not set'
);

drop table test2_result;

-- Cleanup secrets for next test
delete from vault.secrets where name = 'supabase_service_role_key';

-- =============================================================================
-- TEST 3: pgflow_auth_secret works without supabase_service_role_key
-- =============================================================================
select vault.create_secret('standalone-auth-secret', 'pgflow_auth_secret');

update pgflow.worker_functions
set last_invoked_at = now() - interval '10 seconds';

select * into temporary test3_result from pgflow.ensure_workers();

select ok(
(select headers->>'Authorization' = 'Bearer standalone-auth-secret'
from net.http_request_queue
where id = (select request_id from test3_result limit 1)),
'pgflow_auth_secret works without supabase_service_role_key being set'
);

drop table test3_result;

select finish();
rollback;
11 changes: 8 additions & 3 deletions pkgs/edge-worker/src/shared/authValidation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ export function validateServiceRoleAuth(
}

const authHeader = request.headers.get('Authorization');
const expectedKey = env['SUPABASE_SERVICE_ROLE_KEY'];

// Treat empty string as unset - use PGFLOW_AUTH_SECRET if set and non-empty,
// otherwise fall back to SUPABASE_SERVICE_ROLE_KEY
const authSecret = env['PGFLOW_AUTH_SECRET'];
const serviceRoleKey = env['SUPABASE_SERVICE_ROLE_KEY'];
const expectedKey = (authSecret && authSecret !== '') ? authSecret : serviceRoleKey;

if (!authHeader) {
return { valid: false, error: 'Missing Authorization header' };
}

if (!expectedKey) {
return { valid: false, error: 'Server misconfigured: missing service role key' };
if (!expectedKey || expectedKey === '') {
return { valid: false, error: 'Server misconfigured: missing PGFLOW_AUTH_SECRET or SUPABASE_SERVICE_ROLE_KEY' };
}

const expected = `Bearer ${expectedKey}`;
Expand Down
72 changes: 71 additions & 1 deletion pkgs/edge-worker/tests/unit/shared/authValidation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ function productionEnv(serviceRoleKey?: string): Record<string, string | undefin
};
}

function productionEnvWithAuthSecret(
authSecret?: string,
serviceRoleKey?: string
): Record<string, string | undefined> {
return {
SUPABASE_ANON_KEY: 'production-anon-key-abc',
SUPABASE_SERVICE_ROLE_KEY: serviceRoleKey,
PGFLOW_AUTH_SECRET: authSecret,
};
}

const PRODUCTION_SERVICE_ROLE_KEY = 'production-service-role-key-xyz';
const PGFLOW_AUTH_SECRET_VALUE = 'user-controlled-auth-secret-123';

// ============================================================
// validateServiceRoleAuth() - Local mode tests
Expand Down Expand Up @@ -80,7 +92,7 @@ Deno.test('validateServiceRoleAuth - production: accepts request with correct au
Deno.test('validateServiceRoleAuth - production: rejects when service role key not configured', () => {
const request = createRequest('Bearer any-key');
const result = validateServiceRoleAuth(request, productionEnv(undefined));
assertEquals(result, { valid: false, error: 'Server misconfigured: missing service role key' });
assertEquals(result, { valid: false, error: 'Server misconfigured: missing PGFLOW_AUTH_SECRET or SUPABASE_SERVICE_ROLE_KEY' });
});

Deno.test('validateServiceRoleAuth - production: rejects Basic auth scheme', () => {
Expand All @@ -101,6 +113,64 @@ Deno.test('validateServiceRoleAuth - production: rejects auth header without sch
assertEquals(result, { valid: false, error: 'Invalid Authorization header' });
});

// ============================================================
// validateServiceRoleAuth() - PGFLOW_AUTH_SECRET tests
// ============================================================

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: accepts request with auth secret when set', () => {
const request = createRequest(`Bearer ${PGFLOW_AUTH_SECRET_VALUE}`);
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret(PGFLOW_AUTH_SECRET_VALUE, PRODUCTION_SERVICE_ROLE_KEY)
);
assertEquals(result, { valid: true });
});

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: rejects service role key when auth secret is set', () => {
const request = createRequest(`Bearer ${PRODUCTION_SERVICE_ROLE_KEY}`);
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret(PGFLOW_AUTH_SECRET_VALUE, PRODUCTION_SERVICE_ROLE_KEY)
);
assertEquals(result, { valid: false, error: 'Invalid Authorization header' });
});

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: falls back to service role key when auth secret not set', () => {
const request = createRequest(`Bearer ${PRODUCTION_SERVICE_ROLE_KEY}`);
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret(undefined, PRODUCTION_SERVICE_ROLE_KEY)
);
assertEquals(result, { valid: true });
});

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: works without service role key when auth secret is set', () => {
const request = createRequest(`Bearer ${PGFLOW_AUTH_SECRET_VALUE}`);
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret(PGFLOW_AUTH_SECRET_VALUE, undefined)
);
assertEquals(result, { valid: true });
});

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: returns error when neither key is set', () => {
const request = createRequest('Bearer any-key');
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret(undefined, undefined)
);
assertEquals(result, { valid: false, error: 'Server misconfigured: missing PGFLOW_AUTH_SECRET or SUPABASE_SERVICE_ROLE_KEY' });
});

Deno.test('validateServiceRoleAuth - PGFLOW_AUTH_SECRET: treats empty string as unset, falls back to service role key', () => {
const request = createRequest(`Bearer ${PRODUCTION_SERVICE_ROLE_KEY}`);
const result = validateServiceRoleAuth(
request,
productionEnvWithAuthSecret('', PRODUCTION_SERVICE_ROLE_KEY) // Empty string
);
assertEquals(result, { valid: true });
});

// ============================================================
// createUnauthorizedResponse() tests
// ============================================================
Expand Down
Loading