Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/appkit/src/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createHash } from "node:crypto";
import { ApiError, WorkspaceClient } from "@databricks/sdk-experimental";
import type { CacheConfig, CacheStorage } from "shared";
import type { CoreServiceFactory } from "@/core/service-registry";
import { createLakebasePool } from "../connectors/lakebase";
import { AppKitError, ExecutionError, InitializationError } from "../errors";
import { createLogger } from "../logging/logger";
Expand Down Expand Up @@ -110,6 +111,31 @@ export class CacheManager {
return CacheManager.initPromise;
}

static factory(config?: CacheConfig): CoreServiceFactory<CacheManager> {
return {
name: "cache",
async boot() {
const mgr = await CacheManager.getInstance(config);
return {
instance: mgr,
shutdown: () => mgr.shutdown(),
};
},
};
}

/**
* Closes storage, clears in-flight requests, and resets the singleton
* so a fresh `getInstance()` rebuilds from scratch (test isolation).
* @internal
*/
async shutdown(): Promise<void> {
await this.close();
this.inFlightRequests.clear();
CacheManager.instance = null;
CacheManager.initPromise = null;
}

/**
* Create a new cache manager instance
*
Expand Down
106 changes: 60 additions & 46 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,34 @@ import type {
PluginConstructor,
PluginData,
PluginMap,
ServiceLocator,
} from "shared";
import { version as productVersion } from "../../package.json";
import { CacheManager } from "../cache";
import { ServiceContext } from "../context";
import {
isInternalTelemetryEnabled,
TelemetryReporter,
} from "../internal-telemetry";
import { createLogger } from "../logging/logger";
import { ResourceRegistry, ResourceType } from "../registry";
import type { TelemetryConfig } from "../telemetry";
import { TelemetryManager } from "../telemetry";
import { composeCoreFactories } from "./bootstrap-factories";
import { isToolProvider, PluginContext } from "./plugin-context";

const logger = createLogger("appkit");
import { CoreServiceRegistry } from "./service-registry";

export class AppKit<TPlugins extends InputPluginMap> {
#pluginInstances: Record<string, BasePlugin> = {};
#setupPromises: Promise<void>[] = [];
#context: PluginContext;
#registry: CoreServiceRegistry;

private constructor(config: { plugins: TPlugins }) {
private constructor(
config: { plugins: TPlugins },
registry: CoreServiceRegistry,
) {
const { plugins, ...globalConfig } = config;

this.#context = new PluginContext();
this.#registry = registry;

const pluginEntries = Object.entries(plugins);

Expand Down Expand Up @@ -87,8 +90,14 @@ export class AppKit<TPlugins extends InputPluginMap> {
const pluginInstance = new Plugin(baseConfig);

if (typeof pluginInstance.attachContext === "function") {
// Pass a typed service locator instead of concrete service classes,
// so this file stays decoupled from the core service implementations.
const services: ServiceLocator = {
get: <T>(serviceName: string) => this.#registry.get<T>(serviceName),
};
pluginInstance.attachContext({
context: this.#context,
services,
telemetryConfig: baseConfig.telemetry,
});
}
Expand Down Expand Up @@ -199,56 +208,61 @@ export class AppKit<TPlugins extends InputPluginMap> {
disableInternalTelemetry?: boolean;
} = {},
): Promise<PluginMap<T>> {
// Initialize core services
TelemetryManager.initialize(config?.telemetry);
await CacheManager.getInstance(config?.cache);

const rawPlugins = config.plugins as T;

// Collect manifest resources via registry
const registry = new ResourceRegistry();
registry.collectResources(rawPlugins);

// Derive ServiceContext needs from what manifests declared
const needsWarehouse = registry
.getRequired()
.some((r) => r.type === ResourceType.SQL_WAREHOUSE);
await ServiceContext.initialize(
{ warehouseId: needsWarehouse },
config?.client,
const registry = await CoreServiceRegistry.boot(
composeCoreFactories({
telemetry: config?.telemetry,
cache: config?.cache,
}),
);

// Validate env vars
registry.enforceValidation();
try {
const rawPlugins = config.plugins as T;

const preparedPlugins = AppKit.preparePlugins(rawPlugins);
const mergedConfig = {
plugins: preparedPlugins,
};
const resourceRegistry = new ResourceRegistry();
resourceRegistry.collectResources(rawPlugins);

const instance = new AppKit(mergedConfig);
const needsWarehouse = resourceRegistry
.getRequired()
.some((r) => r.type === ResourceType.SQL_WAREHOUSE);
await ServiceContext.initialize(
{ warehouseId: needsWarehouse },
config?.client,
);

await Promise.all(instance.#setupPromises);
await instance.#context.emitLifecycle("setup:complete");
resourceRegistry.enforceValidation();

const handle = instance as unknown as PluginMap<T>;
const preparedPlugins = AppKit.preparePlugins(rawPlugins);
const mergedConfig = {
plugins: preparedPlugins,
};

if (config.onPluginsReady) {
logger.debug("Running onPluginsReady hook");
await config.onPluginsReady(handle);
logger.debug("onPluginsReady hook completed");
}
const instance = new AppKit(mergedConfig, registry);

if (isInternalTelemetryEnabled(config)) {
AppKit.bootstrapInternalTelemetry();
}
await Promise.all(instance.#setupPromises);
await instance.#context.emitLifecycle("setup:complete");

const serverPlugin = instance.#pluginInstances.server;
if (serverPlugin && typeof (serverPlugin as any).start === "function") {
await (serverPlugin as any).start();
}
const handle = instance as unknown as PluginMap<T>;

return handle;
if (config.onPluginsReady) {
await config.onPluginsReady(handle);
}

if (isInternalTelemetryEnabled(config)) {
AppKit.bootstrapInternalTelemetry();
}

const serverPlugin = instance.#pluginInstances.server;
if (serverPlugin && typeof (serverPlugin as any).start === "function") {
await (serverPlugin as any).start({
shutdownCoreServices: () => registry.shutdown(),
});
}

return handle;
} catch (error) {
await registry.shutdown();
throw error;
}
}

private static bootstrapInternalTelemetry(): void {
Expand Down
30 changes: 30 additions & 0 deletions packages/appkit/src/core/bootstrap-factories.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Composes the {@link CoreServiceFactory}s that AppKit boots.
* Keeps `core/appkit.ts` free of concrete service imports.
*
* @internal
*/
import type { CacheConfig } from "shared";
import { CacheManager } from "../cache";
import { type TelemetryConfig, TelemetryManager } from "../telemetry";
import type { CoreServiceFactory } from "./service-registry";

