diff --git a/.gitignore b/.gitignore index 5d417368..c6c522b9 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ coverage .turbo .databricks + +# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in. +.appkit/ diff --git a/apps/dev-playground/.gitignore b/apps/dev-playground/.gitignore index 1f4745f5..d36ad9c8 100644 --- a/apps/dev-playground/.gitignore +++ b/apps/dev-playground/.gitignore @@ -3,4 +3,7 @@ test-results/ playwright-report/ # Auto-generated types (endpoint-specific, varies per developer) -shared/appkit-types/serving.d.ts \ No newline at end of file +shared/appkit-types/serving.d.ts + +# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in. +.appkit/ diff --git a/knip.json b/knip.json index 9692fa3f..e4f3829d 100644 --- a/knip.json +++ b/knip.json @@ -15,6 +15,7 @@ "**/*.generated.ts", "**/*.example.tsx", "**/*.css", + "packages/appkit/vendor/**", "packages/appkit/src/plugins/vector-search/**", "packages/appkit/src/plugin/index.ts", "packages/appkit/src/plugin/to-plugin.ts", diff --git a/packages/appkit/src/core/appkit.ts b/packages/appkit/src/core/appkit.ts index 9f920947..fd1d854d 100644 --- a/packages/appkit/src/core/appkit.ts +++ b/packages/appkit/src/core/appkit.ts @@ -16,6 +16,7 @@ import { TelemetryReporter, } from "../internal-telemetry"; import { ResourceRegistry, ResourceType } from "../registry"; +import type { TaskflowConfig } from "../taskflow"; import type { TelemetryConfig } from "../telemetry"; import { composeCoreFactories } from "./bootstrap-factories"; import { isToolProvider, PluginContext } from "./plugin-context"; @@ -90,8 +91,8 @@ export class AppKit { const pluginInstance = new Plugin(baseConfig); if (typeof pluginInstance.attachContext === "function") { - // Pass a typed service locator instead of concrete service classes, - // so this file stays decoupled from the core service implementations. + // Pass a typed locator instead of concrete service classes, so + // this file stays decoupled from the core service implementations. const services: ServiceLocator = { get: (serviceName: string) => this.#registry.get(serviceName), }; @@ -203,6 +204,7 @@ export class AppKit { plugins?: T; telemetry?: TelemetryConfig; cache?: CacheConfig; + taskflow?: TaskflowConfig | false; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; disableInternalTelemetry?: boolean; @@ -212,6 +214,7 @@ export class AppKit { composeCoreFactories({ telemetry: config?.telemetry, cache: config?.cache, + taskflow: config?.taskflow, }), ); @@ -334,6 +337,7 @@ export async function createApp< plugins?: T; telemetry?: TelemetryConfig; cache?: CacheConfig; + taskflow?: TaskflowConfig | false; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; disableInternalTelemetry?: boolean; diff --git a/packages/appkit/src/core/bootstrap-factories.ts b/packages/appkit/src/core/bootstrap-factories.ts index 07dd6c90..7706d5f6 100644 --- a/packages/appkit/src/core/bootstrap-factories.ts +++ b/packages/appkit/src/core/bootstrap-factories.ts @@ -6,17 +6,20 @@ */ import type { CacheConfig } from "shared"; import { CacheManager } from "../cache"; +import { type TaskflowConfig, TaskflowService } from "../taskflow"; import { type TelemetryConfig, TelemetryManager } from "../telemetry"; import type { CoreServiceFactory } from "./service-registry"; interface BootstrapConfig { telemetry?: TelemetryConfig; cache?: CacheConfig; + taskflow?: TaskflowConfig | false; } /** - * Returns factories in boot order: telemetry first so other services can - * record traces during their own boot. + * Returns factories in boot order: telemetry first so other services + * can trace during their own boot; taskflow last. Shutdown unwinds in + * reverse so taskflow drains before cache/telemetry close. * * @internal */ @@ -26,5 +29,6 @@ export function composeCoreFactories( return [ TelemetryManager.factory(config.telemetry), CacheManager.factory(config.cache), + TaskflowService.factory(config.taskflow), ]; } diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index b5dc60cf..ec89c44b 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -25,6 +25,7 @@ import type { PluginContext } from "../core/plugin-context"; import { AppKitError, AuthenticationError } from "../errors"; import { createLogger } from "../logging/logger"; import { StreamManager } from "../stream"; +import { TaskflowService } from "../taskflow"; import { type ITelemetry, normalizeTelemetryOptions, @@ -91,6 +92,9 @@ const EXCLUDED_FROM_PROXY = new Set([ "asUser", // Internal methods "constructor", + // Synchronous accessor for the shared singleton — no need for an + // extra async-context frame. + "requireTaskflow", ]); /** @@ -188,6 +192,13 @@ export abstract class Plugin< protected telemetry!: ITelemetry; protected context?: PluginContext; + /** + * Durable execution service. `null` when the app opted out via + * `createApp({ taskflow: false })`. Use {@link requireTaskflow} when + * the plugin needs it to be present. + */ + protected taskflow: TaskflowService | null = null; + /** Registered endpoints for this plugin */ private registeredEndpoints: PluginEndpointMap = {}; @@ -220,15 +231,13 @@ export abstract class Plugin< | undefined; // Eager bind for plugins instantiated outside `_createApp` (mostly - // unit tests that mock `CacheManager` and call `new MyPlugin()` - // directly). Production goes through `_createApp` → `attachContext`, - // which overrides whatever is captured here. + // unit tests). Production goes through `attachContext`, which + // overrides whatever is captured here. this._tryEagerBindServices(); } /** - * Test-fallback for plugins instantiated outside `_createApp`. Removable - * once tests migrate to explicit `attachContext` setup. + * Test-fallback for plugins instantiated outside `_createApp`. * @internal */ private _tryEagerBindServices(): void { @@ -241,14 +250,15 @@ export abstract class Plugin< this.name, this.config.telemetry, ); + this.taskflow = TaskflowService.tryGetInstance(); this.isReady = true; } /** - * Binds runtime dependencies (cache, telemetry, plugin context). Called - * by `_createApp` after construction and before `setup()`. Plugin authors - * that need a service the base class doesn't surface as a property can - * reach for `deps.services?.get("my-service")`. + * Binds runtime dependencies (cache, telemetry, taskflow, plugin + * context). Called by `_createApp` before `setup()`. Plugin authors + * that need a service the base class doesn't surface as a property + * can reach for `deps.services?.get("my-service")`. */ attachContext( deps: { @@ -257,20 +267,37 @@ export abstract class Plugin< telemetryConfig?: BasePluginConfig["telemetry"]; } = {}, ): void { + const fromLocator = deps.services; this.cache = - deps.services?.get("cache") ?? + fromLocator?.get("cache") ?? this.cache ?? CacheManager.getInstanceSync(); this.telemetry = TelemetryManager.getProvider( this.name, deps.telemetryConfig ?? this.config.telemetry, ); + this.taskflow = fromLocator + ? fromLocator.get("taskflow") + : TaskflowService.tryGetInstance(); if (deps.context !== undefined) { this.context = deps.context as PluginContext; } this.isReady = true; } + /** + * Returns the live {@link TaskflowService}, or throws if it was + * disabled via `createApp({ taskflow: false })`. + */ + protected requireTaskflow(): TaskflowService { + if (!this.taskflow) { + throw new Error( + `Plugin "${this.name}" requires TaskFlow but it was disabled via createApp({ taskflow: false }). Remove the opt-out or refactor the plugin to not use this.taskflow.`, + ); + } + return this.taskflow; + } + injectRoutes(_: express.Router) { return; } diff --git a/packages/appkit/src/plugins/server/index.ts b/packages/appkit/src/plugins/server/index.ts index bd5e6674..917d64e8 100644 --- a/packages/appkit/src/plugins/server/index.ts +++ b/packages/appkit/src/plugins/server/index.ts @@ -65,6 +65,11 @@ export class ServerPlugin extends Plugin { protected declare config: ServerConfig; private serverExtensions: ((app: express.Application) => void)[] = []; private rawBodyPaths: Set = new Set(); + /** + * Drains the core services in reverse boot order. Wired by `_createApp`; + * optional so plugins constructed in tests keep working. + */ + private _shutdownCoreServices?: () => Promise; static phase: PluginPhase = "deferred"; constructor(config: ServerConfig) { @@ -102,22 +107,22 @@ export class ServerPlugin extends Plugin { } /** - * Start the server. - * - * This method starts the server and sets up the frontend. - * It also sets up the remote tunneling if enabled. - * - * @returns The express application. + * Starts the server, registers the frontend, and (if enabled) the + * remote tunnel. The plugin owns the SIGTERM handler; the optional + * `shutdownCoreServices` hook supplied by `_createApp` drains core + * services on shutdown. */ - async start(): Promise { + async start(options?: { + shutdownCoreServices?: () => Promise; + }): Promise { + this._shutdownCoreServices = options?.shutdownCoreServices; this.serverApplication.use(requestMetricsMiddleware); this.serverApplication.use( express.json({ type: (req) => { - // Skip JSON parsing for routes that declared skipBodyParsing - // (e.g. file uploads where the raw body must flow through). - // rawBodyPaths is populated by extendRoutes() below; the type - // callback runs per-request so the set is already filled. + // Skip JSON parsing for routes that opted out (e.g. file + // uploads). `rawBodyPaths` is populated by `extendRoutes()` + // before any request hits this callback. const urlPath = req.url?.split("?")[0]; if (urlPath && this.rawBodyPaths.has(urlPath)) return false; const ct = req.headers["content-type"] ?? ""; @@ -391,6 +396,13 @@ export class ServerPlugin extends Plugin { } } + /** + * Graceful shutdown sequence: + * 1. Tear down server-local pieces (Vite, tunnel, telemetry reporter). + * 2. Abort in-flight plugin operations so streams release HTTP conns. + * 3. Drain core services via `_shutdownCoreServices` (reverse boot order). + * 4. Close the HTTP server, force-exit on timeout. + */ private async _gracefulShutdown() { logger.info("Starting graceful shutdown..."); @@ -404,7 +416,6 @@ export class ServerPlugin extends Plugin { TelemetryReporter.getInstance()?.stop(); - // 1. abort active operations from plugins const shutdownPlugins = this.context?.getPlugins(); if (shutdownPlugins) { for (const plugin of shutdownPlugins.values()) { @@ -422,14 +433,20 @@ export class ServerPlugin extends Plugin { } } - // 2. close the server + if (this._shutdownCoreServices) { + try { + await this._shutdownCoreServices(); + } catch (err) { + logger.error("Error shutting down core services: %O", err); + } + } + if (this.server) { this.server.close(() => { logger.debug("Server closed gracefully"); process.exit(0); }); - // 3. timeout to force shutdown after 15 seconds setTimeout(() => { logger.debug("Force shutdown after timeout"); process.exit(1); diff --git a/packages/appkit/src/plugins/server/tests/server.test.ts b/packages/appkit/src/plugins/server/tests/server.test.ts index bbc96172..687184df 100644 --- a/packages/appkit/src/plugins/server/tests/server.test.ts +++ b/packages/appkit/src/plugins/server/tests/server.test.ts @@ -717,7 +717,6 @@ describe("ServerPlugin", () => { }), } as any); - // pretend started (plugin as any).server = mockHttpServer; await (plugin as any)._gracefulShutdown(); @@ -730,5 +729,93 @@ describe("ServerPlugin", () => { exitSpy.mockRestore(); vi.useRealTimers(); }); + + test("invokes shutdownCoreServices after aborting and before closing the server", async () => { + vi.useFakeTimers(); + mockLoggerError.mockClear(); + const exitSpy = vi + .spyOn(process, "exit") + .mockImplementation(((_code?: number) => undefined) as any); + + const callOrder: string[] = []; + const abortFn = vi.fn(() => { + callOrder.push("abort"); + }); + const shutdownCoreServices = vi.fn(async () => { + callOrder.push("shutdownCoreServices"); + }); + const closeSpy = vi.fn((cb: any) => { + callOrder.push("server.close"); + cb?.(); + }); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + plugin: { name: "plugin", abortActiveOperations: abortFn }, + }), + } as any); + + (plugin as any).server = { ...mockHttpServer, close: closeSpy }; + (plugin as any)._shutdownCoreServices = shutdownCoreServices; + + await (plugin as any)._gracefulShutdown(); + vi.runAllTimers(); + + expect(abortFn).toHaveBeenCalled(); + expect(shutdownCoreServices).toHaveBeenCalledTimes(1); + expect(closeSpy).toHaveBeenCalled(); + expect(callOrder).toEqual([ + "abort", + "shutdownCoreServices", + "server.close", + ]); + expect(mockLoggerError).not.toHaveBeenCalled(); + + exitSpy.mockRestore(); + vi.useRealTimers(); + }); + + test("isolates errors from shutdownCoreServices and still closes the server", async () => { + vi.useFakeTimers(); + mockLoggerError.mockClear(); + const exitSpy = vi + .spyOn(process, "exit") + .mockImplementation(((_code?: number) => undefined) as any); + + const shutdownCoreServices = vi.fn(async () => { + throw new Error("registry boom"); + }); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + plugin: { name: "plugin", abortActiveOperations: vi.fn() }, + }), + } as any); + + (plugin as any).server = mockHttpServer; + (plugin as any)._shutdownCoreServices = shutdownCoreServices; + + await (plugin as any)._gracefulShutdown(); + vi.runAllTimers(); + + expect(shutdownCoreServices).toHaveBeenCalled(); + expect(mockLoggerError).toHaveBeenCalled(); + expect(mockHttpServer.close).toHaveBeenCalled(); + expect(exitSpy).toHaveBeenCalled(); + + exitSpy.mockRestore(); + vi.useRealTimers(); + }); + + test("start() wires shutdownCoreServices into the instance", async () => { + const shutdownCoreServices = vi.fn(async () => {}); + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + + await plugin.start({ shutdownCoreServices }); + + expect((plugin as any)._shutdownCoreServices).toBe(shutdownCoreServices); + }); }); }); diff --git a/packages/appkit/src/taskflow/index.ts b/packages/appkit/src/taskflow/index.ts new file mode 100644 index 00000000..96073f7d --- /dev/null +++ b/packages/appkit/src/taskflow/index.ts @@ -0,0 +1,373 @@ +/** + * TaskflowService — durable execution core service. + * + * Wraps the vendored TaskFlow Node.js bindings (Rust + napi). Booted by + * `createApp` and exposed to plugins as `this.taskflow`. Default storage + * is SQLite at `.appkit/taskflow/tasks.db`; opt out with + * `createApp({ taskflow: false })`. + */ + +import { createHash } from "node:crypto"; +import { readFile } from "node:fs/promises"; +import { join } from "node:path"; +import type { CoreServiceFactory } from "@/core/service-registry"; +import type { + ResumeOptions, + StopOptions, + StreamEvent, + SubmitOptions, + Task, + TaskEvent, + TaskflowConfig, + Engine as TaskflowEngine, + TaskHandle, +} from "../../vendor/taskflow/taskflow.js"; +import { InitializationError } from "../errors"; +import { createLogger } from "../logging/logger"; + +const logger = createLogger("taskflow"); + +/** Type-only import keeps `import "@databricks/appkit"` from touching the native binary. */ +type VendorModule = typeof import("../../vendor/taskflow/taskflow.js"); + +let cachedVendor: VendorModule | null = null; + +/** + * Lazy-loads the vendored binary. The artifact only ships for + * `darwin-arm64` and `linux-x64`; deferring the load keeps the SDK + * importable on other platforms when the caller opts out. + * @internal + */ +async function loadVendorModule(): Promise { + if (cachedVendor) return cachedVendor; + try { + if (process.env.APPKIT_VERIFY_TASKFLOW_VENDOR === "1") { + await verifyVendorIntegrity(); + } + cachedVendor = (await import( + "../../vendor/taskflow/taskflow.js" + )) as VendorModule; + return cachedVendor; + } catch (err) { + const message = (err as { message?: string } | undefined)?.message ?? err; + throw new InitializationError( + `Failed to load vendored TaskFlow binary for ${process.platform}-${process.arch}: ${message}\n` + + "If you do not need durable task execution, opt out with " + + "`createApp({ taskflow: false, plugins: [...] })`. Otherwise, " + + "verify the binary is published for your platform; see " + + "`packages/appkit/vendor/taskflow/`.", + ); + } +} + +/** + * Verifies the platform `.node` and JS loader against `VENDOR.json` and + * throws before the loader executes on mismatch. Opt-in via + * `APPKIT_VERIFY_TASKFLOW_VENDOR=1`. `taskflow.d.ts` is intentionally + * not checked (types-only, never executed). + * @internal + */ +async function verifyVendorIntegrity(): Promise { + type VendorManifest = { + platforms?: Record; + loader?: { file: string; sha256: string }; + }; + const manifestUrl = new URL( + "../../vendor/taskflow/VENDOR.json", + import.meta.url, + ); + const vendorDir = join(manifestUrl.pathname, ".."); + const manifest = JSON.parse( + await readFile(manifestUrl, "utf8"), + ) as VendorManifest; + const platformKey = `${process.platform}-${process.arch}`; + const platform = manifest.platforms?.[platformKey]; + const loader = manifest.loader; + const targets: Array<{ label: string; file: string; expected: string }> = []; + if (platform) { + targets.push({ + label: `binary (${platformKey})`, + file: platform.file, + expected: platform.sha256, + }); + } + if (loader) { + targets.push({ + label: "loader", + file: loader.file, + expected: loader.sha256, + }); + } + if (targets.length === 0) { + throw new InitializationError( + `APPKIT_VERIFY_TASKFLOW_VENDOR=1: VENDOR.json has no entries for ${platformKey} and no loader manifest. Refresh with \`bin/sync-taskflow.sh\`.`, + ); + } + for (const target of targets) { + const buf = await readFile(join(vendorDir, target.file)); + const actual = createHash("sha256").update(buf).digest("hex"); + if (actual !== target.expected) { + throw new InitializationError( + `APPKIT_VERIFY_TASKFLOW_VENDOR=1: ${target.label} sha256 mismatch.\n` + + ` expected: ${target.expected}\n` + + ` actual: ${actual}\n` + + "The vendored TaskFlow artifact has been modified outside of " + + "`bin/sync-taskflow.sh`. Investigate the diff before booting.", + ); + } + } +} + +/** + * Default config. On-disk state lives under `.appkit/taskflow/` so the + * footprint is obvious; `enableTestMode: false` keeps `simulateCrash` + * behind an explicit opt-in. + */ +const APPKIT_DEFAULTS: TaskflowConfig = { + engine: { + walPath: ".appkit/taskflow/wal", + recoveryIntervalMs: 5000, + staleThresholdMs: 30000, + enableTestMode: false, + }, + executor: { + heartbeatIntervalMs: 5000, + }, + storage: { + backend: "sqlite", + databasePath: ".appkit/taskflow/tasks.db", + }, +}; + +/** + * Warns when SQLite is paired with a Databricks Apps environment — + * the per-pod filesystem cannot survive rolling restarts, so durability + * silently degrades. Can't refuse to boot since single-process dev + * looks identical at the config level. + * @internal + */ +function warnOnEphemeralStorage(config: TaskflowConfig): void { + const isDatabricksApps = + !!process.env.DATABRICKS_APP_NAME || + !!process.env.DATABRICKS_APP_ID || + !!process.env.DATABRICKS_APP_URL; + if (!isDatabricksApps) return; + const backend = config.storage?.backend ?? "sqlite"; + if (backend !== "sqlite") return; + logger.warn( + "TaskFlow is configured with the SQLite backend but the runtime " + + "appears to be Databricks Apps (multi-pod, no shared volume). " + + "Tasks will not survive rolling restarts. For production, switch " + + "the backend to `lakebase` (Postgres) by passing " + + "`taskflow: { storage: { backend: 'lakebase', connectionString: … } }` " + + "to `createApp(...)`.", + ); +} + +/** + * Merges user config over defaults. `engine`/`executor` merge one level + * deep; `storage` is a discriminated union and replaces wholesale. + * `wal`/`admission`/`stream` are only emitted when present — the FFI + * distinguishes "key absent" from "key present with no fields". + */ +function mergeAppkitDefaults(user: TaskflowConfig | undefined): TaskflowConfig { + if (!user) return APPKIT_DEFAULTS; + const merged: TaskflowConfig = { + ...APPKIT_DEFAULTS, + ...user, + engine: { ...APPKIT_DEFAULTS.engine, ...user.engine }, + executor: { ...APPKIT_DEFAULTS.executor, ...user.executor }, + storage: user.storage ?? APPKIT_DEFAULTS.storage, + }; + if (user.wal) merged.wal = { ...user.wal }; + if (user.admission) merged.admission = { ...user.admission }; + if (user.stream) merged.stream = { ...user.stream }; + return merged; +} + +/** + * Engine types re-exported as the AppKit-side surface for `this.taskflow`. + * @public + */ +export type { + ResumeOptions, + StopOptions, + StreamEvent, + SubmitOptions, + Task, + TaskEvent, + TaskflowConfig, + TaskHandle, +}; + +/** + * Single instance per AppKit app, booted by `createApp` and exposed to + * plugins as `this.taskflow`. + */ +export class TaskflowService { + private static _instance: TaskflowService | null = null; + + private readonly engine: TaskflowEngine; + private hasShutdown = false; + + private constructor(engine: TaskflowEngine) { + this.engine = engine; + } + + /** + * Bootstraps the service. Pass `false` to opt out (returns `null`). + * Idempotent: subsequent calls return the existing instance. + */ + static async initialize( + config: TaskflowConfig | false | undefined, + ): Promise { + if (config === false) { + logger.info("TaskFlow disabled via createApp({ taskflow: false })."); + TaskflowService._instance = null; + return null; + } + if (TaskflowService._instance) { + return TaskflowService._instance; + } + + const merged = mergeAppkitDefaults(config); + logger.debug("Initializing TaskFlow engine", { config: merged }); + warnOnEphemeralStorage(merged); + const vendor = await loadVendorModule(); + const engine = await vendor.Engine.create(merged); + const service = new TaskflowService(engine); + TaskflowService._instance = service; + return service; + } + + /** @internal */ + static getInstanceSync(): TaskflowService { + if (!TaskflowService._instance) { + throw InitializationError.notInitialized( + "TaskflowService", + "Either createApp has not run yet, or it was started with `taskflow: false`. Remove the opt-out to use this.taskflow.", + ); + } + return TaskflowService._instance; + } + + /** @internal */ + static tryGetInstance(): TaskflowService | null { + return TaskflowService._instance; + } + + /** + * Test-only singleton reset. Hard-fails in production. Vendor module + * cache is left intact (Node already memoises it). + * @internal + */ + static _resetForTests(): void { + if (process.env.NODE_ENV === "production") { + throw new Error( + "TaskflowService._resetForTests() is test-only and refuses to run when NODE_ENV=production.", + ); + } + TaskflowService._instance = null; + } + + /** Core service factory consumed by the bootstrap registry. */ + static factory( + config: TaskflowConfig | false | undefined, + ): CoreServiceFactory { + return { + name: "taskflow", + async boot() { + const service = await TaskflowService.initialize(config); + if (!service) return null; + + return { + instance: service, + shutdown: () => service.shutdown(), + }; + }, + }; + } + + /** + * Spawns a new task attempt. Returns a handle even when a task with + * the same idempotency key already exists — dedup is resolved by the + * engine based on `executeMode`. + */ + async start( + name: string, + input: unknown, + options: SubmitOptions = {}, + ): Promise { + this.assertAlive(); + return this.engine.submit(name, input, options); + } + + /** + * Async iterable of `StreamEvent`s ordered by sequence number. Pass + * `lastSeq` to resume from a known position (SSE reconnection). + */ + subscribe( + idempotencyKey: string, + lastSeq?: number, + ): AsyncIterableIterator { + this.assertAlive(); + return this.engine.subscribe(idempotencyKey, lastSeq); + } + + /** Returns the current task record, or null if not found / unauthorized. */ + async reconnect( + idempotencyKey: string, + userId?: string, + ): Promise { + this.assertAlive(); + return this.engine.reconnect(idempotencyKey, userId); + } + + /** + * Revives a suspended task — after a deliberate `stop()`, or after a + * crash for an OBO task (where auto-recovery is disabled). + */ + async resume( + idempotencyKey: string, + options: ResumeOptions = {}, + ): Promise { + this.assertAlive(); + return this.engine.resume(idempotencyKey, options); + } + + /** Cooperative stop. Emits a `suspended` event. Idempotent. */ + async stop( + idempotencyKey: string, + options: StopOptions = {}, + ): Promise { + this.assertAlive(); + return this.engine.stop(idempotencyKey, options); + } + + /** + * Test-only: aborts the executor mid-run without writing a terminal + * event so reconnect/recovery exercises the crash path. Throws unless + * `enableTestMode: true`. + */ + simulateCrash(idempotencyKey: string): void { + this.assertAlive(); + this.engine.simulateCrash(idempotencyKey); + } + + /** Drains in-flight tasks and shuts the engine down. Idempotent. */ + async shutdown(): Promise { + if (this.hasShutdown) return; + this.hasShutdown = true; + logger.info("Shutting down TaskFlow engine"); + await this.engine.shutdown(); + if (TaskflowService._instance === this) { + TaskflowService._instance = null; + } + } + + private assertAlive(): void { + if (this.hasShutdown) { + throw new Error("TaskflowService has been shut down."); + } + } +} diff --git a/packages/appkit/tsdown.config.ts b/packages/appkit/tsdown.config.ts index d61e8c53..92d79169 100644 --- a/packages/appkit/tsdown.config.ts +++ b/packages/appkit/tsdown.config.ts @@ -33,6 +33,14 @@ export default defineConfig([ to: "dist/plugins/server/remote-tunnel", flatten: true, }, + // `unbundle: true` lifts `src/` to `dist/appkit/`, and the emitted + // loader does `require("./taskflow.node")` relative to itself, so + // the binaries must land at `dist/appkit/vendor/taskflow/`. + { + from: "vendor/taskflow/*", + to: "dist/appkit/vendor/taskflow", + flatten: true, + }, ], }, ]); diff --git a/packages/appkit/vendor/taskflow/VENDOR.json b/packages/appkit/vendor/taskflow/VENDOR.json new file mode 100644 index 00000000..cabecccf --- /dev/null +++ b/packages/appkit/vendor/taskflow/VENDOR.json @@ -0,0 +1,28 @@ +{ + "$schema": "./VENDOR.schema.json", + "name": "@databricks/taskflow", + "description": "Vendored TaskFlow Node.js bindings (Rust + napi). Source of truth: github.com/databricks/taskflow.", + "version": "0.1.0-internal", + "platforms": { + "darwin-arm64": { + "file": "taskflow-darwin-arm64.node", + "sha256": "2a8354e51d3598ce7608d5614866bb4784a9caa768ccb57d0d888529521bcd20" + }, + "linux-x64": { + "file": "taskflow-linux-x64.node", + "sha256": "12044089b8fe117268104c2e0ea5a7d1b4bf1f058f1e5ed6d7ebe51db2629f50" + } + }, + "loader": { + "file": "taskflow.js", + "sha256": "46aabcaef2c5524509577658f0a0a4f33a332fb5c51a8935520f84570c4ccc2e" + }, + "types": { + "file": "taskflow.d.ts", + "sha256": "4446c8e0112f3961562073a971fd6b21f0b44ce9a736e08f7151f69c67dbfe5d" + }, + "refresh": { + "command": "bin/sync-taskflow.sh", + "description": "Re-runs the upstream build, copies the artifacts, and refreshes the sha256 fields in this file. Failing CI implies the binary was modified out of band; investigate the diff before publishing." + } +} diff --git a/packages/appkit/vendor/taskflow/taskflow-darwin-arm64.node b/packages/appkit/vendor/taskflow/taskflow-darwin-arm64.node new file mode 100755 index 00000000..5b6fbc77 Binary files /dev/null and b/packages/appkit/vendor/taskflow/taskflow-darwin-arm64.node differ diff --git a/packages/appkit/vendor/taskflow/taskflow-linux-x64.node b/packages/appkit/vendor/taskflow/taskflow-linux-x64.node new file mode 100755 index 00000000..7d5c2a3d Binary files /dev/null and b/packages/appkit/vendor/taskflow/taskflow-linux-x64.node differ diff --git a/packages/appkit/vendor/taskflow/taskflow.d.ts b/packages/appkit/vendor/taskflow/taskflow.d.ts new file mode 100644 index 00000000..dc0b86fc --- /dev/null +++ b/packages/appkit/vendor/taskflow/taskflow.d.ts @@ -0,0 +1,279 @@ +export interface TaskHandle { + taskId: string; + idempotencyKey: string; +} + +export interface Task { + id: string; + name: string; + idempotencyKey: string; + userId: string | null; + status: string; + input: any; + result: any | null; + error: string | null; + createdAtMs: number; + startedAtMs: number | null; + completedAtMs: number | null; + attempt: number; + timeoutMs: number | null; + maxAttempts: number | null; +} + +export interface TaskEvent { + id: string; + taskId: string; + idempotencyKey: string; + seq: number; + eventType: string; + timestampMs: number; + payload: any; +} + +export interface StreamEvent { + streamSeq: number; + event: TaskEvent; +} + +export interface SubmitOptions { + /** + * Caller identity for ownership checks on later `reconnect` / `resume` / + * `stop`. The engine does not authenticate; the embedder must verify this + * value (e.g., from a session token) before passing it in. + */ + userId?: string; + /** + * Dedup strictness. Defaults to `at_least_once` (fast path; cache-backed). + * Use `at_most_once` for non-idempotent transactions; the engine then + * always queries storage before creating the task, sacrificing latency for + * stronger cross-pod uniqueness. + */ + executeMode?: 'at_least_once' | 'at_most_once'; + /** Per-attempt handler timeout. Falls back to `executor.defaultTimeoutMs`. */ + timeoutMs?: number; + /** + * Per-task retry budget (overrides `executor.retry.maxAttempts`). The + * count includes the first try, so `maxAttempts: 3` means up to 2 retries. + */ + maxAttempts?: number; + /** + * Live JS object passed through to the handler as `ctx.context`. Stored + * in the FFI sidecar (see `stream.contextSidecarCapacity`); released + * automatically when the executor exits. Never serialised: not durable + * across crashes; not visible to handlers re-spawned on a different pod + * by the recovery worker. + */ + context?: any; +} + +export interface RegisterTaskOptions { + /** + * Whether the recovery worker should re-spawn this task automatically + * after a stale heartbeat. Defaults to `true`. Set to `false` when the + * task should only be revived by an explicit external trigger + * (`engine.resume(...)`). + */ + autoRecover?: boolean; +} + +export interface ResumeOptions { + /** + * Ownership check: must equal the `userId` passed at submit time. + * Mismatch returns `null` (the engine never reveals existence to an + * unauthorised caller). The engine does not authenticate; the embedder + * must verify this value before passing it in. + */ + userId?: string; + /** + * Live JS object for the new attempt's `ctx.context`. Replaces the + * spawn-time context, which has already been released. See + * `SubmitOptions.context` for lifetime caveats. + */ + context?: any; +} + +export interface StopOptions { + /** + * Ownership check: must equal the `userId` passed at submit time. + * Mismatch returns `TaskNotFound` (the engine never reveals existence to + * an unauthorised caller). The engine does not authenticate; the + * embedder must verify this value before passing it in. + */ + userId?: string; + /** + * Human-readable reason persisted with the suspension event. Surfaced to + * every `subscribe` consumer and bounded by + * `MAX_SUSPENDED_REASON_LEN` (512 chars). Defaults to `"stopped via API"`. + */ + reason?: string; +} + +export interface TaskContext { + readonly taskId: string; + readonly idempotencyKey: string; + readonly userId: string | null; + readonly attempt: number; + readonly previousEvents: TaskEvent[]; + readonly isRecovery: boolean; + readonly context: any | null; + + emit(eventType: string, payload?: any): Promise; + heartbeat(): Promise; +} + +export interface TaskDefinition { + name: string; + execute(input: TInput, ctx: TaskContext): Promise; + recover?(input: TInput, ctx: TaskContext): Promise; + autoRecover?: boolean; +} + +export interface TaskflowConfig { + engine?: { + walPath?: string; + flushIntervalMs?: number; + recoveryIntervalMs?: number; + staleThresholdMs?: number; + flushMaxBatchSize?: number; + recoveryMaxPerCycle?: number; + shutdownGracePeriodMs?: number; + shutdownDeadlineMs?: number; + recoveryTaskTimeoutMs?: number; + enableTestMode?: boolean; + }; + wal?: { + maxSegmentBytes?: number; + minSegmentsRetained?: number; + maxPendingWrites?: number; + maxBatchSize?: number; + }; + admission?: { + guard?: { globalRateLimit?: number; perUserRateLimit?: number }; + slots?: { globalLimit?: number; perUserLimit?: number }; + }; + executor?: { + retry?: { + maxAttempts?: number; + initialDelayMs?: number; + maxDelayMs?: number; + backoffMultiplier?: number; + }; + heartbeatIntervalMs?: number; + defaultTimeoutMs?: number; + }; + storage?: + | { + backend: 'sqlite'; + databasePath?: string; + maxConnections?: number; + connectionTimeoutMs?: number; + maxEventsPerTask?: number; + retry?: StorageRetryConfig; + } + | { + backend: 'lakebase'; + connectionString?: string; + maxConnections?: number; + connectionTimeoutMs?: number; + maxEventsPerTask?: number; + retry?: StorageRetryConfig; + }; + stream?: { + bufferCapacity?: number; + retentionMs?: number; + channelCapacity?: number; + reapIntervalMs?: number; + contextSidecarCapacity?: number; + }; +} + +export interface StorageRetryConfig { + maxRetries?: number; + initialDelayMs?: number; + maxDelayMs?: number; + backoffMultiplier?: number; +} + +// Low-level API: native `Engine` bindings. + +export declare class Engine { + static create(config?: TaskflowConfig): Promise; + registerTask(definition: TaskDefinition): void; + submit(name: string, input: any, options?: SubmitOptions): Promise; + reconnect(idempotencyKey: string, userId?: string): Promise; + /** + * Revive a `Suspended` task and start a fresh attempt. + * + * Returns `null` if the task does not exist, has been moved to a terminal + * state, is no longer `Suspended`, OR if the caller's `userId` does not + * match the submit-time owner — these cases are intentionally indistinguishable + * to prevent existence probing by unauthorised callers. Throws on + * underlying engine errors (storage, slot exhaustion). + */ + resume(idempotencyKey: string, options?: ResumeOptions): Promise; + /** + * Move a task into `Suspended` and raise the stop intent. + * + * For `Created` tasks this is a synchronous durable transition; for + * `Running` tasks the stop intent is honoured by the next heartbeat. + * **Naming note:** despite the verb, this emits a `suspended` event, not + * a `stopped` event — the JS-side method is named `stop` because that is + * the user-facing verb, but the underlying semantics are identical to + * `engine.suspend(...)`. Use `resume` to revive the task; use + * `cancelTask` for a true terminal cancellation. Idempotent. + * + * Throws `TaskNotFound` if the task does not exist or the caller's + * `userId` does not match the owner. + */ + stop(idempotencyKey: string, options?: StopOptions): Promise; + cancelTask(idempotencyKey: string): void; + subscribe(idempotencyKey: string, lastSeq?: number): AsyncIterableIterator; + shutdown(): Promise; + /** + * Test-only. Aborts the executor mid-run without writing a terminal + * event so a subsequent reconnect/recovery exercises the crash path. + * Throws `TestModeDisabled` unless `engine.enableTestMode = true` in the + * config; production deployments must leave this disabled. + */ + simulateCrash(idempotencyKey: string): void; +} + +// High-level SDK: same shape as the Python `Taskflow` helpers. + +export declare class Taskflow { + constructor(config?: TaskflowConfig); + + static task(name: string, fn: (input: any, ctx: TaskContext) => Promise): typeof fn; + static task(name: string, options?: { + recover?: (input: any, ctx: TaskContext) => Promise; + autoRecover?: boolean; + }): (fn: (input: any, ctx: TaskContext) => Promise) => typeof fn; + + static start(name: string, input: any, userId?: string): Promise; + static start(name: string, input: any, options?: SubmitOptions): Promise; + /** See `Engine.resume`. */ + static resume(idempotencyKey: string, options?: ResumeOptions): Promise; + /** See `Engine.stop`. */ + static stop(idempotencyKey: string, options?: StopOptions): Promise; + static subscribe( + idempotencyKey: string, + lastSeq?: number, + userId?: string, + ): Promise>; + /** Test-only. See `Engine.simulateCrash`. */ + static simulateCrash(idempotencyKey: string): Promise; + static shutdown(): Promise; +} + +// Workflow primitives: opinionated helpers; callers may extend them. + +type StepFn = + (ctx: TaskContext, ...args: TArgs) => Promise; + +export declare namespace workflow { + function step( + fn: StepFn, + ): StepFn; + + function findEvent(ctx: TaskContext, eventType: string): TaskEvent | null; +} diff --git a/packages/appkit/vendor/taskflow/taskflow.js b/packages/appkit/vendor/taskflow/taskflow.js new file mode 100644 index 00000000..72cf11ab --- /dev/null +++ b/packages/appkit/vendor/taskflow/taskflow.js @@ -0,0 +1,36 @@ +import { createRequire } from "node:module"; + +const require = createRequire(import.meta.url); + +// Some consumers ship a single binary (`taskflow.node`) per app — they +// install platform-specific tarballs. Others (e.g. AppKit) vendor multiple +// binaries side by side and resolve at runtime via `${platform}-${arch}`. +// This loader supports both: it prefers the platform-specific name and +// falls back to the generic one. +const platform = `${process.platform}-${process.arch}`; +const candidates = [`./taskflow-${platform}.node`, "./taskflow.node"]; + +let native; +const errors = []; +for (const candidate of candidates) { + try { + native = require(candidate); + break; + } catch (err) { + errors.push(`${candidate}: ${err?.message ?? err}`); + } +} + +if (!native) { + const detail = errors.map((e) => ` - ${e}`).join("\n"); + throw new Error( + `[taskflow] No native binary found for ${platform}. Tried:\n${detail}\n` + + `If you build from source, run \`bin/build-nodejs.sh\` for your platform; ` + + `if you installed a published package, this platform is not in the prebuild matrix.`, + ); +} + +export const Engine = native.Engine; +export const Taskflow = native.Taskflow; +export const workflow = native.workflow; +export default native; diff --git a/packages/shared/src/plugin.ts b/packages/shared/src/plugin.ts index cca42bdb..5d453c07 100644 --- a/packages/shared/src/plugin.ts +++ b/packages/shared/src/plugin.ts @@ -11,7 +11,8 @@ export type { ResourceFieldEntry }; /** * Typed service locator passed to {@link BasePlugin.attachContext}. - * Returns `null` when the service is not registered. + * Returns `null` when the service is not registered or was opted out + * (e.g. `createApp({ taskflow: false })`). */ export interface ServiceLocator { get(name: string): T | null; diff --git a/template/_gitignore b/template/_gitignore index 23adbc24..4ab6c68b 100644 --- a/template/_gitignore +++ b/template/_gitignore @@ -6,6 +6,10 @@ build/ .env .databricks/ .smoke-test/ + +# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in. +.appkit/ + test-results/ playwright-report/