snatalenko/node-cqrs

node-cqrs

Building blocks for CQRS/ES, inspired by Lokad.CQRS.

Features

CQRS and Event Sourcing are simple in a single process (example), but a minefield in the cloud.

node-cqrs handles the "boring but hard" distributed plumbing - concurrency, message delivery, projections, and rehydration - so you can focus on your domain logic.

  • Reliable Consistency: Per-aggregate FIFO handling and conflict-safe writes with optimistic concurrency.
  • Resilient Projections: Restart-safe views with checkpoints, readiness gates, and locking.
  • Fast Rehydration: Automatic snapshotting and selective event restores.
  • Distributed Sagas: Built-in event correlation and origin propagation for complex workflows.
  • Smart Pipelines: Pluggable dispatching with back-pressure and concurrency limits.
  • Pluggable by Design: Thin interfaces on every component - swap any piece, without patching the library or your domain code.

The core is infrastructure-agnostic, but the heavy lifting for common stacks is done, so you can mix and match sub-modules to fit your environment:

  • node-cqrs/sqlite – Embedded per-process event storage and/or views.
  • node-cqrs/mongodb – Distributed event storage and persistent projection views for multi-process deployments.
  • node-cqrs/rabbitmq – Robust, distributed command and event bus.
  • node-cqrs/redis – Redis-backed persistent projection views for distributed deployments.

Overview

Domain logic lives in three building blocks:

  • Aggregates - handle commands and emit events
  • Sagas - manage processes by reacting to events and enqueueing follow-up commands
  • Projections - consume events and update views

Commands and events are loosely typed objects implementing the IMessage interface:

interface IMessage<TPayload = unknown> {

	/** Event or command type */
	type: string;

	/** Target aggregate identifier for commands, originating aggregate identifier for events */
	aggregateId?: Identifier;

	/** Aggregate version at the time of the message */
	aggregateVersion?: number;

	/** Starter event ids of sagas associated with this message, keyed by saga descriptor */
	sagaOrigins?: Record<string, string>;

	/** Business data */
	payload: TPayload;

	/** Optional metadata/context (e.g. auth info, request id); set on commands, copied to events */
	context?: any;
}
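
As an illustration, a command and the event it might lead to could look like the sketch below. The `Identifier` alias, the `SignupPayload` shape, and all concrete values are assumptions made for the example, not definitions taken from the library.

```typescript
// Assumption: Identifier is a string or number; the library's own definition may differ
type Identifier = string | number;

interface IMessage<TPayload = unknown> {
	type: string;
	aggregateId?: Identifier;
	aggregateVersion?: number;
	sagaOrigins?: Record<string, string>;
	payload: TPayload;
	context?: any;
}

// Hypothetical payload shape, for illustration only
interface SignupPayload {
	email: string;
}

// A command: type, business data, and optional request context
const signupCommand: IMessage<SignupPayload> = {
	type: 'signupUser',
	payload: { email: 'jane@example.com' },
	context: { requestId: 'req-1' }
};

// An event: carries the originating aggregate id, and the context copied from the command
const signedUpEvent: IMessage<SignupPayload> = {
	type: 'userSignedUp',
	aggregateId: 'user-1',
	payload: signupCommand.payload,
	context: signupCommand.context
};
```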

Message delivery is handled by the following components, in order:

  • Command Bus - routes commands to handlers
  • Aggregate Command Handler - restores aggregate state and executes commands
  • Event Store - runs the event dispatch pipeline (e.g. encoding, persistence), then publishes events to the event bus for delivery to all subscribers
  • Saga Event Handler - restores saga state and applies events
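
The event store step can be pictured with a toy pipeline. Everything in the sketch below (`ToyEventStore`, `Processor`, `addId`, `persist`) is hypothetical and only mirrors the order described above: pipeline steps run in sequence, and only then are events published to subscribers.

```typescript
interface ToyEvent {
	id?: string;
	type: string;
	payload: unknown;
}

// Hypothetical pipeline step: receives a batch, returns the (possibly augmented) batch
type Processor = (events: ToyEvent[]) => ToyEvent[];

class ToyEventStore {
	private readonly pipeline: Processor[];
	private readonly subscribers: Array<(event: ToyEvent) => void> = [];

	constructor(pipeline: Processor[]) {
		this.pipeline = pipeline;
	}

	subscribe(handler: (event: ToyEvent) => void): void {
		this.subscribers.push(handler);
	}

	dispatch(events: ToyEvent[]): ToyEvent[] {
		// 1. Run every pipeline step in order (here: id assignment, then persistence)
		const processed = this.pipeline.reduce((batch, step) => step(batch), events);

		// 2. Only then publish to all subscribers
		for (const event of processed) {
			for (const handler of this.subscribers)
				handler(event);
		}
		return processed;
	}
}

let nextEventId = 0;
const addId: Processor = batch => batch.map(e => ({ ...e, id: `evt-${++nextEventId}` }));

const persisted: ToyEvent[] = [];
const persist: Processor = batch => {
	persisted.push(...batch);
	return batch;
};

const toyStore = new ToyEventStore([addId, persist]);
const delivered: ToyEvent[] = [];
toyStore.subscribe(e => delivered.push(e));
toyStore.dispatch([{ type: 'userCreated', payload: { username: 'jane' } }]);
```

Because `addId` runs before `persist`, both the stored copy and the delivered copy carry the same id.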

src/, tests/, and examples/ are good entry points - the codebase is intentionally small and readable.

Installation

npm install node-cqrs

Node.js 16+ and browsers are supported.

ContainerBuilder

Wire buses, the event store, and your domain components with dependency injection:

const builder = new ContainerBuilder();

builder.register(InMemoryEventStorage); // implements IEventStorageReader, IDispatchPipelineProcessor, and IIdentifierProvider
builder.registerAggregate(UserAggregate);
builder.registerProjection(UsersProjection, 'usersView');
builder.registerSaga(WelcomeEmailSaga);

const { commandBus, eventStore, usersView } = builder.container();

Manual setup (without DI container)

const commandBus = new InMemoryMessageBus();
const eventBus = new InMemoryMessageBus();
const eventStorage = new InMemoryEventStorage();
const eventStore = new EventStore({
	eventStorageReader: eventStorage,
	identifierProvider: eventStorage,
	eventDispatchPipeline: [eventStorage],
	eventBus
});

const aggregateCommandHandler = new AggregateCommandHandler({ eventStore, aggregateType: UserAggregate });
aggregateCommandHandler.subscribe(commandBus);

const projection = new UsersProjection();
projection.subscribe(eventStore);
projection.restore(eventStore);
const users = projection.view;

Commands

Commands represent intent. Send them via commandBus:

commandBus.send('signupUser', undefined, { payload: { profile, password } });
// or
commandBus.send({ type: 'signupUser', payload: { profile, password } });

Commands are handled by Aggregates and may also be enqueued by Sagas.

Write Model (Aggregates)

Aggregates handle commands, validate business rules, and emit events. Minimal contract (IAggregate):

interface IAggregate {

	/**
	 * Applies a single event to update the aggregate's internal state.
	 *
	 * This method is used primarily when rehydrating the aggregate
	 * from the persisted sequence of events
	 *
	 * @param event - The event to be applied
	 */
	mutate(event: IEvent): void;

	/**
	 * Processes a command by executing the aggregate's business logic,
	 * resulting in new events that capture the state changes.
	 * It serves as the primary entry point for invoking aggregate behavior
	 *
	 * @param command - The command to be processed
	 * @returns A set of events produced by the command
	 */
	handle(command: ICommand): IEventSet | Promise<IEventSet>;
}

AbstractAggregate

The recommended base class. Public method names are matched to command types - createUser() handles createUser:

class UserAggregate extends AbstractAggregate<void> {
	createUser(payload: CreateUserCommandPayload) {
		this.emit('userCreated', { username: payload.username });
	}
}

Override static get handles() to declare command types explicitly.
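
The name-matching rule can be pictured with a small stand-alone dispatcher. `ToyAggregate` is a hypothetical stand-in written for this sketch; it does not reproduce how AbstractAggregate is implemented.

```typescript
interface ToyCommand {
	type: string;
	payload: unknown;
}

interface ToyEmittedEvent {
	type: string;
	payload: unknown;
}

class ToyAggregate {
	private emitted: ToyEmittedEvent[] = [];

	protected emit(type: string, payload: unknown): void {
		this.emitted.push({ type, payload });
	}

	handle(command: ToyCommand): ToyEmittedEvent[] {
		// Look up a method whose name equals the command type
		const handler = (this as any)[command.type];
		if (typeof handler !== 'function')
			throw new Error(`No handler for command "${command.type}"`);

		this.emitted = [];
		handler.call(this, command.payload);
		return this.emitted;
	}
}

class ToyUser extends ToyAggregate {
	createUser(payload: { username: string }) {
		this.emit('userCreated', { username: payload.username });
	}
}

const emittedEvents = new ToyUser().handle({
	type: 'createUser',
	payload: { username: 'jane' }
});
```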

Aggregate State

Keep state separate from command handlers - derive it by projecting the aggregate's own events:

class UserAggregateState {
	passwordHash: string;

	passwordChanged(event: IEvent<PasswordChangedEventPayload>) {
		this.passwordHash = event.payload.passwordHash;
	}
}

class UserAggregate extends AbstractAggregate<UserAggregateState> {
	protected readonly state = new UserAggregateState();

	changePassword(payload: ChangePasswordCommandPayload) {
		// md5 keeps the example short; real code should use a dedicated password-hashing function
		if (md5(payload.oldPassword) !== this.state.passwordHash)
			throw new Error('Invalid password');

		this.emit('passwordChanged', { passwordHash: md5(payload.newPassword) });
	}
}

State must not throw - all validation belongs in the aggregate command handler.
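
Rehydration then amounts to replaying the aggregate's events through the state object. The loop below is a simplified sketch with hypothetical names (`ToyUserState`, `rehydrate`); it ignores versions and snapshots and is not the library's restore logic.

```typescript
interface StateEvent {
	type: string;
	payload: any;
}

class ToyUserState {
	passwordHash: string | undefined = undefined;
	passwordChanges = 0;

	passwordChanged(event: StateEvent) {
		this.passwordHash = event.payload.passwordHash;
		this.passwordChanges += 1;
	}
}

// Apply each event to the state method named after its type;
// in this sketch, events without a matching method are skipped
function rehydrate<TState extends object>(state: TState, history: StateEvent[]): TState {
	for (const event of history) {
		const handler = (state as any)[event.type];
		if (typeof handler === 'function')
			handler.call(state, event);
	}
	return state;
}

const restoredState = rehydrate(new ToyUserState(), [
	{ type: 'userCreated', payload: { username: 'jane' } },
	{ type: 'passwordChanged', payload: { passwordHash: 'hash-1' } },
	{ type: 'passwordChanged', payload: { passwordHash: 'hash-2' } }
]);
```

Since state handlers run on every replay, keeping them free of validation means a stored event can never fail to apply.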

External Dependencies

Constructor arguments are injected automatically by the DI container:

class UserAggregate extends AbstractAggregate {
	constructor({ id, authService }) {
		super({ id });
		this._authService = authService;
	}

	async signupUser(payload) {
		await this._authService.registerUser(payload);
	}
}

builder.register(AuthService).as('authService');
builder.registerAggregate(UserAggregate);

Read Model (Projections and Views)

Projections listen to events and update views. Minimal contract (IProjection):

interface IProjection<TView> extends IObserver {
	readonly view: TView;

	/** Subscribe to new events */
	subscribe(eventStore: IObservable): Promise<void> | void;

	/** Restore view state from not-yet-projected events */
	restore(eventStore: IEventStorageReader): Promise<void> | void;

	/** Project new event */
	project(event: IEvent): Promise<void> | void;
}

AbstractProjection

Same name-matching rule as AbstractAggregate - userCreated() handles the userCreated event:

class UsersProjection extends AbstractProjection<Map<string, { 
	username: string
}>> {
	constructor() {
		super();
		this.view = new Map();
	}

	userCreated(event: IEvent<UserCreatedEventPayload>) {
		this.view.set(event.aggregateId, {
			username: event.payload.username
		});
	}
}

Override static get handles() to declare event types explicitly.

View restoring on start

For persistent views and safe restarts, implement IViewLocker and IEventLocker on the projection view to enable catch-up and last-processed checkpoints.
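
The checkpoint idea can be sketched without any storage backend. `ToyCheckpointView` and `catchUp` are hypothetical and do not reproduce the IViewLocker or IEventLocker signatures; the sketch only shows why a last-processed marker lets a restarted projection skip what it has already projected.

```typescript
interface LoggedEvent {
	aggregateId: string;
	type: string;
	payload: { username: string };
}

class ToyCheckpointView {
	readonly usernames = new Map<string, string>();
	// Log position of the last projected event; -1 means nothing has been projected yet
	checkpoint = -1;
}

// Projects only the events recorded after the view's checkpoint; returns how many were projected
function catchUp(view: ToyCheckpointView, log: ReadonlyArray<LoggedEvent>): number {
	const start = view.checkpoint + 1;
	const pending = log.slice(start);
	pending.forEach((event, offset) => {
		if (event.type === 'userCreated')
			view.usernames.set(event.aggregateId, event.payload.username);
		// Move the checkpoint only after the view was updated
		view.checkpoint = start + offset;
	});
	return pending.length;
}

const eventLog: LoggedEvent[] = [
	{ aggregateId: 'user-1', type: 'userCreated', payload: { username: 'jane' } },
	{ aggregateId: 'user-2', type: 'userCreated', payload: { username: 'john' } }
];
const checkpointView = new ToyCheckpointView();
const firstRun = catchUp(checkpointView, eventLog); // projects both events

// Simulated restart: one new event arrived, and the whole log is offered again
eventLog.push({ aggregateId: 'user-3', type: 'userCreated', payload: { username: 'ann' } });
const secondRun = catchUp(checkpointView, eventLog); // projects only the new event
```

In a persistent view the checkpoint would be stored next to the data, so it survives the restart together with the view contents.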

Accessing views

// optional interface for container typing
interface IMyContainer extends IContainer {
	usersView: UsersView;
}

const builder = new ContainerBuilder<IMyContainer>();
builder.registerProjection(UsersProjection, 'usersView');

const { usersView } = builder.container();

For projections that manage and need to expose multiple views:

builder.registerProjection(UsersProjection).as('usersProjection');
builder.register(c => c.usersProjection.users).as('usersView');
builder.register(c => c.usersProjection.connections).as('connectionsView');

Sagas

Sagas coordinate multi-step processes by reacting to events and enqueueing follow-up commands.

class WelcomeEmailSaga extends AbstractSaga {
	userSignedUp(event) {
		this.enqueue('sendWelcomeEmail', undefined, {
			email: event.payload.email
		});
	}
}

builder.register(EventIdAugmentor).as('eventIdAugmenter'); // required: adds event.id
builder.registerSaga(WelcomeEmailSaga);

  • Handler methods are named after event types (userSignedUp handles userSignedUp)
  • this.enqueue(commandType, aggregateId, payload) produces commands
  • EventIdAugmentor must be in the dispatch pipeline - starter events use event.id as the saga origin
  • static sagaDescriptor (optional) - stable key for message.sagaOrigins, defaults to class name

handle(event) runs the handler before mutate(event), so handlers always see the previous state.

Saga context is tracked in message.sagaOrigins[sagaDescriptor], storing the starter event id. A saga starts when sagaOrigins[sagaDescriptor] is absent and continues when it is present. A single event type can start multiple saga types.
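
The start/continue rule can be expressed as a small pure function. `routeSagaEvent`, the `welcomeEmailSent` event type, and the `AuditSaga` descriptor are hypothetical and exist only for illustration; the `sagaOrigins` lookup and the use of the event id as origin follow the description above.

```typescript
interface SagaRoutedEvent {
	id: string;
	type: string;
	sagaOrigins?: Record<string, string>;
}

interface SagaRoute {
	action: 'start' | 'continue';
	origin: string;
}

function routeSagaEvent(event: SagaRoutedEvent, sagaDescriptor: string): SagaRoute {
	const origin = event.sagaOrigins?.[sagaDescriptor];
	if (origin === undefined) {
		// No origin recorded for this saga type: the event starts a new saga
		// and its own id becomes the origin
		return { action: 'start', origin: event.id };
	}
	// Origin present: the event belongs to an already running saga
	return { action: 'continue', origin };
}

const starter = routeSagaEvent({ id: 'evt-1', type: 'userSignedUp' }, 'WelcomeEmailSaga');

const emailSent: SagaRoutedEvent = {
	id: 'evt-2',
	type: 'welcomeEmailSent',
	sagaOrigins: { WelcomeEmailSaga: 'evt-1' }
};
const followUp = routeSagaEvent(emailSent, 'WelcomeEmailSaga');

// The same event carries no origin for another saga type, so it would start that one
const otherSaga = routeSagaEvent(emailSent, 'AuditSaga');
```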

Optional: explicit startsWith/handles

By default, the saga starts on any handled event that does not have sagaOrigins[sagaDescriptor] and continues when it does.

For strict, explicit routing:

  • static startsWith: event types allowed to start a saga
  • static handles: additional event types to subscribe to

Manual wiring (without DI container)

const commandBus = new InMemoryMessageBus();
const eventBus = new InMemoryMessageBus();
const eventStorage = new InMemoryEventStorage();
const eventStore = new EventStore({
	eventStorageReader: eventStorage,
	identifierProvider: eventStorage,
	eventDispatchPipeline: [
		new EventIdAugmentor({ identifierProvider: eventStorage }),
		eventStorage
	],
	eventBus
});

SignupAggregate.register(eventStore, commandBus);
WelcomeEmailSaga.register(eventStore, commandBus);

Minimal contract (ISaga):

interface ISaga {

	/**
	 * Apply a historical event to restore saga state.
	 */
	mutate(event: IEvent): unknown | Promise<unknown>;

	/**
	 * Process an incoming event.
	 *
	 * @returns Commands produced by the saga in response to the event
	 */
	handle(event: IEvent): ReadonlyArray<ICommand> | Promise<ReadonlyArray<ICommand>>;
}
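
A hand-written saga with this shape might look like the sketch below. The local event and command types, `ToySignupCounterSaga`, and the `processEvent` driver are all hypothetical; the driver only mimics the handle-then-mutate order mentioned earlier, so the handler sees the state as it was before the current event.

```typescript
interface ToySagaEvent {
	type: string;
	payload: { email: string };
}

interface ToySagaCommand {
	type: string;
	payload: { email: string; signupsBefore: number };
}

class ToySignupCounterSaga {
	private signups = 0;

	// Updates saga state from an event
	mutate(event: ToySagaEvent): void {
		if (event.type === 'userSignedUp')
			this.signups += 1;
	}

	// Produces commands; state here reflects only the events applied before this one
	handle(event: ToySagaEvent): ReadonlyArray<ToySagaCommand> {
		if (event.type !== 'userSignedUp')
			return [];

		return [{
			type: 'sendWelcomeEmail',
			payload: { email: event.payload.email, signupsBefore: this.signups }
		}];
	}
}

// Hypothetical driver: run the handler first, then apply the event to the state
function processEvent(saga: ToySignupCounterSaga, event: ToySagaEvent): ReadonlyArray<ToySagaCommand> {
	const commands = saga.handle(event);
	saga.mutate(event);
	return commands;
}

const counterSaga = new ToySignupCounterSaga();
const firstCommands = processEvent(counterSaga, { type: 'userSignedUp', payload: { email: 'a@example.com' } });
const secondCommands = processEvent(counterSaga, { type: 'userSignedUp', payload: { email: 'b@example.com' } });
```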

Infrastructure Modules

Swap implementations by registering different classes in the DI container. All modules below implement the same interfaces - pick what fits your deployment.

Capability Matrix

| Module | Event storage | Object view storage | Projection wiring / execution | Message buses |
|---|---|---|---|---|
| node-cqrs | InMemoryEventStorage, InMemorySnapshotStorage | InMemoryView, InMemoryLock | AbstractProjection | InMemoryMessageBus |
| node-cqrs/sqlite | SqliteEventStorage | SqliteObjectView, SqliteObjectStorage, SqliteViewLocker, SqliteEventLocker | AbstractSqliteObjectProjection, AbstractSqliteView | - |
| node-cqrs/mongodb | MongoEventStorage | MongoObjectView, MongoObjectStorage, MongoViewLocker, MongoEventLocker | AbstractMongoObjectProjection, AbstractMongoView | - |
| node-cqrs/redis | - | RedisView, RedisObjectStorage, RedisViewLocker, RedisEventLocker | AbstractRedisProjection | - |
| node-cqrs/rabbitmq | - | - | - | RabbitMqGateway, RabbitMqCommandBus, RabbitMqEventBus |
| node-cqrs/workers | - | - | AbstractWorkerProjection, WorkerProxyProjection | - |

Event Storage

Where aggregate events are persisted and replayed from.

| Implementation | Import | Peer deps | Notes |
|---|---|---|---|
| InMemoryEventStorage | node-cqrs | - | Dev/test only; data lost on restart (example) |
| SqliteEventStorage | node-cqrs/sqlite | better-sqlite3 | Embedded, single-process (example) |
| MongoEventStorage | node-cqrs/mongodb | mongodb | Distributed, multi-process (example) |

Read Model

Where projections store and query their read-side state. Each persistent backend provides the same layered set of building blocks:

| Layer | Purpose |
|---|---|
| Object storage | Key/value CRUD with optimistic concurrency |
| View locker | Prevents concurrent schema-migration rebuilds - only one process rebuilds at a time; others wait |
| Event locker | Per-event deduplication and last-projected checkpoint |
| Composite view | Combines the above into a single view object |
| Base projection | Wires locking, checkpointing, and error handling automatically |
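
Optimistic concurrency in an object storage layer generally means that a write succeeds only if the record's version is still the one the writer read. The sketch below shows that general technique; `ToyObjectStorage` and its methods are hypothetical and are not the API of SqliteObjectStorage, MongoObjectStorage, or RedisObjectStorage.

```typescript
interface VersionedRecord<T> {
	version: number;
	value: T;
}

class ToyObjectStorage<T> {
	private readonly records = new Map<string, VersionedRecord<T>>();

	get(key: string): VersionedRecord<T> | undefined {
		return this.records.get(key);
	}

	// Writes only if the stored version still equals expectedVersion
	// (0 means "the record must not exist yet")
	put(key: string, value: T, expectedVersion: number): boolean {
		const currentVersion = this.records.get(key)?.version ?? 0;
		if (currentVersion !== expectedVersion)
			return false; // someone else wrote in between; the caller should re-read and retry

		this.records.set(key, { version: currentVersion + 1, value });
		return true;
	}
}

const toyStorage = new ToyObjectStorage<{ username: string }>();
const created = toyStorage.put('user-1', { username: 'jane' }, 0);

// Two writers both read version 1 and try to update
const writerA = toyStorage.put('user-1', { username: 'jane.doe' }, 1);
const writerB = toyStorage.put('user-1', { username: 'j.doe' }, 1);
```

The second writer is rejected instead of silently overwriting the first one's change.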

In-memory

| Class | Notes |
|---|---|
| InMemoryLock | Simple in-process lock |
| InMemoryView | Simple Map-backed view; restores from events on each restart |

SQLite (node-cqrs/sqlite, peer dep: better-sqlite3)

| Class | Role |
|---|---|
| SqliteObjectStorage | Key/value object storage with version-based concurrency |
| SqliteViewLocker | Prevents concurrent schema-migration rebuilds via SQLite row lock |
| SqliteEventLocker | Event deduplication and last-event checkpoint |
| AbstractSqliteView | Base class for relational (non-object) SQLite views with view and event locks embedded |
| SqliteObjectView | Composite view combining the above |
| AbstractSqliteObjectProjection | Base projection wired to SqliteObjectView |

See src/sqlite for additional documentation, and examples/sqlite for runnable project examples.

MongoDB (node-cqrs/mongodb, peer dep: mongodb)

Experimental - not yet validated in production. APIs may change in minor versions.

| Class | Role |
|---|---|
| MongoObjectStorage | Document storage with version-based optimistic concurrency |
| MongoViewLocker | Prevents concurrent schema-migration rebuilds; auto-prolongs lock via token + TTL |
| MongoEventLocker | Event deduplication and last-event checkpoint |
| AbstractMongoView | Base class combining MongoViewLocker + MongoEventLocker |
| MongoObjectView | Composite view combining the above |
| AbstractMongoObjectProjection | Base projection wired to MongoObjectView |
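
A lock that is held by a token and expires after a TTL can be sketched in-process as follows. `ToyTtlLock` is hypothetical and uses an injected clock to stay deterministic; it illustrates the general token + TTL pattern, not the behavior of MongoViewLocker.

```typescript
class ToyTtlLock {
	private owner: string | undefined = undefined;
	private expiresAt = 0;
	private readonly ttlMs: number;
	private readonly now: () => number;

	constructor(ttlMs: number, now: () => number) {
		this.ttlMs = ttlMs;
		this.now = now;
	}

	// Succeeds if nobody holds the lock or the previous holder let it expire
	acquire(token: string): boolean {
		const free = this.owner === undefined || this.now() >= this.expiresAt;
		if (!free)
			return false;

		this.owner = token;
		this.expiresAt = this.now() + this.ttlMs;
		return true;
	}

	// Extends the lock, but only for the current, non-expired holder
	prolong(token: string): boolean {
		if (this.owner !== token || this.now() >= this.expiresAt)
			return false;

		this.expiresAt = this.now() + this.ttlMs;
		return true;
	}
}

let fakeClock = 0;
const rebuildLock = new ToyTtlLock(1000, () => fakeClock);

const aAcquired = rebuildLock.acquire('process-a');
const bBlocked = rebuildLock.acquire('process-b');

fakeClock = 900;
const aProlonged = rebuildLock.prolong('process-a'); // now valid until 1900

fakeClock = 1500;
const bStillBlocked = rebuildLock.acquire('process-b');

fakeClock = 2000; // process-a stopped prolonging, so the lock has expired
const bAcquired = rebuildLock.acquire('process-b');
const aLateProlong = rebuildLock.prolong('process-a');
```

The TTL is what keeps a crashed process from blocking rebuilds forever, while the token stops a late holder from extending a lock it no longer owns.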

See src/mongodb for additional documentation, and examples/mongodb-views for runnable projection examples.

Redis (node-cqrs/redis, peer dep: ioredis)

Experimental - not yet validated in production. APIs may change in minor versions.

| Class | Role |
|---|---|
| RedisObjectStorage | Key/value object storage backed by Redis hashes |
| RedisViewLocker | Prevents concurrent schema-migration rebuilds; auto-prolongs lock via PEXPIRE |
| RedisEventLocker | Event deduplication and last-event checkpoint |
| RedisView | Composite view combining the above |
| AbstractRedisProjection | Base projection wired to RedisView |

See src/redis for additional documentation, and examples/redis for runnable projection examples.

Message Buses

How commands and events move between producers and consumers.

| Implementation | Import | Peer deps | Notes |
|---|---|---|---|
| InMemoryMessageBus | node-cqrs | - | Single-process; used as both command and event bus (example) |
| RabbitMqEventBus | node-cqrs/rabbitmq | amqplib | Fanout delivery to all subscribers (instructions) |
| RabbitMqCommandBus | node-cqrs/rabbitmq | amqplib | Point-to-point via durable queue (instructions) |
Other

| Implementation | Import | Notes |
|---|---|---|
| InMemorySnapshotStorage | node-cqrs | Aggregate snapshot cache in memory, resets on process restart |
| AbstractWorkerProjection | node-cqrs/workers | Run projections in worker threads (instructions, example) |

Experimental - the Workers module is new and has not been validated in production. APIs may change in minor versions.

OpenTelemetry

Optional distributed tracing via OpenTelemetry. Requires the @opentelemetry/api peer dependency. Register a tracerFactory in the container to enable automatic span creation across CQRS components:

import { trace } from '@opentelemetry/api';

builder.register(() => (name: string) => trace.getTracer(`cqrs.${name}`)).as('tracerFactory');

See examples/telemetry/index.ts for a full working example.

Examples

TypeScript examples can be run with Node.js 24+ without transpiling.
