diff --git a/graphile/graphile-cache/package.json b/graphile/graphile-cache/package.json index 0e253f814d..a51ac4f718 100644 --- a/graphile/graphile-cache/package.json +++ b/graphile/graphile-cache/package.json @@ -31,6 +31,7 @@ "dependencies": { "@pgpmjs/logger": "workspace:^", "express": "^5.2.1", + "graphile-realtime-subscriptions": "workspace:^", "grafserv": "1.0.0", "lru-cache": "^11.2.7", "pg-cache": "workspace:^", diff --git a/graphile/graphile-cache/src/create-instance.ts b/graphile/graphile-cache/src/create-instance.ts index 35bf6a519a..fc4c625ae2 100644 --- a/graphile/graphile-cache/src/create-instance.ts +++ b/graphile/graphile-cache/src/create-instance.ts @@ -1,12 +1,21 @@ import { createServer } from 'node:http'; +import { Logger } from '@pgpmjs/logger'; import express from 'express'; import { postgraphile } from 'postgraphile'; import { grafserv } from 'grafserv/express/v4'; import type { GraphileCacheEntry } from './graphile-cache'; +const log = new Logger('graphile-cache:create'); + interface GraphileInstanceOptions { preset: any; cacheKey: string; + /** + * When true, a RealtimeManager is created and started alongside the + * PostGraphile instance. The pool is extracted from the preset's + * pgServices (managed by pg-cache) rather than passed separately. + */ + enableRealtime?: boolean; } /** @@ -18,11 +27,17 @@ interface GraphileInstanceOptions { * * Callers are responsible for building the `GraphileConfig.Preset` (including * pgServices, grafserv options, grafast context, etc.) before passing it here. + * + * When `enableRealtime` is true, a RealtimeManager is created that bridges + * cursor-tracked events from `drain_changes()` into the PostGraphile + * instance's PgSubscriber EventEmitter. Both `pgSubscriber` and the pg + * pool are extracted from the resolved preset's pgServices — no separate + * pool parameter is needed. */ export const createGraphileInstance = async ( opts: GraphileInstanceOptions ): Promise => { - const { preset, cacheKey } = opts; + const { preset, cacheKey, enableRealtime = false } = opts; const pgl = postgraphile(preset); const serv = pgl.createServ(grafserv); @@ -32,7 +47,7 @@ export const createGraphileInstance = async ( await serv.addTo(handler, httpServer); await serv.ready(); - return { + const entry: GraphileCacheEntry = { pgl, serv, handler, @@ -40,4 +55,39 @@ export const createGraphileInstance = async ( cacheKey, createdAt: Date.now(), }; + + if (enableRealtime) { + try { + const { RealtimeManager } = await import('graphile-realtime-subscriptions'); + + // Extract PgSubscriber and pool from the resolved preset's pgServices. + // The pool is the same instance managed by pg-cache (via getPgPool) + // and threaded into the preset by makePgService({ pool, schemas }). + const resolvedPreset = pgl.getResolvedPreset(); + const pgService = (resolvedPreset as any).pgServices?.[0]; + const pgSubscriber = pgService?.pgSubscriber ?? null; + const pool = pgService?.adaptorSettings?.pool ?? null; + + if (!pgSubscriber) { + log.warn(`PostGraphile[${cacheKey}] has no pgSubscriber — RealtimeManager will not be started`); + } else if (!pool) { + log.warn(`PostGraphile[${cacheKey}] has no pool in pgService — RealtimeManager will not be started`); + } else { + const manager = new RealtimeManager({ + pgSubscriber, + pool, + nodeId: `graphile-cache:${cacheKey}`, + schema: 'realtime_public', + }); + + await manager.start(); + entry.realtimeManager = manager; + log.info(`RealtimeManager started for PostGraphile[${cacheKey}]`); + } + } catch (err) { + log.error(`Failed to start RealtimeManager for PostGraphile[${cacheKey}]:`, err); + } + } + + return entry; }; diff --git a/graphile/graphile-cache/src/graphile-cache.ts b/graphile/graphile-cache/src/graphile-cache.ts index 139e804312..90f2abb995 100644 --- a/graphile/graphile-cache/src/graphile-cache.ts +++ b/graphile/graphile-cache/src/graphile-cache.ts @@ -86,6 +86,8 @@ export interface GraphileCacheEntry { httpServer: HttpServer; cacheKey: string; createdAt: number; + /** Optional RealtimeManager for cursor-tracked subscription delivery */ + realtimeManager?: { stop(): Promise } | null; } // Track disposed entries to prevent double-disposal @@ -119,6 +121,14 @@ const disposeEntry = async (entry: GraphileCacheEntry, key: string): Promise resolve()); }); } + // Stop RealtimeManager if present (before releasing PostGraphile) + if (entry.realtimeManager) { + try { + await entry.realtimeManager.stop(); + } catch (err) { + log.error(`Error stopping RealtimeManager for PostGraphile[${key}]:`, err); + } + } // Release PostGraphile instance (this also releases grafserv internally) if (entry.pgl) { await entry.pgl.release(); diff --git a/graphile/graphile-realtime-subscriptions/__tests__/cursor-tracker.test.ts b/graphile/graphile-realtime-subscriptions/__tests__/cursor-tracker.test.ts index dbfaf5865e..b16bd2d95d 100644 --- a/graphile/graphile-realtime-subscriptions/__tests__/cursor-tracker.test.ts +++ b/graphile/graphile-realtime-subscriptions/__tests__/cursor-tracker.test.ts @@ -26,23 +26,16 @@ import { DEFAULT_BATCH_LIMIT, DEFAULT_SCHEMA, } from '../src/cursor-tracker'; -import type { PgClient, WithPgClient, ChangeLogEntry } from '../src/types'; +import type { Queryable, ChangeLogEntry } from '../src/types'; // --- Test helpers --- -function createMockClient(queryResult: { rows: any[] } = { rows: [] }): PgClient { +function createMockPool(queryResult: { rows: any[] } = { rows: [] }): jest.Mocked { return { query: jest.fn().mockResolvedValue(queryResult), }; } -function createMockWithPgClient(client?: PgClient): WithPgClient { - const mockClient = client ?? createMockClient(); - return async (callback: (c: PgClient) => Promise): Promise => { - return callback(mockClient); - }; -} - function createChangeLogEntry(overrides: Partial = {}): ChangeLogEntry { return { id: 'change-1', @@ -70,7 +63,7 @@ describe('CursorTracker defaults', () => { it('generates a nodeId when not provided', () => { const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(), + pool: createMockPool(), }); expect(tracker.nodeId).toBeDefined(); @@ -81,7 +74,7 @@ describe('CursorTracker defaults', () => { it('uses provided nodeId', () => { const tracker = new CursorTracker({ nodeId: 'my-node-42', - withPgClient: createMockWithPgClient(), + pool: createMockPool(), }); expect(tracker.nodeId).toBe('my-node-42'); @@ -98,15 +91,15 @@ describe('CursorTracker.start()', () => { }); it('calls touch_listener on start', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'test-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('touch_listener'), ['test-node'], ); @@ -115,15 +108,15 @@ describe('CursorTracker.start()', () => { }); it('calls drain_changes immediately after start', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'test-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - const calls = (mockClient.query as jest.Mock).mock.calls; + const calls = (mockPool.query as jest.Mock).mock.calls; const drainCalls = calls.filter((c: any[]) => c[0].includes('drain_changes')); expect(drainCalls.length).toBeGreaterThanOrEqual(1); @@ -132,7 +125,7 @@ describe('CursorTracker.start()', () => { it('sets isRunning to true', async () => { const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(), + pool: createMockPool(), }); expect(tracker.isRunning).toBe(false); @@ -143,17 +136,17 @@ describe('CursorTracker.start()', () => { }); it('is idempotent (calling start twice does not double-register)', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'test-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - const callCountAfterFirst = (mockClient.query as jest.Mock).mock.calls.length; + const callCountAfterFirst = (mockPool.query as jest.Mock).mock.calls.length; await tracker.start(); - const callCountAfterSecond = (mockClient.query as jest.Mock).mock.calls.length; + const callCountAfterSecond = (mockPool.query as jest.Mock).mock.calls.length; expect(callCountAfterSecond).toBe(callCountAfterFirst); @@ -171,18 +164,18 @@ describe('CursorTracker.stop()', () => { }); it('calls cleanup_ephemeral on stop', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'test-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - (mockClient.query as jest.Mock).mockClear(); + (mockPool.query as jest.Mock).mockClear(); await tracker.stop(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('cleanup_ephemeral'), ['test-node'], ); @@ -190,7 +183,7 @@ describe('CursorTracker.stop()', () => { it('sets isRunning to false', async () => { const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(), + pool: createMockPool(), }); await tracker.start(); @@ -199,20 +192,20 @@ describe('CursorTracker.stop()', () => { }); it('is idempotent (calling stop twice does not double-cleanup)', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'test-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - (mockClient.query as jest.Mock).mockClear(); + (mockPool.query as jest.Mock).mockClear(); await tracker.stop(); - const callCountAfterFirst = (mockClient.query as jest.Mock).mock.calls.length; + const callCountAfterFirst = (mockPool.query as jest.Mock).mock.calls.length; await tracker.stop(); - const callCountAfterSecond = (mockClient.query as jest.Mock).mock.calls.length; + const callCountAfterSecond = (mockPool.query as jest.Mock).mock.calls.length; expect(callCountAfterSecond).toBe(callCountAfterFirst); }); @@ -221,7 +214,7 @@ describe('CursorTracker.stop()', () => { const clearSpy = jest.spyOn(global, 'clearInterval'); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(), + pool: createMockPool(), }); await tracker.start(); @@ -242,16 +235,16 @@ describe('CursorTracker.drain()', () => { }); it('calls drain_changes with nodeId and batchLimit', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'drain-node', batchLimit: 100, - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.drain(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('drain_changes'), ['drain-node', 100], ); @@ -259,12 +252,12 @@ describe('CursorTracker.drain()', () => { it('returns parsed change_log entries', async () => { const entry = createChangeLogEntry(); - const mockClient = createMockClient({ + const mockPool = createMockPool({ rows: [{ drain_changes: entry }], }); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); const result = await tracker.drain(); @@ -273,13 +266,13 @@ describe('CursorTracker.drain()', () => { it('calls onChanges callback with entries', async () => { const entry = createChangeLogEntry(); - const mockClient = createMockClient({ + const mockPool = createMockPool({ rows: [{ drain_changes: entry }], }); const onChanges = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, onChanges, }); @@ -291,7 +284,7 @@ describe('CursorTracker.drain()', () => { const onChanges = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(), + pool: createMockPool(), onChanges, }); @@ -300,13 +293,13 @@ describe('CursorTracker.drain()', () => { }); it('returns empty array on error and calls onError', async () => { - const failingClient: PgClient = { + const failingPool: Queryable = { query: jest.fn().mockRejectedValue(new Error('connection lost')), }; const onError = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(failingClient), + pool: failingPool, onError, }); @@ -320,7 +313,7 @@ describe('CursorTracker.drain()', () => { it('prevents concurrent drains', async () => { let resolveQuery: (() => void) | null = null; - const slowClient: PgClient = { + const slowPool: Queryable = { query: jest.fn().mockImplementation(() => { return new Promise<{ rows: any[] }>((resolve) => { resolveQuery = () => resolve({ rows: [] }); @@ -329,7 +322,7 @@ describe('CursorTracker.drain()', () => { }; const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(slowClient), + pool: slowPool, }); const drain1 = tracker.drain(); @@ -353,21 +346,21 @@ describe('CursorTracker periodic polling', () => { }); it('polls drain_changes at configured interval', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'poll-node', pollIntervalMs: 2000, - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - (mockClient.query as jest.Mock).mockClear(); + (mockPool.query as jest.Mock).mockClear(); jest.advanceTimersByTime(2000); // Allow async callbacks await Promise.resolve(); - const drainCalls = (mockClient.query as jest.Mock).mock.calls + const drainCalls = (mockPool.query as jest.Mock).mock.calls .filter((c: any[]) => c[0].includes('drain_changes')); expect(drainCalls.length).toBeGreaterThanOrEqual(1); @@ -375,20 +368,20 @@ describe('CursorTracker periodic polling', () => { }); it('heartbeats at configured interval', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'hb-node', heartbeatIntervalMs: 5000, - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.start(); - (mockClient.query as jest.Mock).mockClear(); + (mockPool.query as jest.Mock).mockClear(); jest.advanceTimersByTime(5000); await Promise.resolve(); - const touchCalls = (mockClient.query as jest.Mock).mock.calls + const touchCalls = (mockPool.query as jest.Mock).mock.calls .filter((c: any[]) => c[0].includes('touch_listener')); expect(touchCalls.length).toBeGreaterThanOrEqual(1); @@ -398,47 +391,47 @@ describe('CursorTracker periodic polling', () => { describe('CursorTracker schema quoting', () => { it('includes schema name in SQL queries', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'schema-node', schema: 'my_realtime_public', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.drain(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('my_realtime_public'), expect.any(Array), ); }); it('uses default schema when not specified', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'default-schema-node', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.drain(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('realtime_public'), expect.any(Array), ); }); it('quotes schema names that need quoting', async () => { - const mockClient = createMockClient(); + const mockPool = createMockPool(); const tracker = new CursorTracker({ nodeId: 'special-schema-node', schema: 'my schema', - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, }); await tracker.drain(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('"my schema"'), expect.any(Array), ); @@ -455,13 +448,13 @@ describe('CursorTracker error handling', () => { }); it('touch_listener error calls onError without throwing', async () => { - const failingClient: PgClient = { + const failingPool: Queryable = { query: jest.fn().mockRejectedValue(new Error('touch failed')), }; const onError = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(failingClient), + pool: failingPool, onError, }); @@ -473,13 +466,13 @@ describe('CursorTracker error handling', () => { }); it('cleanup_ephemeral error calls onError without throwing', async () => { - const failingClient: PgClient = { + const failingPool: Queryable = { query: jest.fn().mockRejectedValue(new Error('cleanup failed')), }; const onError = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(failingClient), + pool: failingPool, onError, }); @@ -491,13 +484,13 @@ describe('CursorTracker error handling', () => { }); it('wraps non-Error objects in Error', async () => { - const failingClient: PgClient = { + const failingPool: Queryable = { query: jest.fn().mockRejectedValue('string error'), }; const onError = jest.fn(); const tracker = new CursorTracker({ - withPgClient: createMockWithPgClient(failingClient), + pool: failingPool, onError, }); diff --git a/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts b/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts index 77851c366d..d5fa6c2418 100644 --- a/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts +++ b/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts @@ -1,6 +1,6 @@ import { RealtimeManager } from '../src/realtime-manager'; import { extractRowId, entryToNotifyPayload, entryToChannel } from '../src/realtime-manager'; -import type { ChangeLogEntry, WithPgClient, PgClient } from '../src/types'; +import type { ChangeLogEntry, Queryable } from '../src/types'; import { EventEmitter } from 'events'; // --------------------------------------------------------------------------- @@ -22,16 +22,12 @@ function makeEntry(overrides: Partial = {}): ChangeLogEntry { }; } -function createMockClient(): jest.Mocked { +function createMockPool(): jest.Mocked { return { query: jest.fn().mockResolvedValue({ rows: [] }), }; } -function createMockWithPgClient(mockClient: PgClient): WithPgClient { - return async (cb: (client: PgClient) => Promise) => cb(mockClient); -} - function createMockPgSubscriber() { const eventEmitter = new EventEmitter(); return { eventEmitter, subscribe: jest.fn() }; @@ -115,12 +111,12 @@ describe('entryToChannel', () => { // --------------------------------------------------------------------------- describe('RealtimeManager', () => { - let mockClient: jest.Mocked; + let mockPool: jest.Mocked; let mockSubscriber: ReturnType; beforeEach(() => { jest.useFakeTimers(); - mockClient = createMockClient(); + mockPool = createMockPool(); mockSubscriber = createMockPgSubscriber(); }); @@ -131,7 +127,7 @@ describe('RealtimeManager', () => { function createManager(overrides: Record = {}) { return new RealtimeManager({ pgSubscriber: mockSubscriber, - withPgClient: createMockWithPgClient(mockClient), + pool: mockPool, nodeId: 'test-manager-node', pollIntervalMs: 1000, heartbeatIntervalMs: 5000, @@ -157,7 +153,7 @@ describe('RealtimeManager', () => { const manager = createManager(); await manager.start(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('touch_listener'), expect.any(Array), ); @@ -168,11 +164,11 @@ describe('RealtimeManager', () => { it('calls cleanup_ephemeral on stop', async () => { const manager = createManager(); await manager.start(); - mockClient.query.mockClear(); + mockPool.query.mockClear(); await manager.stop(); - expect(mockClient.query).toHaveBeenCalledWith( + expect(mockPool.query).toHaveBeenCalledWith( expect.stringContaining('cleanup_ephemeral'), expect.any(Array), ); @@ -206,7 +202,7 @@ describe('RealtimeManager', () => { makeEntry({ operation: 'UPDATE', payload_after: { id: 'row-b' } }), ]; - mockClient.query.mockImplementation(async (sql: string) => { + mockPool.query.mockImplementation(async (sql: string) => { if (typeof sql === 'string' && sql.includes('drain_changes')) { return { rows: entries.map((e) => ({ drain_changes: e })) }; } @@ -238,7 +234,7 @@ describe('RealtimeManager', () => { }), ]; - mockClient.query.mockImplementation(async (sql: string) => { + mockPool.query.mockImplementation(async (sql: string) => { if (typeof sql === 'string' && sql.includes('drain_changes')) { return { rows: entries.map((e) => ({ drain_changes: e })) }; } @@ -275,7 +271,7 @@ describe('RealtimeManager', () => { }), ]; - mockClient.query.mockImplementation(async (sql: string) => { + mockPool.query.mockImplementation(async (sql: string) => { if (typeof sql === 'string' && sql.includes('drain_changes')) { return { rows: entries.map((e) => ({ drain_changes: e })) }; } @@ -296,7 +292,7 @@ describe('RealtimeManager', () => { it('calls onError when drain fails', async () => { const errors: Error[] = []; - mockClient.query.mockImplementation(async (sql: string) => { + mockPool.query.mockImplementation(async (sql: string) => { if (typeof sql === 'string' && sql.includes('drain_changes')) { throw new Error('drain failed'); } @@ -317,7 +313,7 @@ describe('RealtimeManager', () => { makeEntry({ operation: 'INSERT', payload_after: { id: 'row-x' } }), ]; - mockClient.query.mockImplementation(async (sql: string) => { + mockPool.query.mockImplementation(async (sql: string) => { if (typeof sql === 'string' && sql.includes('drain_changes')) { return { rows: entries.map((e) => ({ drain_changes: e })) }; } diff --git a/graphile/graphile-realtime-subscriptions/src/cursor-tracker.ts b/graphile/graphile-realtime-subscriptions/src/cursor-tracker.ts index ec28f32c3e..45a9ad761f 100644 --- a/graphile/graphile-realtime-subscriptions/src/cursor-tracker.ts +++ b/graphile/graphile-realtime-subscriptions/src/cursor-tracker.ts @@ -9,8 +9,8 @@ * 4. stop() → calls cleanup_ephemeral() to remove ephemeral subscriptions * and delete the listener_node row * - * The caller provides a withPgClient callback that acquires a PgClient - * for each operation. This keeps connection management external. + * The caller provides a Queryable (typically a pg.Pool from pg-cache) + * and this class calls pool.query() directly for each operation. */ import { randomUUID } from 'crypto'; @@ -20,8 +20,7 @@ import { QuoteUtils } from '@pgsql/quotes'; import type { CursorTrackerOptions, ChangeLogEntry, - WithPgClient, - PgClient, + Queryable, } from './types'; const log = new Logger('cursor-tracker'); @@ -38,7 +37,7 @@ export class CursorTracker { private readonly pollIntervalMs: number; private readonly heartbeatIntervalMs: number; private readonly batchLimit: number; - private readonly withPgClient: WithPgClient; + private readonly pool: Queryable; private readonly onChanges: (entries: ChangeLogEntry[]) => void; private readonly onError: (error: Error) => void; @@ -53,7 +52,7 @@ export class CursorTracker { this.pollIntervalMs = options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS; this.heartbeatIntervalMs = options.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS; this.batchLimit = options.batchLimit ?? DEFAULT_BATCH_LIMIT; - this.withPgClient = options.withPgClient; + this.pool = options.pool; this.onChanges = options.onChanges ?? (() => {}); this.onError = options.onError ?? ((err) => { log.error(`CursorTracker error: ${err.message}`); @@ -104,14 +103,12 @@ export class CursorTracker { this.draining = true; try { - const entries = await this.withPgClient(async (client: PgClient) => { - const sql = `SELECT * FROM ${this.quoteIdent(this.schema)}.drain_changes($1, $2)`; - const result = await client.query<{ drain_changes: ChangeLogEntry }>( - sql, - [this.nodeId, this.batchLimit], - ); - return result.rows.map((row) => row.drain_changes); - }); + const sql = `SELECT * FROM ${this.quoteIdent(this.schema)}.drain_changes($1, $2)`; + const result = await this.pool.query<{ drain_changes: ChangeLogEntry }>( + sql, + [this.nodeId, this.batchLimit], + ); + const entries = result.rows.map((row) => row.drain_changes); if (entries.length > 0) { log.info(`Drained ${entries.length} change(s) for node=${this.nodeId}`); @@ -129,10 +126,8 @@ export class CursorTracker { async touchListener(): Promise { try { - await this.withPgClient(async (client: PgClient) => { - const sql = `SELECT ${this.quoteIdent(this.schema)}.touch_listener($1)`; - await client.query(sql, [this.nodeId]); - }); + const sql = `SELECT ${this.quoteIdent(this.schema)}.touch_listener($1)`; + await this.pool.query(sql, [this.nodeId]); } catch (err) { this.onError(err instanceof Error ? err : new Error(String(err))); } @@ -140,10 +135,8 @@ export class CursorTracker { async cleanupEphemeral(): Promise { try { - await this.withPgClient(async (client: PgClient) => { - const sql = `SELECT ${this.quoteIdent(this.schema)}.cleanup_ephemeral($1)`; - await client.query(sql, [this.nodeId]); - }); + const sql = `SELECT ${this.quoteIdent(this.schema)}.cleanup_ephemeral($1)`; + await this.pool.query(sql, [this.nodeId]); log.info(`Cleaned up ephemeral subscriptions for node=${this.nodeId}`); } catch (err) { this.onError(err instanceof Error ? err : new Error(String(err))); diff --git a/graphile/graphile-realtime-subscriptions/src/index.ts b/graphile/graphile-realtime-subscriptions/src/index.ts index 9edc22c8a5..0b8aa410a6 100644 --- a/graphile/graphile-realtime-subscriptions/src/index.ts +++ b/graphile/graphile-realtime-subscriptions/src/index.ts @@ -25,7 +25,6 @@ export type { RealtimeSubscriptionsPluginOptions } from './types'; export type { CursorTrackerOptions, ChangeLogEntry, - PgClient, - WithPgClient, + Queryable, RealtimeManagerOptions, } from './types'; diff --git a/graphile/graphile-realtime-subscriptions/src/plugin.ts b/graphile/graphile-realtime-subscriptions/src/plugin.ts index 642d04fbf4..09f7dd65d4 100644 --- a/graphile/graphile-realtime-subscriptions/src/plugin.ts +++ b/graphile/graphile-realtime-subscriptions/src/plugin.ts @@ -341,7 +341,7 @@ export { createRealtimeSubscriptionsPlugin as RealtimeSubscriptionsPlugin }; // Re-export CursorTracker and RealtimeManager for convenience export { CursorTracker } from './cursor-tracker'; export { RealtimeManager } from './realtime-manager'; -export type { CursorTrackerOptions, ChangeLogEntry, WithPgClient, PgClient, RealtimeManagerOptions } from './types'; +export type { CursorTrackerOptions, ChangeLogEntry, Queryable, RealtimeManagerOptions } from './types'; // Exported for testing export { parseNotifyPayload, EventThrottle, DEFAULT_OVERFLOW_THRESHOLD }; diff --git a/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts b/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts index 929e50dc78..d2da0405a3 100644 --- a/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts +++ b/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts @@ -28,7 +28,6 @@ import { Logger } from '@pgpmjs/logger'; import { CursorTracker } from './cursor-tracker'; import type { ChangeLogEntry, - WithPgClient, RealtimeManagerOptions, } from './types'; @@ -74,7 +73,7 @@ export class RealtimeManager { private started = false; constructor(options: RealtimeManagerOptions) { - const { pgSubscriber, withPgClient, ...cursorOpts } = options; + const { pgSubscriber, pool, ...cursorOpts } = options; this.subscriber = pgSubscriber; this.cursorTracker = new CursorTracker({ @@ -83,7 +82,7 @@ export class RealtimeManager { pollIntervalMs: cursorOpts.pollIntervalMs, heartbeatIntervalMs: cursorOpts.heartbeatIntervalMs, batchLimit: cursorOpts.batchLimit, - withPgClient, + pool, onChanges: (entries) => this.dispatchEntries(entries), onError: cursorOpts.onError ?? ((err) => { log.error(`RealtimeManager error: ${err.message}`); diff --git a/graphile/graphile-realtime-subscriptions/src/types.ts b/graphile/graphile-realtime-subscriptions/src/types.ts index a80756f162..bbf220ba4d 100644 --- a/graphile/graphile-realtime-subscriptions/src/types.ts +++ b/graphile/graphile-realtime-subscriptions/src/types.ts @@ -14,24 +14,20 @@ export interface RealtimeSubscriptionsPluginOptions { } /** - * A minimal PostgreSQL client interface used by CursorTracker. - * Compatible with node-postgres (pg) Client or PoolClient. + * A minimal query-capable interface — satisfied by pg.Pool, pg.Client, + * pg.PoolClient, or any object with a compatible `query` method. + * + * CursorTracker and RealtimeManager use only one-shot queries, so + * `pool.query()` (which internally borrows a client, runs the query, + * and releases the client) is all that's needed — no callback wrapper. */ -export interface PgClient { +export interface Queryable { query>( text: string, values?: unknown[], ): Promise<{ rows: R[] }>; } -/** - * Callback that provides a PgClient for executing queries. - * The client is released after the callback returns. - */ -export type WithPgClient = ( - callback: (client: PgClient) => Promise, -) => Promise; - /** * A single entry from drain_changes(), representing a change_log row * matched against subscriber tables. @@ -89,10 +85,12 @@ export interface CursorTrackerOptions { batchLimit?: number; /** - * Function to acquire a PgClient for executing queries. - * The cursor tracker calls this for every poll and heartbeat cycle. + * A query-capable object (typically a pg.Pool from pg-cache) used to + * run drain_changes(), touch_listener(), and cleanup_ephemeral() queries. + * pool.query() internally borrows a connection and releases it after + * each call — no manual connection management needed. */ - withPgClient: WithPgClient; + pool: Queryable; /** * Called when drain_changes() returns new change_log entries. @@ -120,9 +118,10 @@ export interface RealtimeManagerOptions { pgSubscriber: unknown; /** - * Function to acquire a PgClient for executing cursor tracking queries. + * A query-capable object (typically a pg.Pool from pg-cache) used by + * the underlying CursorTracker for drain_changes() polling. */ - withPgClient: WithPgClient; + pool: Queryable; /** * Unique identifier for this listener node. diff --git a/graphile/graphile-settings/package.json b/graphile/graphile-settings/package.json index eb07a23926..d74430f2ea 100644 --- a/graphile/graphile-settings/package.json +++ b/graphile/graphile-settings/package.json @@ -51,6 +51,7 @@ "graphile-build-pg": "5.0.0", "graphile-config": "1.0.0", "graphile-connection-filter": "workspace:^", + "graphile-realtime-subscriptions": "workspace:^", "graphile-ltree": "workspace:^", "graphile-pg-aggregates": "workspace:^", "graphile-postgis": "workspace:^", diff --git a/graphile/graphile-settings/src/presets/constructive-preset.ts b/graphile/graphile-settings/src/presets/constructive-preset.ts index 6e87d6c079..13a1dd55c7 100644 --- a/graphile/graphile-settings/src/presets/constructive-preset.ts +++ b/graphile/graphile-settings/src/presets/constructive-preset.ts @@ -6,6 +6,7 @@ import { createPostgisOperatorFactory,GraphilePostgisPreset } from 'graphile-pos import { PgAggregatesPreset } from 'graphile-pg-aggregates'; import { PresignedUrlPreset } from 'graphile-presigned-url-plugin'; import { createMatchesOperatorFactory, createTrgmOperatorFactories,UnifiedSearchPreset } from 'graphile-search'; +import { RealtimeSubscriptionsPreset } from 'graphile-realtime-subscriptions'; import { SqlExpressionValidatorPreset } from 'graphile-sql-expression-validator'; import { UploadPreset } from 'graphile-upload-plugin'; @@ -44,6 +45,7 @@ export interface ConstructivePresetOptions { enableConnectionFilter?: boolean; enableLtree?: boolean; enableLlm?: boolean; + enableRealtime?: boolean; } const DEFAULTS: Required = { @@ -56,6 +58,7 @@ const DEFAULTS: Required = { enableConnectionFilter: true, enableLtree: true, enableLlm: false, + enableRealtime: false, }; /** @@ -88,6 +91,7 @@ const DEFAULTS: Required = { * - enableDirectUploads -> UploadPreset * - enablePresignedUploads -> PresignedUrlPreset, BucketProvisionerPreset * - enableAggregates -> PgAggregatesPreset (off by default) + * - enableRealtime -> RealtimeSubscriptionsPreset (off by default) * - enableLlm -> (no plugin yet, reserved for future use) * * RELATION FILTERS (when enableConnectionFilter is true): @@ -182,6 +186,10 @@ export function createConstructivePreset( presets.push(PgAggregatesPreset); } + if (opts.enableRealtime) { + presets.push(RealtimeSubscriptionsPreset()); + } + // ----- connectionFilterOperatorFactories ----- // Only include operator factories for features that are actually enabled. // graphile-config replaces (not concatenates) arrays when merging presets, diff --git a/graphql/server/src/middleware/api.ts b/graphql/server/src/middleware/api.ts index bbd3cf5376..147a39be94 100644 --- a/graphql/server/src/middleware/api.ts +++ b/graphql/server/src/middleware/api.ts @@ -535,7 +535,8 @@ const queryRlsSettings = async (pool: Pool, databaseId: string): Promise(RLS_SETTINGS_SQL, [databaseId]); return toRlsModuleFromSettings(result.rows[0] ?? null); - } catch { + } catch (e: any) { + log.warn(`[rls-settings] Failed to load RLS settings: ${e.message}`); return undefined; } }; @@ -561,7 +562,8 @@ const queryCorsSettings = async (pool: Pool, databaseId: string, apiId?: string) } const dbDefault = await pool.query(CORS_SETTINGS_DB_DEFAULT_SQL, [databaseId]); return dbDefault.rows[0]?.allowed_origins; - } catch { + } catch (e: any) { + log.warn(`[cors-settings] Failed to load CORS settings: ${e.message}`); return undefined; } }; @@ -609,7 +611,8 @@ const queryPubkeySettings = async (pool: Pool, databaseId: string): Promise(PUBKEY_SETTINGS_SQL, [databaseId]); return toPubkeyChallengeSettings(result.rows[0] ?? null); - } catch { + } catch (e: any) { + log.warn(`[pubkey-settings] Failed to load pubkey challenge settings: ${e.message}`); return undefined; } }; @@ -649,7 +652,8 @@ const queryWebauthnSettings = async (pool: Pool, databaseId: string): Promise(WEBAUTHN_SETTINGS_SQL, [databaseId]); return toWebauthnSettings(result.rows[0] ?? null); - } catch { + } catch (e: any) { + log.warn(`[webauthn-settings] Failed to load webauthn settings: ${e.message}`); return undefined; } }; @@ -668,6 +672,9 @@ const toDatabaseSettings = (row: DatabaseSettingsRow | null): DatabaseSettings | enableConnectionFilter: row.resolved_enable_connection_filter, enableLtree: row.resolved_enable_ltree, enableLlm: row.resolved_enable_llm, + // Reads from the COALESCE cascade once constructive-db#1105 is merged and + // the enable_realtime column exists in database_settings / api_settings. + enableRealtime: false, }; }; @@ -675,7 +682,8 @@ const queryDatabaseSettings = async (pool: Pool, databaseId: string, apiId?: str try { const result = await pool.query(DATABASE_SETTINGS_SQL, [databaseId, apiId ?? null]); return toDatabaseSettings(result.rows[0] ?? null); - } catch { + } catch (e: any) { + log.warn(`[database-settings] Failed to load database settings: ${e.message}`); return undefined; } }; diff --git a/graphql/server/src/middleware/graphile.ts b/graphql/server/src/middleware/graphile.ts index 134c03f4c2..e4c1ef3f53 100644 --- a/graphql/server/src/middleware/graphile.ts +++ b/graphql/server/src/middleware/graphile.ts @@ -370,7 +370,11 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { serviceKey: key, databaseId: api.databaseId ?? null, }, - () => createGraphileInstance({ preset, cacheKey: key }), + () => createGraphileInstance({ + preset, + cacheKey: key, + enableRealtime: api.databaseSettings?.enableRealtime, + }), { enabled: observabilityEnabled }, ); creating.set(key, creationPromise); diff --git a/graphql/server/src/types.ts b/graphql/server/src/types.ts index 8a5c614692..c0fa01ec19 100644 --- a/graphql/server/src/types.ts +++ b/graphql/server/src/types.ts @@ -32,6 +32,7 @@ export interface DatabaseSettings { enableConnectionFilter: boolean; enableLtree: boolean; enableLlm: boolean; + enableRealtime: boolean; } /** diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 090e82ab43..c4ad518394 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -220,6 +220,9 @@ importers: grafserv: specifier: 1.0.0 version: 1.0.0(@types/node@25.6.2)(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(grafast@1.0.0(graphql@16.13.0))(graphile-config@1.0.0)(graphql@16.13.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)(use-sync-external-store@1.6.0(react@19.2.4))(ws@8.19.0) + graphile-realtime-subscriptions: + specifier: workspace:^ + version: link:../graphile-realtime-subscriptions/dist lru-cache: specifier: ^11.2.7 version: 11.2.7 @@ -758,6 +761,9 @@ importers: graphile-presigned-url-plugin: specifier: workspace:^ version: link:../graphile-presigned-url-plugin/dist + graphile-realtime-subscriptions: + specifier: workspace:^ + version: link:../graphile-realtime-subscriptions/dist graphile-search: specifier: workspace:^ version: link:../graphile-search/dist