-
Notifications
You must be signed in to change notification settings - Fork 0
Extract core-level in-memory fallbacks for cache/queue/job into @objectstack/core #673
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,158 @@ | ||||||
| import { describe, it, expect } from 'vitest'; | ||||||
| import { createMemoryCache } from './memory-cache'; | ||||||
| import { createMemoryQueue } from './memory-queue'; | ||||||
| import { createMemoryJob } from './memory-job'; | ||||||
| import { CORE_FALLBACK_FACTORIES } from './index'; | ||||||
|
|
||||||
| describe('CORE_FALLBACK_FACTORIES', () => { | ||||||
| it('should have exactly 3 entries: cache, queue, job', () => { | ||||||
| expect(Object.keys(CORE_FALLBACK_FACTORIES)).toEqual(['cache', 'queue', 'job']); | ||||||
| }); | ||||||
|
|
||||||
| it('should map to factory functions', () => { | ||||||
| for (const factory of Object.values(CORE_FALLBACK_FACTORIES)) { | ||||||
| expect(typeof factory).toBe('function'); | ||||||
| } | ||||||
| }); | ||||||
| }); | ||||||
|
|
||||||
| describe('createMemoryCache', () => { | ||||||
| it('should return an object with _fallback: true', () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| expect(cache._fallback).toBe(true); | ||||||
| expect(cache._serviceName).toBe('cache'); | ||||||
| }); | ||||||
|
|
||||||
| it('should set and get a value', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| await cache.set('key1', 'value1'); | ||||||
| expect(await cache.get('key1')).toBe('value1'); | ||||||
| }); | ||||||
|
|
||||||
| it('should return undefined for missing key', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| expect(await cache.get('nonexistent')).toBeUndefined(); | ||||||
| }); | ||||||
|
|
||||||
| it('should delete a key', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| await cache.set('key1', 'value1'); | ||||||
| expect(await cache.delete('key1')).toBe(true); | ||||||
| expect(await cache.get('key1')).toBeUndefined(); | ||||||
| }); | ||||||
|
|
||||||
| it('should check if a key exists with has()', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| expect(await cache.has('key1')).toBe(false); | ||||||
| await cache.set('key1', 'value1'); | ||||||
| expect(await cache.has('key1')).toBe(true); | ||||||
| }); | ||||||
|
|
||||||
| it('should clear all entries', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| await cache.set('a', 1); | ||||||
| await cache.set('b', 2); | ||||||
| await cache.clear(); | ||||||
| expect(await cache.has('a')).toBe(false); | ||||||
| expect(await cache.has('b')).toBe(false); | ||||||
| }); | ||||||
|
|
||||||
| it('should expire entries based on TTL', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| // Set with very short TTL (0.001 seconds = 1ms) | ||||||
| await cache.set('temp', 'data', 0.001); | ||||||
| // Wait for expiry | ||||||
| await new Promise(r => setTimeout(r, 20)); | ||||||
| expect(await cache.get('temp')).toBeUndefined(); | ||||||
| }); | ||||||
|
|
||||||
| it('should track hit/miss stats', async () => { | ||||||
| const cache = createMemoryCache(); | ||||||
| await cache.set('key1', 'value1'); | ||||||
| await cache.get('key1'); // hit | ||||||
| await cache.get('missing'); // miss | ||||||
| const stats = await cache.stats(); | ||||||
| expect(stats.hits).toBe(1); | ||||||
| expect(stats.misses).toBe(1); | ||||||
| expect(stats.keyCount).toBe(1); | ||||||
| }); | ||||||
| }); | ||||||
|
|
||||||
| describe('createMemoryQueue', () => { | ||||||
| it('should return an object with _fallback: true', () => { | ||||||
| const queue = createMemoryQueue(); | ||||||
| expect(queue._fallback).toBe(true); | ||||||
| expect(queue._serviceName).toBe('queue'); | ||||||
| }); | ||||||
|
|
||||||
| it('should publish and deliver to subscriber synchronously', async () => { | ||||||
| const queue = createMemoryQueue(); | ||||||
| const received: any[] = []; | ||||||
| await queue.subscribe('test-q', async (msg: any) => { received.push(msg); }); | ||||||
| const id = await queue.publish('test-q', { hello: 'world' }); | ||||||
| expect(id).toMatch(/^fallback-msg-/); | ||||||
| expect(received).toHaveLength(1); | ||||||
| expect(received[0].data).toEqual({ hello: 'world' }); | ||||||
| }); | ||||||
|
|
||||||
| it('should not deliver to unsubscribed queue', async () => { | ||||||
| const queue = createMemoryQueue(); | ||||||
| const received: any[] = []; | ||||||
| await queue.subscribe('q1', async (msg: any) => { received.push(msg); }); | ||||||
| await queue.unsubscribe('q1'); | ||||||
| await queue.publish('q1', 'data'); | ||||||
| expect(received).toHaveLength(0); | ||||||
| }); | ||||||
|
|
||||||
| it('should return queue size of 0', async () => { | ||||||
| const queue = createMemoryQueue(); | ||||||
| expect(await queue.getQueueSize()).toBe(0); | ||||||
|
||||||
| expect(await queue.getQueueSize()).toBe(0); | |
| expect(await queue.getQueueSize('test-q')).toBe(0); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. | ||
|
|
||
| import { createMemoryCache } from './memory-cache.js'; | ||
| import { createMemoryQueue } from './memory-queue.js'; | ||
| import { createMemoryJob } from './memory-job.js'; | ||
|
|
||
| export { createMemoryCache } from './memory-cache.js'; | ||
| export { createMemoryQueue } from './memory-queue.js'; | ||
| export { createMemoryJob } from './memory-job.js'; | ||
|
|
||
| /** | ||
| * Map of core-criticality service names to their in-memory fallback factories. | ||
| * Used by ObjectKernel.validateSystemRequirements() to auto-inject fallbacks | ||
| * when no real plugin provides the service. | ||
| */ | ||
| export const CORE_FALLBACK_FACTORIES: Record<string, () => Record<string, any>> = { | ||
| cache: createMemoryCache, | ||
| queue: createMemoryQueue, | ||
| job: createMemoryJob, | ||
| }; | ||
|
Comment on lines
+16
to
+20
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. | ||
|
|
||
| /** | ||
| * In-memory Map-backed cache fallback. | ||
| * | ||
| * Implements the ICacheService contract with basic get/set/delete/has/clear | ||
| * and TTL expiry. Used by ObjectKernel as an automatic fallback when no | ||
| * real cache plugin (e.g. Redis) is registered. | ||
| */ | ||
| export function createMemoryCache() { | ||
| const store = new Map<string, { value: unknown; expires?: number }>(); | ||
| let hits = 0; | ||
| let misses = 0; | ||
| return { | ||
| _fallback: true, _serviceName: 'cache', | ||
| async get<T = unknown>(key: string): Promise<T | undefined> { | ||
| const entry = store.get(key); | ||
| if (!entry || (entry.expires && Date.now() > entry.expires)) { | ||
| store.delete(key); | ||
| misses++; | ||
| return undefined; | ||
| } | ||
| hits++; | ||
| return entry.value as T; | ||
| }, | ||
| async set<T = unknown>(key: string, value: T, ttl?: number): Promise<void> { | ||
| store.set(key, { value, expires: ttl ? Date.now() + ttl * 1000 : undefined }); | ||
| }, | ||
| async delete(key: string): Promise<boolean> { return store.delete(key); }, | ||
| async has(key: string): Promise<boolean> { return store.has(key); }, | ||
| async clear(): Promise<void> { store.clear(); }, | ||
| async stats() { return { hits, misses, keyCount: store.size }; }, | ||
| }; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. | ||
|
|
||
| /** | ||
| * In-memory job scheduler fallback. | ||
| * | ||
| * Implements the IJobService contract with basic schedule/cancel/trigger | ||
| * operations. Used by ObjectKernel as an automatic fallback when no real | ||
| * job plugin (e.g. Agenda / BullMQ) is registered. | ||
| */ | ||
| export function createMemoryJob() { | ||
| const jobs = new Map<string, any>(); | ||
| return { | ||
| _fallback: true, _serviceName: 'job', | ||
| async schedule(name: string, schedule: any, handler: any): Promise<void> { jobs.set(name, { schedule, handler }); }, | ||
|
||
| async cancel(name: string): Promise<void> { jobs.delete(name); }, | ||
| async trigger(name: string, data?: unknown): Promise<void> { | ||
| const job = jobs.get(name); | ||
| if (job?.handler) await job.handler({ jobId: name, data }); | ||
| }, | ||
| async getExecutions(): Promise<any[]> { return []; }, | ||
| async listJobs(): Promise<string[]> { return [...jobs.keys()]; }, | ||
| }; | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,28 @@ | ||||||||||||||||||||||||||||
| // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * In-memory publish/subscribe queue fallback. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Implements the IQueueService contract with synchronous in-process delivery. | ||||||||||||||||||||||||||||
| * Used by ObjectKernel as an automatic fallback when no real queue plugin | ||||||||||||||||||||||||||||
| * (e.g. BullMQ / RabbitMQ) is registered. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| export function createMemoryQueue() { | ||||||||||||||||||||||||||||
| const handlers = new Map<string, Function[]>(); | ||||||||||||||||||||||||||||
| let msgId = 0; | ||||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||||
| _fallback: true, _serviceName: 'queue', | ||||||||||||||||||||||||||||
| async publish<T = unknown>(queue: string, data: T): Promise<string> { | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| async publish<T = unknown>(queue: string, data: T): Promise<string> { | |
| async publish<T = unknown>(queue: string, data: T, _options?: unknown): Promise<string> { |
Copilot
AI
Feb 14, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The handler functions are invoked without await, which means:
- Errors in handlers will result in unhandled promise rejections
- The publish operation completes immediately without waiting for handlers
Consider either:
- Awaiting handlers sequentially:
for (const fn of fns) await fn({ ... }) - Firing handlers in parallel and catching errors:
Promise.all(fns.map(fn => fn({ ... }).catch(err => console.error(err))))
The current synchronous delivery is documented in the JSDoc comment, so if this is intentional for the fallback's simplicity, at least wrap the handler call in a try-catch or .catch() to prevent unhandled rejections.
| for (const fn of fns) fn({ id, data, attempts: 1, timestamp: Date.now() }); | |
| for (const fn of fns) { | |
| try { | |
| const result = fn({ id, data, attempts: 1, timestamp: Date.now() }); | |
| if (result && typeof (result as any).catch === 'function') { | |
| (result as Promise<unknown>).catch((err: unknown) => { | |
| console.error('[memory-queue] handler error for queue "%s":', queue, err); | |
| }); | |
| } | |
| } catch (err) { | |
| console.error('[memory-queue] handler threw synchronously for queue "%s":', queue, err); | |
| } | |
| } |
Copilot
AI
Feb 14, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getQueueSize method should accept a queue parameter to match the IQueueService contract defined in packages/spec/src/contracts/queue-service.ts. The interface specifies:
getQueueSize?(queue: string): Promise<number>;But this implementation has no parameters. It should be:
async getQueueSize(queue: string): Promise<number> { return 0; },Note: This is a pre-existing issue from the original dev-plugin implementation, but it should be fixed since we're extracting this as a production fallback.
| async getQueueSize(): Promise<number> { return 0; }, | |
| async getQueueSize(queue: string): Promise<number> { return 0; }, |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import type { LoggerConfig } from '@objectstack/spec/system'; | |
| import { ServiceRequirementDef } from '@objectstack/spec/system'; | ||
| import { PluginLoader, PluginMetadata, ServiceLifecycle, ServiceFactory, PluginStartupResult } from './plugin-loader.js'; | ||
| import { isNode, safeExit } from './utils/env.js'; | ||
| import { CORE_FALLBACK_FACTORIES } from './fallbacks/index.js'; | ||
|
|
||
| /** | ||
| * Enhanced Kernel Configuration | ||
|
|
@@ -234,8 +235,16 @@ export class ObjectKernel { | |
| this.logger.error(`CRITICAL: Required service missing: ${serviceName}`); | ||
| missingServices.push(serviceName); | ||
| } else if (criticality === 'core') { | ||
| this.logger.warn(`CORE: Core service missing, functionality may be degraded: ${serviceName}`); | ||
| missingCoreServices.push(serviceName); | ||
| // Auto-inject in-memory fallback if available | ||
| const factory = CORE_FALLBACK_FACTORIES[serviceName]; | ||
| if (factory) { | ||
| const fallback = factory(); | ||
| this.registerService(serviceName, fallback); | ||
| this.logger.warn(`Service '${serviceName}' not provided — using in-memory fallback`); | ||
| } else { | ||
| this.logger.warn(`CORE: Core service missing, functionality may be degraded: ${serviceName}`); | ||
| missingCoreServices.push(serviceName); | ||
| } | ||
|
Comment on lines
+238
to
+247
|
||
| } else { | ||
| this.logger.info(`Info: Optional service not present: ${serviceName}`); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||||||||||
| // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. | ||||||||||||||
|
|
||||||||||||||
| import { Plugin, PluginContext } from '@objectstack/core'; | ||||||||||||||
| import { Plugin, PluginContext, createMemoryCache, createMemoryQueue, createMemoryJob } from '@objectstack/core'; | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * All 17 core kernel service names as defined in CoreServiceName. | ||||||||||||||
|
|
@@ -33,70 +33,6 @@ const SECURITY_SERVICE_NAMES = [ | |||||||||||||
| * a trivially useful implementation. | ||||||||||||||
| */ | ||||||||||||||
|
|
||||||||||||||
| /** ICacheService — in-memory Map-backed stub */ | ||||||||||||||
| function createCacheStub() { | ||||||||||||||
| const store = new Map<string, { value: unknown; expires?: number }>(); | ||||||||||||||
| let hits = 0; | ||||||||||||||
| let misses = 0; | ||||||||||||||
| return { | ||||||||||||||
| _dev: true, _serviceName: 'cache', | ||||||||||||||
| async get<T = unknown>(key: string): Promise<T | undefined> { | ||||||||||||||
| const entry = store.get(key); | ||||||||||||||
| if (!entry || (entry.expires && Date.now() > entry.expires)) { | ||||||||||||||
| store.delete(key); | ||||||||||||||
| misses++; | ||||||||||||||
| return undefined; | ||||||||||||||
| } | ||||||||||||||
| hits++; | ||||||||||||||
| return entry.value as T; | ||||||||||||||
| }, | ||||||||||||||
| async set<T = unknown>(key: string, value: T, ttl?: number): Promise<void> { | ||||||||||||||
| store.set(key, { value, expires: ttl ? Date.now() + ttl * 1000 : undefined }); | ||||||||||||||
| }, | ||||||||||||||
| async delete(key: string): Promise<boolean> { return store.delete(key); }, | ||||||||||||||
| async has(key: string): Promise<boolean> { return store.has(key); }, | ||||||||||||||
| async clear(): Promise<void> { store.clear(); }, | ||||||||||||||
| async stats() { return { hits, misses, keyCount: store.size }; }, | ||||||||||||||
| }; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** IQueueService — in-memory publish/subscribe stub */ | ||||||||||||||
| function createQueueStub() { | ||||||||||||||
| const handlers = new Map<string, Function[]>(); | ||||||||||||||
| let msgId = 0; | ||||||||||||||
| return { | ||||||||||||||
| _dev: true, _serviceName: 'queue', | ||||||||||||||
| async publish<T = unknown>(queue: string, data: T): Promise<string> { | ||||||||||||||
| const id = `dev-msg-${++msgId}`; | ||||||||||||||
| const fns = handlers.get(queue) ?? []; | ||||||||||||||
| for (const fn of fns) fn({ id, data, attempts: 1, timestamp: Date.now() }); | ||||||||||||||
| return id; | ||||||||||||||
| }, | ||||||||||||||
| async subscribe(queue: string, handler: (msg: any) => Promise<void>): Promise<void> { | ||||||||||||||
| handlers.set(queue, [...(handlers.get(queue) ?? []), handler]); | ||||||||||||||
| }, | ||||||||||||||
| async unsubscribe(queue: string): Promise<void> { handlers.delete(queue); }, | ||||||||||||||
| async getQueueSize(): Promise<number> { return 0; }, | ||||||||||||||
| async purge(queue: string): Promise<void> { handlers.delete(queue); }, | ||||||||||||||
| }; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** IJobService — no-op job scheduler stub */ | ||||||||||||||
| function createJobStub() { | ||||||||||||||
| const jobs = new Map<string, any>(); | ||||||||||||||
| return { | ||||||||||||||
| _dev: true, _serviceName: 'job', | ||||||||||||||
| async schedule(name: string, schedule: any, handler: any): Promise<void> { jobs.set(name, { schedule, handler }); }, | ||||||||||||||
| async cancel(name: string): Promise<void> { jobs.delete(name); }, | ||||||||||||||
| async trigger(name: string, data?: unknown): Promise<void> { | ||||||||||||||
| const job = jobs.get(name); | ||||||||||||||
| if (job?.handler) await job.handler({ jobId: name, data }); | ||||||||||||||
| }, | ||||||||||||||
| async getExecutions(): Promise<any[]> { return []; }, | ||||||||||||||
| async listJobs(): Promise<string[]> { return [...jobs.keys()]; }, | ||||||||||||||
| }; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** IStorageService — in-memory file storage stub */ | ||||||||||||||
| function createStorageStub() { | ||||||||||||||
| const files = new Map<string, { data: Buffer; meta: any }>(); | ||||||||||||||
|
|
@@ -346,9 +282,9 @@ function createSecurityFieldMaskerStub() { | |||||||||||||
| * from `packages/spec/src/contracts/`. | ||||||||||||||
| */ | ||||||||||||||
| const DEV_STUB_FACTORIES: Record<string, () => Record<string, any>> = { | ||||||||||||||
| 'cache': createCacheStub, | ||||||||||||||
| 'queue': createQueueStub, | ||||||||||||||
| 'job': createJobStub, | ||||||||||||||
| 'cache': createMemoryCache, | ||||||||||||||
| 'queue': createMemoryQueue, | ||||||||||||||
| 'job': createMemoryJob, | ||||||||||||||
|
Comment on lines
+285
to
+287
|
||||||||||||||
| 'cache': createMemoryCache, | |
| 'queue': createMemoryQueue, | |
| 'job': createMemoryJob, | |
| 'cache': () => ({ ...createMemoryCache(), _dev: true }), | |
| 'queue': () => ({ ...createMemoryQueue(), _dev: true }), | |
| 'job': () => ({ ...createMemoryJob(), _dev: true }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test file imports are missing
.jsfile extensions. In ESM, all relative imports should include the file extension. The imports should be:This is inconsistent with the production code which correctly uses
.jsextensions (seeindex.tsandkernel.ts).