Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ coverage
.turbo

.databricks

# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in.
.appkit/
5 changes: 4 additions & 1 deletion apps/dev-playground/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ test-results/
playwright-report/

# Auto-generated types (endpoint-specific, varies per developer)
shared/appkit-types/serving.d.ts
shared/appkit-types/serving.d.ts

# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in.
.appkit/
1 change: 1 addition & 0 deletions knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"**/*.generated.ts",
"**/*.example.tsx",
"**/*.css",
"packages/appkit/vendor/**",
"packages/appkit/src/plugins/vector-search/**",
"packages/appkit/src/plugin/index.ts",
"packages/appkit/src/plugin/to-plugin.ts",
Expand Down
8 changes: 6 additions & 2 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
TelemetryReporter,
} from "../internal-telemetry";
import { ResourceRegistry, ResourceType } from "../registry";
import type { TaskflowConfig } from "../taskflow";
import type { TelemetryConfig } from "../telemetry";
import { composeCoreFactories } from "./bootstrap-factories";
import { isToolProvider, PluginContext } from "./plugin-context";
Expand Down Expand Up @@ -90,8 +91,8 @@ export class AppKit<TPlugins extends InputPluginMap> {
const pluginInstance = new Plugin(baseConfig);

if (typeof pluginInstance.attachContext === "function") {
// Pass a typed service locator instead of concrete service classes,
// so this file stays decoupled from the core service implementations.
// Pass a typed locator instead of concrete service classes, so
// this file stays decoupled from the core service implementations.
const services: ServiceLocator = {
get: <T>(serviceName: string) => this.#registry.get<T>(serviceName),
};
Expand Down Expand Up @@ -203,6 +204,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand All @@ -212,6 +214,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
composeCoreFactories({
telemetry: config?.telemetry,
cache: config?.cache,
taskflow: config?.taskflow,
}),
);

Expand Down Expand Up @@ -334,6 +337,7 @@ export async function createApp<
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand Down
8 changes: 6 additions & 2 deletions packages/appkit/src/core/bootstrap-factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
*/
import type { CacheConfig } from "shared";
import { CacheManager } from "../cache";
import { type TaskflowConfig, TaskflowService } from "../taskflow";
import { type TelemetryConfig, TelemetryManager } from "../telemetry";
import type { CoreServiceFactory } from "./service-registry";

interface BootstrapConfig {
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
}

/**
* Returns factories in boot order: telemetry first so other services can
* record traces during their own boot.
* Returns factories in boot order: telemetry first so other services
* can trace during their own boot; taskflow last. Shutdown unwinds in
* reverse so taskflow drains before cache/telemetry close.
*
* @internal
*/
Expand All @@ -26,5 +29,6 @@ export function composeCoreFactories(
return [
TelemetryManager.factory(config.telemetry),
CacheManager.factory(config.cache),
TaskflowService.factory(config.taskflow),
];
}
47 changes: 37 additions & 10 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type { PluginContext } from "../core/plugin-context";
import { AppKitError, AuthenticationError } from "../errors";
import { createLogger } from "../logging/logger";
import { StreamManager } from "../stream";
import { TaskflowService } from "../taskflow";
import {
type ITelemetry,
normalizeTelemetryOptions,
Expand Down Expand Up @@ -91,6 +92,9 @@ const EXCLUDED_FROM_PROXY = new Set([
"asUser",
// Internal methods
"constructor",
// Synchronous accessor for the shared singleton — no need for an
// extra async-context frame.
"requireTaskflow",
]);

/**
Expand Down Expand Up @@ -188,6 +192,13 @@ export abstract class Plugin<
protected telemetry!: ITelemetry;
protected context?: PluginContext;

/**
* Durable execution service. `null` when the app opted out via
* `createApp({ taskflow: false })`. Use {@link requireTaskflow} when
* the plugin needs it to be present.
*/
protected taskflow: TaskflowService | null = null;

/** Registered endpoints for this plugin */
private registeredEndpoints: PluginEndpointMap = {};

Expand Down Expand Up @@ -220,15 +231,13 @@ export abstract class Plugin<
| undefined;

// Eager bind for plugins instantiated outside `_createApp` (mostly
// unit tests that mock `CacheManager` and call `new MyPlugin()`
// directly). Production goes through `_createApp` → `attachContext`,
// which overrides whatever is captured here.
// unit tests). Production goes through `attachContext`, which
// overrides whatever is captured here.
this._tryEagerBindServices();
}

/**
* Test-fallback for plugins instantiated outside `_createApp`. Removable
* once tests migrate to explicit `attachContext` setup.
* Test-fallback for plugins instantiated outside `_createApp`.
* @internal
*/
private _tryEagerBindServices(): void {
Expand All @@ -241,14 +250,15 @@ export abstract class Plugin<
this.name,
this.config.telemetry,
);
this.taskflow = TaskflowService.tryGetInstance();
this.isReady = true;
}

