From 6efbe159c43dd50fff8608b688be5e4eb7f3476b Mon Sep 17 00:00:00 2001 From: ditadi Date: Mon, 11 May 2026 16:16:47 +0100 Subject: [PATCH] refactor(appkit): introduce CoreServiceRegistry and ServiceLocator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AppKit's core previously ran a hardcoded boot sequence for cache + telemetry. This refactor introduces a generic registry so the core no longer names concrete services: - `CoreServiceFactory` — interface every core service implements, returning `{ instance, shutdown }` (or `null` for intentional opt-outs). - `CoreServiceRegistry` — boots a list of factories in declared order, shuts down in reverse, doubles as a typed locator via `get(name)`. - `bootstrap-factories.ts` — composes the list `_createApp` boots, living next to the concrete service modules so `core/appkit.ts` stays free of concrete-class imports. Cache and Telemetry are migrated to expose `.factory()` static methods and flow through the registry. Behaviour is identical: same boot order, same shutdown order, same opt-out semantics. Plugins now receive a typed `services: ServiceLocator` through `attachContext`, in addition to the existing per-service fields kept for backwards compatibility (`cache`, `telemetry`, plus a `taskflow` placeholder typed as `unknown` for forward compat). New plugins should consume `services.get(name)`; the named legacy fields are marked `@deprecated`. This PR is groundwork — no behaviour change, no public-API removal. The purpose is to make room for a third core service (TaskFlow) without re-threading the cross-package `attachContext` contract: adding a new service now only touches the service module and `bootstrap-factories.ts`. TaskFlow itself is intentionally NOT in this PR; it lands in a follow-up. Verified: pnpm -r typecheck, pnpm build, full pnpm test (122 files, 2255 tests) all green. Signed-off-by: ditadi --- packages/appkit/src/cache/index.ts | 26 +++++ packages/appkit/src/core/appkit.ts | 106 ++++++++++-------- .../appkit/src/core/bootstrap-factories.ts | 30 +++++ packages/appkit/src/core/service-registry.ts | 79 +++++++++++++ packages/appkit/src/plugin/plugin.ts | 45 ++++---- .../appkit/src/telemetry/telemetry-manager.ts | 39 +++++-- .../telemetry/tests/telemetry-manager.test.ts | 2 - packages/shared/src/plugin.ts | 13 ++- 8 files changed, 261 insertions(+), 79 deletions(-) create mode 100644 packages/appkit/src/core/bootstrap-factories.ts create mode 100644 packages/appkit/src/core/service-registry.ts diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 37bd1659e..8fe7d556a 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -1,6 +1,7 @@ import { createHash } from "node:crypto"; import { ApiError, WorkspaceClient } from "@databricks/sdk-experimental"; import type { CacheConfig, CacheStorage } from "shared"; +import type { CoreServiceFactory } from "@/core/service-registry"; import { createLakebasePool } from "../connectors/lakebase"; import { AppKitError, ExecutionError, InitializationError } from "../errors"; import { createLogger } from "../logging/logger"; @@ -110,6 +111,31 @@ export class CacheManager { return CacheManager.initPromise; } + static factory(config?: CacheConfig): CoreServiceFactory { + return { + name: "cache", + async boot() { + const mgr = await CacheManager.getInstance(config); + return { + instance: mgr, + shutdown: () => mgr.shutdown(), + }; + }, + }; + } + + /** + * Closes storage, clears in-flight requests, and resets the singleton + * so a fresh `getInstance()` rebuilds from scratch (test isolation). + * @internal + */ + async shutdown(): Promise { + await this.close(); + this.inFlightRequests.clear(); + CacheManager.instance = null; + CacheManager.initPromise = null; + } + /** * Create a new cache manager instance * diff --git a/packages/appkit/src/core/appkit.ts b/packages/appkit/src/core/appkit.ts index 3421e03bb..9f920947d 100644 --- a/packages/appkit/src/core/appkit.ts +++ b/packages/appkit/src/core/appkit.ts @@ -7,31 +7,34 @@ import type { PluginConstructor, PluginData, PluginMap, + ServiceLocator, } from "shared"; import { version as productVersion } from "../../package.json"; -import { CacheManager } from "../cache"; import { ServiceContext } from "../context"; import { isInternalTelemetryEnabled, TelemetryReporter, } from "../internal-telemetry"; -import { createLogger } from "../logging/logger"; import { ResourceRegistry, ResourceType } from "../registry"; import type { TelemetryConfig } from "../telemetry"; -import { TelemetryManager } from "../telemetry"; +import { composeCoreFactories } from "./bootstrap-factories"; import { isToolProvider, PluginContext } from "./plugin-context"; - -const logger = createLogger("appkit"); +import { CoreServiceRegistry } from "./service-registry"; export class AppKit { #pluginInstances: Record = {}; #setupPromises: Promise[] = []; #context: PluginContext; + #registry: CoreServiceRegistry; - private constructor(config: { plugins: TPlugins }) { + private constructor( + config: { plugins: TPlugins }, + registry: CoreServiceRegistry, + ) { const { plugins, ...globalConfig } = config; this.#context = new PluginContext(); + this.#registry = registry; const pluginEntries = Object.entries(plugins); @@ -87,8 +90,14 @@ export class AppKit { const pluginInstance = new Plugin(baseConfig); if (typeof pluginInstance.attachContext === "function") { + // Pass a typed service locator instead of concrete service classes, + // so this file stays decoupled from the core service implementations. + const services: ServiceLocator = { + get: (serviceName: string) => this.#registry.get(serviceName), + }; pluginInstance.attachContext({ context: this.#context, + services, telemetryConfig: baseConfig.telemetry, }); } @@ -199,56 +208,61 @@ export class AppKit { disableInternalTelemetry?: boolean; } = {}, ): Promise> { - // Initialize core services - TelemetryManager.initialize(config?.telemetry); - await CacheManager.getInstance(config?.cache); - - const rawPlugins = config.plugins as T; - - // Collect manifest resources via registry - const registry = new ResourceRegistry(); - registry.collectResources(rawPlugins); - - // Derive ServiceContext needs from what manifests declared - const needsWarehouse = registry - .getRequired() - .some((r) => r.type === ResourceType.SQL_WAREHOUSE); - await ServiceContext.initialize( - { warehouseId: needsWarehouse }, - config?.client, + const registry = await CoreServiceRegistry.boot( + composeCoreFactories({ + telemetry: config?.telemetry, + cache: config?.cache, + }), ); - // Validate env vars - registry.enforceValidation(); + try { + const rawPlugins = config.plugins as T; - const preparedPlugins = AppKit.preparePlugins(rawPlugins); - const mergedConfig = { - plugins: preparedPlugins, - }; + const resourceRegistry = new ResourceRegistry(); + resourceRegistry.collectResources(rawPlugins); - const instance = new AppKit(mergedConfig); + const needsWarehouse = resourceRegistry + .getRequired() + .some((r) => r.type === ResourceType.SQL_WAREHOUSE); + await ServiceContext.initialize( + { warehouseId: needsWarehouse }, + config?.client, + ); - await Promise.all(instance.#setupPromises); - await instance.#context.emitLifecycle("setup:complete"); + resourceRegistry.enforceValidation(); - const handle = instance as unknown as PluginMap; + const preparedPlugins = AppKit.preparePlugins(rawPlugins); + const mergedConfig = { + plugins: preparedPlugins, + }; - if (config.onPluginsReady) { - logger.debug("Running onPluginsReady hook"); - await config.onPluginsReady(handle); - logger.debug("onPluginsReady hook completed"); - } + const instance = new AppKit(mergedConfig, registry); - if (isInternalTelemetryEnabled(config)) { - AppKit.bootstrapInternalTelemetry(); - } + await Promise.all(instance.#setupPromises); + await instance.#context.emitLifecycle("setup:complete"); - const serverPlugin = instance.#pluginInstances.server; - if (serverPlugin && typeof (serverPlugin as any).start === "function") { - await (serverPlugin as any).start(); - } + const handle = instance as unknown as PluginMap; - return handle; + if (config.onPluginsReady) { + await config.onPluginsReady(handle); + } + + if (isInternalTelemetryEnabled(config)) { + AppKit.bootstrapInternalTelemetry(); + } + + const serverPlugin = instance.#pluginInstances.server; + if (serverPlugin && typeof (serverPlugin as any).start === "function") { + await (serverPlugin as any).start({ + shutdownCoreServices: () => registry.shutdown(), + }); + } + + return handle; + } catch (error) { + await registry.shutdown(); + throw error; + } } private static bootstrapInternalTelemetry(): void { diff --git a/packages/appkit/src/core/bootstrap-factories.ts b/packages/appkit/src/core/bootstrap-factories.ts new file mode 100644 index 000000000..07dd6c909 --- /dev/null +++ b/packages/appkit/src/core/bootstrap-factories.ts @@ -0,0 +1,30 @@ +/** + * Composes the {@link CoreServiceFactory}s that AppKit boots. + * Keeps `core/appkit.ts` free of concrete service imports. + * + * @internal + */ +import type { CacheConfig } from "shared"; +import { CacheManager } from "../cache"; +import { type TelemetryConfig, TelemetryManager } from "../telemetry"; +import type { CoreServiceFactory } from "./service-registry"; + +interface BootstrapConfig { + telemetry?: TelemetryConfig; + cache?: CacheConfig; +} + +/** + * Returns factories in boot order: telemetry first so other services can + * record traces during their own boot. + * + * @internal + */ +export function composeCoreFactories( + config: BootstrapConfig, +): readonly CoreServiceFactory[] { + return [ + TelemetryManager.factory(config.telemetry), + CacheManager.factory(config.cache), + ]; +} diff --git a/packages/appkit/src/core/service-registry.ts b/packages/appkit/src/core/service-registry.ts new file mode 100644 index 000000000..65778ca88 --- /dev/null +++ b/packages/appkit/src/core/service-registry.ts @@ -0,0 +1,79 @@ +import { createLogger } from "@/logging"; + +const logger = createLogger("service-registry"); + +/** A booted core service. Resolved via the typed `get` helper. */ +interface BootedService { + readonly name: string; + readonly instance: unknown; + shutdown(): Promise; +} + +/** + * Factory that boots a core service. `boot()` returns `null` for + * intentionally-absent services (e.g. telemetry without an OTLP endpoint). + */ +export interface CoreServiceFactory { + readonly name: string; + boot(): Promise<{ instance: T; shutdown(): Promise } | null>; +} + +/** + * Manages the lifecycle of core services and acts as a typed locator. + * Services boot in provided order and shut down in reverse. + */ +export class CoreServiceRegistry { + private constructor(private readonly booted: BootedService[]) {} + + /** Boots services in order; unwinds already-booted services on failure. */ + static async boot( + factories: readonly CoreServiceFactory[], + ): Promise { + const booted: BootedService[] = []; + try { + for (const factory of factories) { + const result = await factory.boot(); + if (!result) continue; + booted.push({ + name: factory.name, + instance: result.instance, + shutdown: result.shutdown, + }); + logger.debug("Booted core service: %s", factory.name); + } + return new CoreServiceRegistry(booted); + } catch (error) { + await CoreServiceRegistry.unwind(booted); + throw error; + } + } + + /** Returns the booted instance for `name`, or `null` if not booted. */ + get(name: string): T | null { + for (const service of this.booted) { + if (service.name === name) { + return service.instance as T; + } + } + return null; + } + + /** Shuts down services in the reverse order they were booted. */ + async shutdown(): Promise { + await CoreServiceRegistry.unwind(this.booted); + } + + private static async unwind(booted: BootedService[]): Promise { + while (booted.length > 0) { + const service = booted.pop(); + if (!service) continue; + + try { + await service.shutdown(); + logger.debug("Shutdown core service: %s", service.name); + } catch (error) { + logger.error("Service %s shutdown failed: %O", service.name, error); + } + } + } +} diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index 49d211913..b5dc60cf4 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -9,6 +9,7 @@ import type { PluginExecutionSettings, PluginPhase, RouteConfig, + ServiceLocator, StreamExecuteHandler, StreamExecutionSettings, } from "shared"; @@ -218,15 +219,19 @@ export abstract class Plugin< | PluginContext | undefined; - // Eagerly bind telemetry + cache if the core services have already been - // initialized (normal createApp path, or tests that mock CacheManager). - // If they haven't, we leave these undefined and rely on `attachContext` - // being called later — this lets factories eagerly construct plugin - // instances at module top-level before `createApp` has run. - this.tryAttachContext(); + // Eager bind for plugins instantiated outside `_createApp` (mostly + // unit tests that mock `CacheManager` and call `new MyPlugin()` + // directly). Production goes through `_createApp` → `attachContext`, + // which overrides whatever is captured here. + this._tryEagerBindServices(); } - private tryAttachContext(): void { + /** + * Test-fallback for plugins instantiated outside `_createApp`. Removable + * once tests migrate to explicit `attachContext` setup. + * @internal + */ + private _tryEagerBindServices(): void { try { this.cache = CacheManager.getInstanceSync(); } catch { @@ -240,22 +245,22 @@ export abstract class Plugin< } /** - * Binds runtime dependencies (telemetry provider, cache, plugin context) to - * this plugin. Called by `AppKit._createApp` after construction and before - * `setup()`. Idempotent: safe to call if the constructor already bound them - * eagerly. Kept separate so factories can eagerly construct plugin instances - * without running this before `TelemetryManager.initialize()` / - * `CacheManager.getInstance()` have run. + * Binds runtime dependencies (cache, telemetry, plugin context). Called + * by `_createApp` after construction and before `setup()`. Plugin authors + * that need a service the base class doesn't surface as a property can + * reach for `deps.services?.get("my-service")`. */ attachContext( deps: { context?: unknown; + services?: ServiceLocator; telemetryConfig?: BasePluginConfig["telemetry"]; } = {}, ): void { - if (!this.cache) { - this.cache = CacheManager.getInstanceSync(); - } + this.cache = + deps.services?.get("cache") ?? + this.cache ?? + CacheManager.getInstanceSync(); this.telemetry = TelemetryManager.getProvider( this.name, deps.telemetryConfig ?? this.config.telemetry, @@ -698,10 +703,12 @@ export abstract class Plugin< } private _checkIfGenerator( - result: any, - ): result is AsyncGenerator { + result: unknown, + ): result is AsyncGenerator { return ( - result && typeof result === "object" && Symbol.asyncIterator in result + typeof result === "object" && + result !== null && + Symbol.asyncIterator in result ); } } diff --git a/packages/appkit/src/telemetry/telemetry-manager.ts b/packages/appkit/src/telemetry/telemetry-manager.ts index 6660b6b2c..ef7be9c06 100644 --- a/packages/appkit/src/telemetry/telemetry-manager.ts +++ b/packages/appkit/src/telemetry/telemetry-manager.ts @@ -22,6 +22,7 @@ import { ATTR_SERVICE_VERSION, } from "@opentelemetry/semantic-conventions"; import type { TelemetryOptions } from "shared"; +import type { CoreServiceFactory } from "@/core/service-registry"; import { createLogger } from "../logging/logger"; import { TelemetryProvider } from "./telemetry-provider"; import { AppKitSampler } from "./trace-sampler"; @@ -95,13 +96,34 @@ export class TelemetryManager { }); this.sdk.start(); - this.registerShutdown(); logger.debug("Initialized successfully"); } catch (error) { logger.error("Failed to initialize: %O", error); } } + /** True once the underlying NodeSDK has started. */ + isActive(): boolean { + return this.sdk !== undefined; + } + + static factory( + config?: TelemetryConfig, + ): CoreServiceFactory { + return { + name: "telemetry", + async boot() { + TelemetryManager.initialize(config); + const instance = TelemetryManager.getInstance(); + if (!instance.isActive()) return null; + return { + instance, + shutdown: () => instance.shutdown(), + }; + }, + }; + } + /** * Register OpenTelemetry instrumentations. * Can be called at any time, but recommended to call in plugin constructor. @@ -158,15 +180,12 @@ export class TelemetryManager { ]; } - private registerShutdown() { - const shutdownFn = async () => { - await TelemetryManager.getInstance().shutdown(); - }; - process.once("SIGTERM", shutdownFn); - process.once("SIGINT", shutdownFn); - } - - private async shutdown(): Promise { + /** + * Drains pending spans/metrics/logs and shuts down the NodeSDK. + * Idempotent. + * @internal + */ + async shutdown(): Promise { if (!this.sdk) { return; } diff --git a/packages/appkit/src/telemetry/tests/telemetry-manager.test.ts b/packages/appkit/src/telemetry/tests/telemetry-manager.test.ts index 11b85d9bf..92e453a01 100644 --- a/packages/appkit/src/telemetry/tests/telemetry-manager.test.ts +++ b/packages/appkit/src/telemetry/tests/telemetry-manager.test.ts @@ -54,8 +54,6 @@ describe("TelemetryManager", () => { vi.clearAllMocks(); // @ts-expect-error - accessing private static property for testing TelemetryManager.instance = undefined; - // @ts-expect-error - accessing private static property for testing - TelemetryManager.shutdownRegistered = false; }); afterEach(() => { diff --git a/packages/shared/src/plugin.ts b/packages/shared/src/plugin.ts index 651840c7b..cca42bdb3 100644 --- a/packages/shared/src/plugin.ts +++ b/packages/shared/src/plugin.ts @@ -9,6 +9,14 @@ import type { // Re-export generated types as the shared canonical definitions. export type { ResourceFieldEntry }; +/** + * Typed service locator passed to {@link BasePlugin.attachContext}. + * Returns `null` when the service is not registered. + */ +export interface ServiceLocator { + get(name: string): T | null; +} + /** Base plugin interface. */ export interface BasePlugin { name: string; @@ -28,11 +36,12 @@ export interface BasePlugin { clientConfig?(): Record; /** - * Binds runtime dependencies (telemetry, cache, plugin context) after the - * plugin has been constructed. Called by the AppKit core before `setup()`. + * Binds runtime dependencies after construction. Called before `setup()`. + * Plugins fetch services via `services.get("my-service")`. */ attachContext?(deps: { context?: unknown; + services?: ServiceLocator; telemetryConfig?: TelemetryOptions; }): void; }