Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions graphile/graphile-cache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"dependencies": {
"@pgpmjs/logger": "workspace:^",
"express": "^5.2.1",
"graphile-realtime-subscriptions": "workspace:^",
"grafserv": "1.0.0",
"lru-cache": "^11.2.7",
"pg-cache": "workspace:^",
Expand Down
54 changes: 52 additions & 2 deletions graphile/graphile-cache/src/create-instance.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { createServer } from 'node:http';
import { Logger } from '@pgpmjs/logger';
import express from 'express';
import { postgraphile } from 'postgraphile';
import { grafserv } from 'grafserv/express/v4';
import type { GraphileCacheEntry } from './graphile-cache';

const log = new Logger('graphile-cache:create');

interface GraphileInstanceOptions {
preset: any;
cacheKey: string;
/**
* When true, a RealtimeManager is created and started alongside the
* PostGraphile instance. The pool is extracted from the preset's
* pgServices (managed by pg-cache) rather than passed separately.
*/
enableRealtime?: boolean;
}

/**
Expand All @@ -18,11 +27,17 @@ interface GraphileInstanceOptions {
*
* Callers are responsible for building the `GraphileConfig.Preset` (including
* pgServices, grafserv options, grafast context, etc.) before passing it here.
*
* When `enableRealtime` is true, a RealtimeManager is created that bridges
* cursor-tracked events from `drain_changes()` into the PostGraphile
* instance's PgSubscriber EventEmitter. Both `pgSubscriber` and the pg
* pool are extracted from the resolved preset's pgServices — no separate
* pool parameter is needed.
*/
export const createGraphileInstance = async (
opts: GraphileInstanceOptions
): Promise<GraphileCacheEntry> => {
const { preset, cacheKey } = opts;
const { preset, cacheKey, enableRealtime = false } = opts;

const pgl = postgraphile(preset);
const serv = pgl.createServ(grafserv);
Expand All @@ -32,12 +47,47 @@ export const createGraphileInstance = async (
await serv.addTo(handler, httpServer);
await serv.ready();

return {
const entry: GraphileCacheEntry = {
pgl,
serv,
handler,
httpServer,
cacheKey,
createdAt: Date.now(),
};

if (enableRealtime) {
try {
const { RealtimeManager } = await import('graphile-realtime-subscriptions');

// Extract PgSubscriber and pool from the resolved preset's pgServices.
// The pool is the same instance managed by pg-cache (via getPgPool)
// and threaded into the preset by makePgService({ pool, schemas }).
const resolvedPreset = pgl.getResolvedPreset();
const pgService = (resolvedPreset as any).pgServices?.[0];
const pgSubscriber = pgService?.pgSubscriber ?? null;
const pool = pgService?.adaptorSettings?.pool ?? null;

if (!pgSubscriber) {
log.warn(`PostGraphile[${cacheKey}] has no pgSubscriber — RealtimeManager will not be started`);
} else if (!pool) {
log.warn(`PostGraphile[${cacheKey}] has no pool in pgService — RealtimeManager will not be started`);
} else {
const manager = new RealtimeManager({
pgSubscriber,
pool,
nodeId: `graphile-cache:${cacheKey}`,
schema: 'realtime_public',
});

await manager.start();
entry.realtimeManager = manager;
log.info(`RealtimeManager started for PostGraphile[${cacheKey}]`);
}
} catch (err) {
log.error(`Failed to start RealtimeManager for PostGraphile[${cacheKey}]:`, err);
}
}

return entry;
};
10 changes: 10 additions & 0 deletions graphile/graphile-cache/src/graphile-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export interface GraphileCacheEntry {
httpServer: HttpServer;
cacheKey: string;
createdAt: number;
/** Optional RealtimeManager for cursor-tracked subscription delivery */
realtimeManager?: { stop(): Promise<void> } | null;
}

// Track disposed entries to prevent double-disposal
Expand Down Expand Up @@ -119,6 +121,14 @@ const disposeEntry = async (entry: GraphileCacheEntry, key: string): Promise<voi
entry.httpServer.close(() => resolve());
});
}
// Stop RealtimeManager if present (before releasing PostGraphile)
if (entry.realtimeManager) {
try {
await entry.realtimeManager.stop();
} catch (err) {
log.error(`Error stopping RealtimeManager for PostGraphile[${key}]:`, err);
}
}
// Release PostGraphile instance (this also releases grafserv internally)
if (entry.pgl) {
await entry.pgl.release();
Expand Down
Loading
Loading