From 13a0e63116703255b79c0fa6d0b74f85697e25fd Mon Sep 17 00:00:00 2001 From: Dan Lynch Date: Sun, 10 May 2026 23:46:15 +0000 Subject: [PATCH] feat: add RealtimeManager to bridge CursorTracker events into PgSubscriber Introduces RealtimeManager class that converts drain_changes() ChangeLogEntry objects into NOTIFY-format payloads and emits them on PgSubscriber's internal EventEmitter. This provides at-least-once delivery: NOTIFY handles instant delivery while cursor polling catches up on missed events (disconnects, restarts). Key components: - RealtimeManager: lifecycle management (start/stop), event dispatch - entryToChannel: maps source_schema.source_table to NOTIFY channel - entryToNotifyPayload: converts ChangeLogEntry to OP:rowId format - extractRowId: pulls row ID from payload_after/payload_before - RealtimeManagerOptions: typed configuration interface 22 new tests covering helper functions, lifecycle, event dispatching, multi-table routing, and error handling. All 92 tests passing. --- .../__tests__/realtime-manager.test.ts | 333 ++++++++++++++++++ .../src/index.ts | 2 + .../src/plugin.ts | 5 +- .../src/realtime-manager.ts | 157 +++++++++ .../src/types.ts | 60 ++++ 5 files changed, 555 insertions(+), 2 deletions(-) create mode 100644 graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts create mode 100644 graphile/graphile-realtime-subscriptions/src/realtime-manager.ts diff --git a/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts b/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts new file mode 100644 index 000000000..77851c366 --- /dev/null +++ b/graphile/graphile-realtime-subscriptions/__tests__/realtime-manager.test.ts @@ -0,0 +1,333 @@ +import { RealtimeManager } from '../src/realtime-manager'; +import { extractRowId, entryToNotifyPayload, entryToChannel } from '../src/realtime-manager'; +import type { ChangeLogEntry, WithPgClient, PgClient } from '../src/types'; +import { EventEmitter } from 'events'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeEntry(overrides: Partial = {}): ChangeLogEntry { + return { + id: 'cl-entry-001', + occurred_at: '2024-01-01T00:00:00Z', + source_schema: 'public', + source_table: 'contact', + operation: 'INSERT', + payload_after: { id: 'row-uuid-1', name: 'Alice' }, + payload_before: null, + payload_diff: null, + subscriber_ids: ['sub-1'], + ...overrides, + }; +} + +function createMockClient(): jest.Mocked { + return { + query: jest.fn().mockResolvedValue({ rows: [] }), + }; +} + +function createMockWithPgClient(mockClient: PgClient): WithPgClient { + return async (cb: (client: PgClient) => Promise) => cb(mockClient); +} + +function createMockPgSubscriber() { + const eventEmitter = new EventEmitter(); + return { eventEmitter, subscribe: jest.fn() }; +} + +// --------------------------------------------------------------------------- +// Unit tests: helper functions +// --------------------------------------------------------------------------- + +describe('extractRowId', () => { + it('extracts id from payload_after for INSERT', () => { + const entry = makeEntry({ operation: 'INSERT', payload_after: { id: 'abc-123' } }); + expect(extractRowId(entry)).toBe('abc-123'); + }); + + it('extracts id from payload_after for UPDATE', () => { + const entry = makeEntry({ operation: 'UPDATE', payload_after: { id: 'def-456' } }); + expect(extractRowId(entry)).toBe('def-456'); + }); + + it('extracts id from payload_before for DELETE', () => { + const entry = makeEntry({ + operation: 'DELETE', + payload_after: null, + payload_before: { id: 'ghi-789' }, + }); + expect(extractRowId(entry)).toBe('ghi-789'); + }); + + it('returns null when payload is missing', () => { + const entry = makeEntry({ operation: 'INSERT', payload_after: null }); + expect(extractRowId(entry)).toBeNull(); + }); + + it('returns null when payload has no id field', () => { + const entry = makeEntry({ operation: 'INSERT', payload_after: { name: 'Alice' } }); + expect(extractRowId(entry)).toBeNull(); + }); +}); + +describe('entryToNotifyPayload', () => { + it('formats INSERT with row id', () => { + const entry = makeEntry({ operation: 'INSERT', payload_after: { id: 'row-1' } }); + expect(entryToNotifyPayload(entry)).toBe('INSERT:row-1'); + }); + + it('formats UPDATE with row id', () => { + const entry = makeEntry({ operation: 'UPDATE', payload_after: { id: 'row-2' } }); + expect(entryToNotifyPayload(entry)).toBe('UPDATE:row-2'); + }); + + it('formats DELETE with row id from payload_before', () => { + const entry = makeEntry({ + operation: 'DELETE', + payload_after: null, + payload_before: { id: 'row-3' }, + }); + expect(entryToNotifyPayload(entry)).toBe('DELETE:row-3'); + }); + + it('returns operation only when no row id available', () => { + const entry = makeEntry({ operation: 'INSERT', payload_after: null }); + expect(entryToNotifyPayload(entry)).toBe('INSERT'); + }); +}); + +describe('entryToChannel', () => { + it('builds channel from source_schema and source_table', () => { + const entry = makeEntry({ source_schema: 'public', source_table: 'contact' }); + expect(entryToChannel(entry)).toBe('realtime:public.contact'); + }); + + it('handles custom schema names', () => { + const entry = makeEntry({ source_schema: 'tenant_42', source_table: 'invoice' }); + expect(entryToChannel(entry)).toBe('realtime:tenant_42.invoice'); + }); +}); + +// --------------------------------------------------------------------------- +// RealtimeManager lifecycle +// --------------------------------------------------------------------------- + +describe('RealtimeManager', () => { + let mockClient: jest.Mocked; + let mockSubscriber: ReturnType; + + beforeEach(() => { + jest.useFakeTimers(); + mockClient = createMockClient(); + mockSubscriber = createMockPgSubscriber(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + function createManager(overrides: Record = {}) { + return new RealtimeManager({ + pgSubscriber: mockSubscriber, + withPgClient: createMockWithPgClient(mockClient), + nodeId: 'test-manager-node', + pollIntervalMs: 1000, + heartbeatIntervalMs: 5000, + ...overrides, + }); + } + + it('starts and stops without error', async () => { + const manager = createManager(); + await manager.start(); + expect(manager.isRunning).toBe(true); + + await manager.stop(); + expect(manager.isRunning).toBe(false); + }); + + it('exposes nodeId', () => { + const manager = createManager({ nodeId: 'my-node-42' }); + expect(manager.nodeId).toBe('my-node-42'); + }); + + it('calls touch_listener on start', async () => { + const manager = createManager(); + await manager.start(); + + expect(mockClient.query).toHaveBeenCalledWith( + expect.stringContaining('touch_listener'), + expect.any(Array), + ); + + await manager.stop(); + }); + + it('calls cleanup_ephemeral on stop', async () => { + const manager = createManager(); + await manager.start(); + mockClient.query.mockClear(); + + await manager.stop(); + + expect(mockClient.query).toHaveBeenCalledWith( + expect.stringContaining('cleanup_ephemeral'), + expect.any(Array), + ); + }); + + it('is idempotent for start', async () => { + const manager = createManager(); + await manager.start(); + await manager.start(); // should be no-op + + await manager.stop(); + }); + + it('is idempotent for stop', async () => { + const manager = createManager(); + await manager.start(); + await manager.stop(); + await manager.stop(); // should be no-op + }); + + describe('event dispatching', () => { + it('emits cursor-tracked events on PgSubscriber eventEmitter', async () => { + const emitted: { channel: string; payload: string }[] = []; + mockSubscriber.eventEmitter.on('realtime:public.contact', (payload: string) => { + emitted.push({ channel: 'realtime:public.contact', payload }); + }); + + // Mock drain_changes to return entries + const entries: ChangeLogEntry[] = [ + makeEntry({ operation: 'INSERT', payload_after: { id: 'row-a' } }), + makeEntry({ operation: 'UPDATE', payload_after: { id: 'row-b' } }), + ]; + + mockClient.query.mockImplementation(async (sql: string) => { + if (typeof sql === 'string' && sql.includes('drain_changes')) { + return { rows: entries.map((e) => ({ drain_changes: e })) }; + } + return { rows: [] }; + }); + + const manager = createManager(); + await manager.start(); + + // The initial drain happens in start() — events should be emitted + expect(emitted).toHaveLength(2); + expect(emitted[0].payload).toBe('INSERT:row-a'); + expect(emitted[1].payload).toBe('UPDATE:row-b'); + + await manager.stop(); + }); + + it('handles DELETE events with payload_before', async () => { + const emitted: string[] = []; + mockSubscriber.eventEmitter.on('realtime:public.contact', (payload: string) => { + emitted.push(payload); + }); + + const entries: ChangeLogEntry[] = [ + makeEntry({ + operation: 'DELETE', + payload_after: null, + payload_before: { id: 'deleted-row' }, + }), + ]; + + mockClient.query.mockImplementation(async (sql: string) => { + if (typeof sql === 'string' && sql.includes('drain_changes')) { + return { rows: entries.map((e) => ({ drain_changes: e })) }; + } + return { rows: [] }; + }); + + const manager = createManager(); + await manager.start(); + + expect(emitted).toHaveLength(1); + expect(emitted[0]).toBe('DELETE:deleted-row'); + + await manager.stop(); + }); + + it('dispatches to correct channels for different tables', async () => { + const contactEvents: string[] = []; + const invoiceEvents: string[] = []; + mockSubscriber.eventEmitter.on('realtime:public.contact', (p: string) => contactEvents.push(p)); + mockSubscriber.eventEmitter.on('realtime:billing.invoice', (p: string) => invoiceEvents.push(p)); + + const entries: ChangeLogEntry[] = [ + makeEntry({ + source_schema: 'public', + source_table: 'contact', + operation: 'INSERT', + payload_after: { id: 'contact-1' }, + }), + makeEntry({ + source_schema: 'billing', + source_table: 'invoice', + operation: 'UPDATE', + payload_after: { id: 'invoice-1' }, + }), + ]; + + mockClient.query.mockImplementation(async (sql: string) => { + if (typeof sql === 'string' && sql.includes('drain_changes')) { + return { rows: entries.map((e) => ({ drain_changes: e })) }; + } + return { rows: [] }; + }); + + const manager = createManager(); + await manager.start(); + + expect(contactEvents).toEqual(['INSERT:contact-1']); + expect(invoiceEvents).toEqual(['UPDATE:invoice-1']); + + await manager.stop(); + }); + }); + + describe('error handling', () => { + it('calls onError when drain fails', async () => { + const errors: Error[] = []; + + mockClient.query.mockImplementation(async (sql: string) => { + if (typeof sql === 'string' && sql.includes('drain_changes')) { + throw new Error('drain failed'); + } + return { rows: [] }; + }); + + const manager = createManager({ onError: (err: Error) => errors.push(err) }); + await manager.start(); + + expect(errors).toHaveLength(1); + expect(errors[0].message).toBe('drain failed'); + + await manager.stop(); + }); + + it('handles missing eventEmitter gracefully', async () => { + const entries: ChangeLogEntry[] = [ + makeEntry({ operation: 'INSERT', payload_after: { id: 'row-x' } }), + ]; + + mockClient.query.mockImplementation(async (sql: string) => { + if (typeof sql === 'string' && sql.includes('drain_changes')) { + return { rows: entries.map((e) => ({ drain_changes: e })) }; + } + return { rows: [] }; + }); + + // pgSubscriber without eventEmitter — should not crash + const manager = createManager({ pgSubscriber: {} }); + await manager.start(); + await manager.stop(); + }); + }); +}); diff --git a/graphile/graphile-realtime-subscriptions/src/index.ts b/graphile/graphile-realtime-subscriptions/src/index.ts index ae62fcc21..9edc22c8a 100644 --- a/graphile/graphile-realtime-subscriptions/src/index.ts +++ b/graphile/graphile-realtime-subscriptions/src/index.ts @@ -20,10 +20,12 @@ export { createRealtimeSubscriptionsPlugin, RealtimeSubscriptionsPlugin } from './plugin'; export { RealtimeSubscriptionsPreset } from './preset'; export { CursorTracker } from './cursor-tracker'; +export { RealtimeManager } from './realtime-manager'; export type { RealtimeSubscriptionsPluginOptions } from './types'; export type { CursorTrackerOptions, ChangeLogEntry, PgClient, WithPgClient, + RealtimeManagerOptions, } from './types'; diff --git a/graphile/graphile-realtime-subscriptions/src/plugin.ts b/graphile/graphile-realtime-subscriptions/src/plugin.ts index 9fd61be20..642d04fbf 100644 --- a/graphile/graphile-realtime-subscriptions/src/plugin.ts +++ b/graphile/graphile-realtime-subscriptions/src/plugin.ts @@ -338,9 +338,10 @@ export function createRealtimeSubscriptionsPlugin( export { createRealtimeSubscriptionsPlugin as RealtimeSubscriptionsPlugin }; -// Re-export CursorTracker for convenience +// Re-export CursorTracker and RealtimeManager for convenience export { CursorTracker } from './cursor-tracker'; -export type { CursorTrackerOptions, ChangeLogEntry, WithPgClient, PgClient } from './types'; +export { RealtimeManager } from './realtime-manager'; +export type { CursorTrackerOptions, ChangeLogEntry, WithPgClient, PgClient, RealtimeManagerOptions } from './types'; // Exported for testing export { parseNotifyPayload, EventThrottle, DEFAULT_OVERFLOW_THRESHOLD }; diff --git a/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts b/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts new file mode 100644 index 000000000..929e50dc7 --- /dev/null +++ b/graphile/graphile-realtime-subscriptions/src/realtime-manager.ts @@ -0,0 +1,157 @@ +/** + * RealtimeManager — bridges CursorTracker (polling drain_changes) into + * PostGraphile's PgSubscriber so cursor-tracked events flow through the + * same subscription plans as NOTIFY events. + * + * Architecture: + * PgSubscriber uses an internal EventEmitter. NOTIFY payloads arrive via + * pg's `notification` event and are emitted as `eventEmitter.emit(channel, payload)`. + * The `listen()` step in grafast subscribes to the same EventEmitter. + * + * RealtimeManager converts ChangeLogEntry objects from drain_changes() into + * the same NOTIFY payload format ("OP:rowId1,rowId2,...") and emits them on + * the PgSubscriber's EventEmitter, so existing subscription plans handle + * them identically to real NOTIFY events. + * + * This provides at-least-once delivery: NOTIFY is instant but best-effort; + * cursor polling catches up on anything missed (disconnects, restarts). + * Duplicates are expected and acceptable — clients should be idempotent. + * + * Lifecycle: + * 1. start() → registers listener node, begins polling + heartbeat + * 2. drain_changes() results are converted and emitted on PgSubscriber + * 3. stop() → cleans up ephemeral subscriptions, removes listener node + */ + +import { Logger } from '@pgpmjs/logger'; + +import { CursorTracker } from './cursor-tracker'; +import type { + ChangeLogEntry, + WithPgClient, + RealtimeManagerOptions, +} from './types'; + +const log = new Logger('realtime-manager'); + +/** + * Extract row IDs from a ChangeLogEntry. + * + * For INSERT/UPDATE the row ID lives in payload_after.id; + * for DELETE it lives in payload_before.id. + * Falls back to the change_log entry's own id if payloads are missing. + */ +function extractRowId(entry: ChangeLogEntry): string | null { + if (entry.operation === 'DELETE') { + return (entry.payload_before?.id as string) ?? null; + } + return (entry.payload_after?.id as string) ?? null; +} + +/** + * Convert a ChangeLogEntry into the NOTIFY payload format used by emit_change. + * Format: "OPERATION:rowId" (e.g. "INSERT:550e8400-...") + */ +function entryToNotifyPayload(entry: ChangeLogEntry): string { + const rowId = extractRowId(entry); + if (!rowId) { + return entry.operation; + } + return `${entry.operation}:${rowId}`; +} + +/** + * Build the NOTIFY channel name for a change_log entry. + * Matches the channel format used by emit_change: "realtime:{schema}.{table}" + */ +function entryToChannel(entry: ChangeLogEntry): string { + return `realtime:${entry.source_schema}.${entry.source_table}`; +} + +export class RealtimeManager { + private readonly cursorTracker: CursorTracker; + private readonly subscriber: unknown; + private started = false; + + constructor(options: RealtimeManagerOptions) { + const { pgSubscriber, withPgClient, ...cursorOpts } = options; + this.subscriber = pgSubscriber; + + this.cursorTracker = new CursorTracker({ + nodeId: cursorOpts.nodeId, + schema: cursorOpts.schema, + pollIntervalMs: cursorOpts.pollIntervalMs, + heartbeatIntervalMs: cursorOpts.heartbeatIntervalMs, + batchLimit: cursorOpts.batchLimit, + withPgClient, + onChanges: (entries) => this.dispatchEntries(entries), + onError: cursorOpts.onError ?? ((err) => { + log.error(`RealtimeManager error: ${err.message}`); + }), + }); + } + + get nodeId(): string { + return this.cursorTracker.nodeId; + } + + get isRunning(): boolean { + return this.started && this.cursorTracker.isRunning; + } + + async start(): Promise { + if (this.started) return; + this.started = true; + + log.info(`Starting RealtimeManager: node=${this.nodeId}`); + await this.cursorTracker.start(); + } + + async stop(): Promise { + if (!this.started) return; + this.started = false; + + log.info(`Stopping RealtimeManager: node=${this.nodeId}`); + await this.cursorTracker.stop(); + } + + /** + * Convert ChangeLogEntry objects to NOTIFY-format payloads and emit + * them on the PgSubscriber's internal EventEmitter. + */ + private dispatchEntries(entries: ChangeLogEntry[]): void { + const emitter = this.getEventEmitter(); + if (!emitter) { + log.warn('PgSubscriber has no eventEmitter; cursor events cannot be dispatched'); + return; + } + + for (const entry of entries) { + const channel = entryToChannel(entry); + const payload = entryToNotifyPayload(entry); + emitter.emit(channel, payload); + } + + log.info(`Dispatched ${entries.length} cursor-tracked event(s) to PgSubscriber`); + } + + /** + * Access PgSubscriber's internal EventEmitter. + * + * PgSubscriber from @dataplan/pg stores an EventEmitter3 instance as + * `this.eventEmitter`. It is private but stable across v1.x releases. + * This is the same emitter that NOTIFY events are dispatched through. + */ + private getEventEmitter(): { emit(event: string, payload: string): boolean } | null { + const sub = this.subscriber as Record; + if (sub && typeof sub === 'object' && 'eventEmitter' in sub) { + const ee = sub.eventEmitter as { emit(event: string, payload: string): boolean }; + if (typeof ee?.emit === 'function') { + return ee; + } + } + return null; + } +} + +export { extractRowId, entryToNotifyPayload, entryToChannel }; diff --git a/graphile/graphile-realtime-subscriptions/src/types.ts b/graphile/graphile-realtime-subscriptions/src/types.ts index d6d860609..a80756f16 100644 --- a/graphile/graphile-realtime-subscriptions/src/types.ts +++ b/graphile/graphile-realtime-subscriptions/src/types.ts @@ -106,3 +106,63 @@ export interface CursorTrackerOptions { */ onError?: (error: Error) => void; } + +/** + * Configuration for the RealtimeManager, which bridges CursorTracker + * events into PostGraphile's PgSubscriber for at-least-once delivery. + */ +export interface RealtimeManagerOptions { + /** + * The PgSubscriber instance from PostGraphile's context. + * RealtimeManager emits cursor-tracked events on its internal EventEmitter + * so they flow through existing subscription plans. + */ + pgSubscriber: unknown; + + /** + * Function to acquire a PgClient for executing cursor tracking queries. + */ + withPgClient: WithPgClient; + + /** + * Unique identifier for this listener node. + * Should be stable across restarts if you want cursor continuity. + * If not provided, a random ID is generated (ephemeral mode). + */ + nodeId?: string; + + /** + * The realtime_public schema name where drain_changes(), + * touch_listener(), and cleanup_ephemeral() live. + * + * Default: 'realtime_public' + */ + schema?: string; + + /** + * How often to poll drain_changes() for new events (milliseconds). + * + * Default: 5000 (5 seconds) + */ + pollIntervalMs?: number; + + /** + * How often to send a heartbeat via touch_listener() (milliseconds). + * + * Default: 30000 (30 seconds) + */ + heartbeatIntervalMs?: number; + + /** + * Maximum number of change_log rows to fetch per drain_changes() call. + * + * Default: 500 + */ + batchLimit?: number; + + /** + * Called when an error occurs during polling, heartbeat, or cleanup. + * If not provided, errors are logged via @pgpmjs/logger. + */ + onError?: (error: Error) => void; +}