interface BootstrapConfig {
telemetry?: TelemetryConfig;
cache?: CacheConfig;
}

/**
* Returns factories in boot order: telemetry first so other services can
* record traces during their own boot.
*
* @internal
*/
export function composeCoreFactories(
config: BootstrapConfig,
): readonly CoreServiceFactory[] {
return [
TelemetryManager.factory(config.telemetry),
CacheManager.factory(config.cache),
];
}
79 changes: 79 additions & 0 deletions packages/appkit/src/core/service-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { createLogger } from "@/logging";

const logger = createLogger("service-registry");

/** A booted core service. Resolved via the typed `get<T>` helper. */
interface BootedService {
readonly name: string;
readonly instance: unknown;
shutdown(): Promise<void>;
}

/**
* Factory that boots a core service. `boot()` returns `null` for
* intentionally-absent services (e.g. telemetry without an OTLP endpoint).
*/
export interface CoreServiceFactory<T = unknown> {
readonly name: string;
boot(): Promise<{ instance: T; shutdown(): Promise<void> } | null>;
}

/**
* Manages the lifecycle of core services and acts as a typed locator.
* Services boot in provided order and shut down in reverse.
*/
export class CoreServiceRegistry {
private constructor(private readonly booted: BootedService[]) {}

/** Boots services in order; unwinds already-booted services on failure. */
static async boot(
factories: readonly CoreServiceFactory[],
): Promise<CoreServiceRegistry> {
const booted: BootedService[] = [];
try {
for (const factory of factories) {
const result = await factory.boot();
if (!result) continue;
booted.push({
name: factory.name,
instance: result.instance,
shutdown: result.shutdown,
});
logger.debug("Booted core service: %s", factory.name);
}
return new CoreServiceRegistry(booted);
} catch (error) {
await CoreServiceRegistry.unwind(booted);
throw error;
}
}

/** Returns the booted instance for `name`, or `null` if not booted. */
get<T>(name: string): T | null {
for (const service of this.booted) {
if (service.name === name) {
return service.instance as T;
}
}
return null;
}

/** Shuts down services in the reverse order they were booted. */
async shutdown(): Promise<void> {
await CoreServiceRegistry.unwind(this.booted);
}

private static async unwind(booted: BootedService[]): Promise<void> {
while (booted.length > 0) {
const service = booted.pop();
if (!service) continue;

try {
await service.shutdown();
logger.debug("Shutdown core service: %s", service.name);
} catch (error) {
logger.error("Service %s shutdown failed: %O", service.name, error);
}
}
}
}
45 changes: 26 additions & 19 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
PluginExecutionSettings,
PluginPhase,
RouteConfig,
ServiceLocator,
StreamExecuteHandler,
StreamExecutionSettings,
} from "shared";
Expand Down Expand Up @@ -218,15 +219,19 @@ export abstract class Plugin<
| PluginContext
| undefined;

