diff --git a/src/client/supabase.ts b/src/client/supabase.ts index 73970be..0926246 100644 --- a/src/client/supabase.ts +++ b/src/client/supabase.ts @@ -11,6 +11,7 @@ import { import { type Database as CachingDatabaseTypes } from "../types/supabaseCaching.js"; import { type Database as DataDatabaseTypes } from "../types/supabaseData.js"; import { cache } from "./graphql.js"; +import { singleton } from "tsyringe"; // Create a single supabase client for interacting with your database export const supabaseCaching = createClient( @@ -231,268 +232,304 @@ const handleChangeSignatureRequests = ( } }; -export const subscribeToSupabaseRealtimeEvents = () => { - console.log("✏️ Subscribing to realtime events"); +@singleton() +export class SupabaseRealtimeManager { + private isSubscribed: boolean = false; - supabaseCaching - .channel("schema-db-changes") - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "claims", - }, - (payload) => handleChangeClaims(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "fractions", - }, - (payload) => handleChangeFractions(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "metadata", - }, - (payload) => handleChangeMetadata(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "sales", - }, - (payload) => handleChangeSales(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "sales", - }, - (payload) => handleChangeSales(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "allow_list_data", - }, - (payload) => handleChangeAllowlistRecords(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hypercert_allow_list_records", - }, - (payload) => handleChangeAllowlistRecords(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "attestations", - }, - (payload) => handleChangeAttestations(payload), - ) - .subscribe((status, error) => { - if (status === "SUBSCRIBED") { - console.log( - "✅ [CACHING] Subscribed to realtime events with status", - status, - ); - return; - } + public subscribeToEvents(): void { + if (this.isSubscribed) { + console.log( + "⚠️ [REALTIME] Already subscribed to Supabase realtime events", + ); + return; + } - if (error) { - console.error( - "⛔️ [CACHING] Error subscribing to realtime events ", - error, - ); - throw new Error("Error subscribing to realtime events caching"); - } else { - console.log( - "⚠️ [CACHING] Subscribed to realtime events with status", - status, - ); - throw new Error("Error subscribing to realtime events caching"); - } - }); + console.log( + "✏️ [REALTIME] Initializing Supabase realtime event subscriptions", + ); - supabaseData - .channel("schema-db-changes") - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "users", - }, - (payload) => handleChangeUsers(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "collections", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hyperboards", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hypercerts", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hyperboard_hypercert_metadata", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hyperboard_collections", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hyperboard_blueprint_metadata", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "collection_blueprints", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "blueprints", - }, - (payload) => { - handleChangeBlueprints(payload); - handleChangeHyperboards(payload); - }, - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "users", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "collection_admins", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "hyperboard_admins", - }, - (payload) => handleChangeHyperboards(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "marketplace_orders", - }, - (payload) => handleChangeOrders(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "marketplace_order_nonces", - }, - (payload) => handleChangeOrders(payload), - ) - .on( - "postgres_changes", - { - event: "*", - schema: "public", - table: "signature_requests", - }, - (payload) => handleChangeSignatureRequests(payload), - ) - .subscribe((status, error) => { - if (status === "SUBSCRIBED") { - console.log( - "✅ [DATA] Subscribed to realtime events with status", - status, - ); - return; - } + try { + this.subscribeToSupabaseRealtimeEvents(); + this.isSubscribed = true; + console.log( + "✅ [REALTIME] Successfully subscribed to Supabase realtime events", + ); + } catch (error) { + console.error( + "⛔️ [REALTIME] Failed to subscribe to Supabase realtime events:", + error, + ); + throw error; + } + } - if (error) { - console.error( - "⛔️ [DATA] Error subscribing to realtime events ", - error, - ); - throw new Error("Error subscribing to realtime events data"); - } else { - console.log( - "⚠️ [DATA] Subscribed to realtime events with status", - status, - ); - throw new Error("Error subscribing to realtime events data"); - } - }); -}; + private subscribeToSupabaseRealtimeEvents(): void { + console.log("✏️ Subscribing to realtime events"); + + supabaseCaching + .channel("schema-db-changes") + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "claims", + }, + (payload) => handleChangeClaims(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "fractions", + }, + (payload) => handleChangeFractions(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "metadata", + }, + (payload) => handleChangeMetadata(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "sales", + }, + (payload) => handleChangeSales(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "sales", + }, + (payload) => handleChangeSales(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "allow_list_data", + }, + (payload) => handleChangeAllowlistRecords(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hypercert_allow_list_records", + }, + (payload) => handleChangeAllowlistRecords(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "attestations", + }, + (payload) => handleChangeAttestations(payload), + ) + .subscribe((status, error) => { + if (status === "SUBSCRIBED") { + console.log( + "✅ [CACHING] Subscribed to realtime events with status", + status, + ); + return; + } + + if (error) { + console.error( + "⛔️ [CACHING] Error subscribing to realtime events ", + error, + ); + throw new Error("Error subscribing to realtime events caching"); + } else { + console.log( + "⚠️ [CACHING] Subscribed to realtime events with status", + status, + ); + throw new Error("Error subscribing to realtime events caching"); + } + }); + + supabaseData + .channel("schema-db-changes") + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "users", + }, + (payload) => handleChangeUsers(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "collections", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hyperboards", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hypercerts", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hyperboard_hypercert_metadata", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hyperboard_collections", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hyperboard_blueprint_metadata", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "collection_blueprints", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "blueprints", + }, + (payload) => { + handleChangeBlueprints(payload); + handleChangeHyperboards(payload); + }, + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "users", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "collection_admins", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "hyperboard_admins", + }, + (payload) => handleChangeHyperboards(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "marketplace_orders", + }, + (payload) => handleChangeOrders(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "marketplace_order_nonces", + }, + (payload) => handleChangeOrders(payload), + ) + .on( + "postgres_changes", + { + event: "*", + schema: "public", + table: "signature_requests", + }, + (payload) => handleChangeSignatureRequests(payload), + ) + .subscribe((status, error) => { + if (status === "SUBSCRIBED") { + console.log( + "✅ [DATA] Subscribed to realtime events with status", + status, + ); + return; + } + + if (error) { + console.error( + "⛔️ [DATA] Error subscribing to realtime events ", + error, + ); + throw new Error("Error subscribing to realtime events data"); + } else { + console.log( + "⚠️ [DATA] Subscribed to realtime events with status", + status, + ); + throw new Error("Error subscribing to realtime events data"); + } + }); + } + + public isEventSubscriptionActive(): boolean { + return this.isSubscribed; + } +} diff --git a/src/index.ts b/src/index.ts index 703c359..ea83b72 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,7 +12,7 @@ import SignatureRequestProcessorCron from "./cron/SignatureRequestProcessing.js" import OrderInvalidationCronjob from "./cron/OrderInvalidation.js"; import { container } from "tsyringe"; import { ENABLE_CRON_JOBS } from "./utils/constants.js"; -import { subscribeToSupabaseRealtimeEvents } from "./client/supabase.js"; +import { SupabaseRealtimeManager } from "./client/supabase.js"; // @ts-expect-error BigInt is not supported by JSON BigInt.prototype.toJSON = function () { @@ -58,7 +58,8 @@ if (ENABLE_CRON_JOBS) { console.log("🚨 Cron jobs are disabled"); } -subscribeToSupabaseRealtimeEvents(); +const supabaseRealtimeManager = container.resolve(SupabaseRealtimeManager); +supabaseRealtimeManager.subscribeToEvents(); app.listen(PORT, () => { console.log(