/**
* Binds runtime dependencies (cache, telemetry, plugin context). Called
* by `_createApp` after construction and before `setup()`. Plugin authors
* that need a service the base class doesn't surface as a property can
* reach for `deps.services?.get<MyService>("my-service")`.
* Binds runtime dependencies (cache, telemetry, taskflow, plugin
* context). Called by `_createApp` before `setup()`. Plugin authors
* that need a service the base class doesn't surface as a property
* can reach for `deps.services?.get<MyService>("my-service")`.
*/
attachContext(
deps: {
Expand All @@ -257,20 +267,37 @@ export abstract class Plugin<
telemetryConfig?: BasePluginConfig["telemetry"];
} = {},
): void {
const fromLocator = deps.services;
this.cache =
deps.services?.get<CacheManager>("cache") ??
fromLocator?.get<CacheManager>("cache") ??
this.cache ??
CacheManager.getInstanceSync();
this.telemetry = TelemetryManager.getProvider(
this.name,
deps.telemetryConfig ?? this.config.telemetry,
);
this.taskflow = fromLocator
? fromLocator.get<TaskflowService>("taskflow")
: TaskflowService.tryGetInstance();
if (deps.context !== undefined) {
this.context = deps.context as PluginContext;
}
this.isReady = true;
}

/**
* Returns the live {@link TaskflowService}, or throws if it was
* disabled via `createApp({ taskflow: false })`.
*/
protected requireTaskflow(): TaskflowService {
if (!this.taskflow) {
throw new Error(
`Plugin "${this.name}" requires TaskFlow but it was disabled via createApp({ taskflow: false }). Remove the opt-out or refactor the plugin to not use this.taskflow.`,
);
}
return this.taskflow;
}

injectRoutes(_: express.Router) {
return;
}
Expand Down
45 changes: 31 additions & 14 deletions packages/appkit/src/plugins/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export class ServerPlugin extends Plugin {
protected declare config: ServerConfig;
private serverExtensions: ((app: express.Application) => void)[] = [];
private rawBodyPaths: Set<string> = new Set();
/**
* Drains the core services in reverse boot order. Wired by `_createApp`;
* optional so plugins constructed in tests keep working.
*/
private _shutdownCoreServices?: () => Promise<void>;
static phase: PluginPhase = "deferred";

constructor(config: ServerConfig) {
Expand Down Expand Up @@ -102,22 +107,22 @@ export class ServerPlugin extends Plugin {
}

/**
* Start the server.
*
* This method starts the server and sets up the frontend.
* It also sets up the remote tunneling if enabled.
*
* @returns The express application.
* Starts the server, registers the frontend, and (if enabled) the
* remote tunnel. The plugin owns the SIGTERM handler; the optional
* `shutdownCoreServices` hook supplied by `_createApp` drains core
* services on shutdown.
*/
async start(): Promise<express.Application> {
async start(options?: {
shutdownCoreServices?: () => Promise<void>;
}): Promise<express.Application> {
this._shutdownCoreServices = options?.shutdownCoreServices;
this.serverApplication.use(requestMetricsMiddleware);
this.serverApplication.use(
express.json({
type: (req) => {
// Skip JSON parsing for routes that declared skipBodyParsing
// (e.g. file uploads where the raw body must flow through).
// rawBodyPaths is populated by extendRoutes() below; the type
// callback runs per-request so the set is already filled.
// Skip JSON parsing for routes that opted out (e.g. file
// uploads). `rawBodyPaths` is populated by `extendRoutes()`
// before any request hits this callback.
const urlPath = req.url?.split("?")[0];
if (urlPath && this.rawBodyPaths.has(urlPath)) return false;
const ct = req.headers["content-type"] ?? "";
Expand Down Expand Up @@ -391,6 +396,13 @@ export class ServerPlugin extends Plugin {
}
}

/**
* Graceful shutdown sequence:
* 1. Tear down server-local pieces (Vite, tunnel, telemetry reporter).
* 2. Abort in-flight plugin operations so streams release HTTP conns.
* 3. Drain core services via `_shutdownCoreServices` (reverse boot order).
* 4. Close the HTTP server, force-exit on timeout.
*/
private async _gracefulShutdown() {
logger.info("Starting graceful shutdown...");

Expand All @@ -404,7 +416,6 @@ export class ServerPlugin extends Plugin {

TelemetryReporter.getInstance()?.stop();

// 1. abort active operations from plugins
const shutdownPlugins = this.context?.getPlugins();
if (shutdownPlugins) {
for (const plugin of shutdownPlugins.values()) {
Expand All @@ -422,14 +433,20 @@ export class ServerPlugin extends Plugin {
}
}

// 2. close the server
if (this._shutdownCoreServices) {
try {
await this._shutdownCoreServices();
} catch (err) {
logger.error("Error shutting down core services: %O", err);
}
}

if (this.server) {
this.server.close(() => {
logger.debug("Server closed gracefully");
process.exit(0);
});

// 3. timeout to force shutdown after 15 seconds
setTimeout(() => {
logger.debug("Force shutdown after timeout");
process.exit(1);
Expand Down
Loading