feat: add CursorTracker for at-least-once delivery via drain_changes() polling#1110
feat: add CursorTracker for at-least-once delivery via drain_changes() polling#1110pyramation merged 2 commits intomainfrom
Conversation
…) polling Implements cursor tracking integration for the realtime subscriptions plugin: - CursorTracker class manages listener_node lifecycle (register, heartbeat, cleanup) - Periodic drain_changes() polling fetches change_log entries matched against subscribers - Periodic touch_listener() heartbeat keeps the node alive - cleanup_ephemeral() on stop removes ephemeral subscriptions and deletes the node - Configurable poll interval, heartbeat interval, batch limit, and schema - WithPgClient callback pattern keeps connection management external - onChanges callback delivers ChangeLogEntry[] with subscriber_ids for fan-out - Error handling with onError callback, no unhandled rejections - Guard against concurrent drains New types: PgClient, WithPgClient, ChangeLogEntry, CursorTrackerOptions 24 new tests covering lifecycle, polling, error handling, schema quoting
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
…d-rolled quoteIdent
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
|
Warning Review the following alerts detected in dependencies. According to your organization's Security Policy, it is recommended to resolve "Warn" alerts. Learn more about Socket for GitHub.
|
Summary
Adds a
CursorTrackerclass tographile-realtime-subscriptionsthat manages thelistener_nodelifecycle and periodicdrain_changes()polling, enabling at-least-once delivery semantics using the existing database-side cursor tracking infrastructure (change_log,listener_node,drain_changes,touch_listener,cleanup_ephemeral).New files:
src/cursor-tracker.ts—CursorTrackerclass withstart(),stop(),drain(),touchListener(),cleanupEphemeral()methods__tests__/cursor-tracker.test.ts— 25 tests covering lifecycle, polling, error handling, concurrency guards, schema quotingModified files:
src/types.ts— New types:PgClient,WithPgClient,ChangeLogEntry,CursorTrackerOptionssrc/index.ts— ExportsCursorTrackerand new typessrc/plugin.ts— Re-exportsCursorTracker, updated docstring documenting cursor trackingpackage.json— Added@pgsql/quotesdependency for identifier quotingThe
CursorTrackeris a standalone component — it is exported for external use but is not wired into the plugin's Grafast subscription plan execution. Callers instantiate it with awithPgClientcallback and anonChangeshandler to receiveChangeLogEntry[]results fromdrain_changes(). The wiring of cursor-tracked changes into the GraphQL subscription delivery path is expected to happen in a subsequent integration step (e.g., server preset or middleware).Updates since last revision
quoteIdentwithQuoteUtils.quoteIdentifier()from@pgsql/quotes— uses the same identifier quoting library already used byquery-builder,graphile-settings, and other packages in the monorepo.QuoteUtils.quoteIdentifier()only adds double-quotes when the identifier requires them (e.g., contains spaces or special characters); simple identifiers likerealtime_publicare returned unquoted.QuoteUtils.quoteIdentifier()behavior — schema quoting tests no longer assert double-quotes around simple identifiers. Added a new test verifying that identifiers with spaces are properly quoted.Review & Testing Checklist for Human
ChangeLogEntrytype matchesdrain_changes()return shape — The type assumesdrain_changesreturns JSONB rows with{id, occurred_at, source_schema, source_table, operation, payload_after, payload_before, payload_diff, subscriber_ids}. Cross-reference withproc_drain_changes_bodyinconstructive-db.CursorTrackeris not wired intosubscribePlan/buildPlans; it's a building block. There is no code path that connectsonChangesentries to GraphQL subscriber delivery yet. Confirm this is the intended phasing.drain()— UsesSELECT * FROM ${schema}.drain_changes($1, $2)whereschemais quoted viaQuoteUtils.quoteIdentifier(). Verify this produces valid SQL for both simple schemas (realtime_public) and schemas requiring quoting.CursorTrackeragainst a real database withrealtime_moduledeployed, callstart(), insert rows into a realtime-enabled table, verifyonChangesreceives the expected entries, then callstop()and verifylistener_noderow is removed.Notes
QuoteUtils.quoteIdentifier()from@pgsql/quotesonly adds double-quotes when necessary — simple identifiers likerealtime_publicare emitted unquoted, while identifiers with spaces or special characters get properly quoted. This is safe since the quoting follows PostgreSQL's identifier rules.drainingboolean) silently skips poll cycles if a previous drain is still in-flight. This is intentional to prevent query pileup.CursorTrackerand its types are exported from bothsrc/plugin.tsandsrc/index.ts(dual export paths).Link to Devin session: https://app.devin.ai/sessions/19485cf5cc58416a9f86068563d512f5
Requested by: @pyramation