// Eagerly bind telemetry + cache if the core services have already been
// initialized (normal createApp path, or tests that mock CacheManager).
// If they haven't, we leave these undefined and rely on `attachContext`
// being called later — this lets factories eagerly construct plugin
// instances at module top-level before `createApp` has run.
this.tryAttachContext();
// Eager bind for plugins instantiated outside `_createApp` (mostly
// unit tests that mock `CacheManager` and call `new MyPlugin()`
// directly). Production goes through `_createApp` → `attachContext`,
// which overrides whatever is captured here.
this._tryEagerBindServices();
}

private tryAttachContext(): void {
/**
* Test-fallback for plugins instantiated outside `_createApp`. Removable
* once tests migrate to explicit `attachContext` setup.
* @internal
*/
private _tryEagerBindServices(): void {
try {
this.cache = CacheManager.getInstanceSync();
} catch {
Expand All @@ -240,22 +245,22 @@ export abstract class Plugin<
}

/**
* Binds runtime dependencies (telemetry provider, cache, plugin context) to
* this plugin. Called by `AppKit._createApp` after construction and before
* `setup()`. Idempotent: safe to call if the constructor already bound them
* eagerly. Kept separate so factories can eagerly construct plugin instances
* without running this before `TelemetryManager.initialize()` /
* `CacheManager.getInstance()` have run.
* Binds runtime dependencies (cache, telemetry, plugin context). Called
* by `_createApp` after construction and before `setup()`. Plugin authors
* that need a service the base class doesn't surface as a property can
* reach for `deps.services?.get<MyService>("my-service")`.
*/
attachContext(
deps: {
context?: unknown;
services?: ServiceLocator;
telemetryConfig?: BasePluginConfig["telemetry"];
} = {},
): void {
if (!this.cache) {
this.cache = CacheManager.getInstanceSync();
}
this.cache =
deps.services?.get<CacheManager>("cache") ??
this.cache ??
CacheManager.getInstanceSync();
this.telemetry = TelemetryManager.getProvider(
this.name,
deps.telemetryConfig ?? this.config.telemetry,
Expand Down Expand Up @@ -698,10 +703,12 @@ export abstract class Plugin<
}

private _checkIfGenerator(
result: any,
): result is AsyncGenerator<any, void, unknown> {
result: unknown,
): result is AsyncGenerator<unknown, void, unknown> {
return (
result && typeof result === "object" && Symbol.asyncIterator in result
typeof result === "object" &&
result !== null &&
Symbol.asyncIterator in result
);
}
}
Loading
Loading