feat: add RealtimeManager to bridge CursorTracker events into PgSubscriber#1111
Merged
pyramation merged 1 commit intomainfrom May 10, 2026
Merged
feat: add RealtimeManager to bridge CursorTracker events into PgSubscriber#1111pyramation merged 1 commit intomainfrom
pyramation merged 1 commit intomainfrom
Conversation
…riber Introduces RealtimeManager class that converts drain_changes() ChangeLogEntry objects into NOTIFY-format payloads and emits them on PgSubscriber's internal EventEmitter. This provides at-least-once delivery: NOTIFY handles instant delivery while cursor polling catches up on missed events (disconnects, restarts). Key components: - RealtimeManager: lifecycle management (start/stop), event dispatch - entryToChannel: maps source_schema.source_table to NOTIFY channel - entryToNotifyPayload: converts ChangeLogEntry to OP:rowId format - extractRowId: pulls row ID from payload_after/payload_before - RealtimeManagerOptions: typed configuration interface 22 new tests covering helper functions, lifecycle, event dispatching, multi-table routing, and error handling. All 92 tests passing.
Contributor
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds
RealtimeManager, a new class ingraphile-realtime-subscriptionsthat bridges the cursor-tracking polling system (CursorTracker+drain_changes()) into PostGraphile's live subscription delivery path viaPgSubscriber.How it works: When
drain_changes()returnsChangeLogEntryobjects,RealtimeManagerconverts them into the same"OP:rowId"NOTIFY payload format used byemit_changeand emits them onPgSubscriber's internalEventEmitter. This means existing subscription plans (which use grafast'slisten()step on the same EventEmitter) handle cursor-tracked events identically to real NOTIFY events — no changes needed to the plugin or subscription plans.This enables at-least-once delivery: NOTIFY provides instant best-effort delivery, while cursor polling catches up on anything missed during disconnects or restarts. Duplicates are expected; clients should be idempotent.
New files:
src/realtime-manager.ts—RealtimeManagerclass + helper functions (extractRowId,entryToNotifyPayload,entryToChannel)__tests__/realtime-manager.test.ts— 22 tests covering helpers, lifecycle, multi-table event dispatch, and error handlingModified files:
src/types.ts— addsRealtimeManagerOptionsinterfacesrc/index.ts,src/plugin.ts— re-exportRealtimeManagerandRealtimeManagerOptionsBuilds on PR #1110 (CursorTracker class). Server-level wiring (start/stop with server lifecycle) is a follow-up.
Review & Testing Checklist for Human
getEventEmitter()(realtime-manager.ts L139-152): AccessesPgSubscriber's privateeventEmitterproperty via runtime property check. This is the core integration mechanism — verify it works against the actual@dataplan/pg@1.0.0PgSubscriberclass in a running server. Ifeventemitter3changes its internal structure in a future@dataplan/pgrelease, this will silently fail (logged warning, events dropped).extractRowIdassumesidfield (L46-50): Hardcodespayload_after.id/payload_before.id. Tables with non-idprimary keys (e.g., composite keys,uuidcolumn named differently) will produce payloads without row IDs, causing subscription plans to see"INSERT"instead of"INSERT:uuid". Confirm this is acceptable or whether the row ID field should be configurable.pgSubscriber: unknowntype (types.ts L120): Avoids a hard dep on@dataplan/pgtypes but sacrifices compile-time safety. Passing the wrong object will silently degrade (warn + no dispatch). Consider whether importing thePgSubscribertype (even as a dev/peer dep) would be better.Notes
tsc --noEmit).WithPgClientimport inrealtime-manager.tsis unused directly (only referenced throughRealtimeManagerOptions) — may trigger an unused-import lint rule depending on config.Link to Devin session: https://app.devin.ai/sessions/19485cf5cc58416a9f86068563d512f5
Requested by: @pyramation