Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { CollectionLifecycleManager } from './lifecycle.js'
import type { CollectionSyncManager } from './sync.js'
import type { CollectionEventsManager } from './events.js'
import type { CollectionImpl } from './index.js'
import type { CollectionStateManager } from './state.js'

export class CollectionChangesManager<
TOutput extends object = Record<string, unknown>,
Expand All @@ -21,6 +22,7 @@ export class CollectionChangesManager<
private sync!: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
private events!: CollectionEventsManager
private collection!: CollectionImpl<TOutput, TKey, any, TSchema, TInput>
private state!: CollectionStateManager<TOutput, TKey, TSchema, TInput>

public activeSubscribersCount = 0
public changeSubscriptions = new Set<CollectionSubscription>()
Expand All @@ -37,11 +39,13 @@ export class CollectionChangesManager<
sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
events: CollectionEventsManager
collection: CollectionImpl<TOutput, TKey, any, TSchema, TInput>
state: CollectionStateManager<TOutput, TKey, TSchema, TInput>
}) {
this.lifecycle = deps.lifecycle
this.sync = deps.sync
this.events = deps.events
this.collection = deps.collection
this.state = deps.state
}

/**
Expand All @@ -55,6 +59,16 @@ export class CollectionChangesManager<
}
}

/**
* Enriches a change message with virtual properties ($synced, $origin, $key, $collectionId).
* Uses the "add-if-missing" pattern to preserve virtual properties from upstream collections.
*/
private enrichChangeWithVirtualProps(
change: ChangeMessage<TOutput, TKey>,
): ChangeMessage<TOutput, TKey> {
return this.state.enrichChangeMessage(change)
}

/**
* Emit events either immediately or batch them for later emission
*/
Expand Down Expand Up @@ -87,9 +101,15 @@ export class CollectionChangesManager<
return
}

// Enrich all change messages with virtual properties
// This uses the "add-if-missing" pattern to preserve pass-through semantics
const enrichedEvents = eventsToEmit.map((change) =>
this.enrichChangeWithVirtualProps(change),
)

// Emit to all listeners
for (const subscription of this.changeSubscriptions) {
subscription.emitEvents(eventsToEmit)
subscription.emitEvents(enrichedEvents)
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ export class CollectionImpl<
lifecycle: this._lifecycle,
sync: this._sync,
events: this._events,
state: this._state, // Required for enriching changes with virtual properties
})
this._events.setDeps({
collection: this, // Required for adding to emitted events
Expand Down
99 changes: 99 additions & 0 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { deepEquals } from '../utils'
import { SortedMap } from '../SortedMap'
import { enrichRowWithVirtualProps } from '../virtual-props.js'
import type { VirtualOrigin, WithVirtualProps } from '../virtual-props.js'
import type { Transaction } from '../transactions'
import type { StandardSchemaV1 } from '@standard-schema/spec'
import type {
Expand Down Expand Up @@ -58,6 +60,24 @@ export class CollectionStateManager<
public optimisticUpserts = new Map<TKey, TOutput>()
public optimisticDeletes = new Set<TKey>()

/**
* Tracks the origin of confirmed changes for each row.
* 'local' = change originated from this client
* 'remote' = change was received via sync
*
* This is used for the $origin virtual property.
* Note: This only tracks *confirmed* changes, not optimistic ones.
* Optimistic changes are always considered 'local' for $origin.
*/
public rowOrigins = new Map<TKey, VirtualOrigin>()

/**
* Tracks keys that have pending local changes.
* Used to determine whether sync-confirmed data should have 'local' or 'remote' origin.
* When sync confirms data for a key with pending local changes, it keeps 'local' origin.
*/
public pendingLocalChanges = new Set<TKey>()

// Cached size for performance
public size = 0

Expand Down Expand Up @@ -96,6 +116,66 @@ export class CollectionStateManager<
this._events = deps.events
}

/**
* Checks if a row has pending optimistic mutations (not yet confirmed by sync).
* Used to compute the $synced virtual property.
*/
public isRowSynced(key: TKey): boolean {
return !this.optimisticUpserts.has(key) && !this.optimisticDeletes.has(key)
}

/**
* Gets the origin of the last confirmed change to a row.
* Returns 'local' if the row has optimistic mutations (optimistic changes are local).
* Used to compute the $origin virtual property.
*/
public getRowOrigin(key: TKey): VirtualOrigin {
// If there are optimistic changes, they're local
if (this.optimisticUpserts.has(key) || this.optimisticDeletes.has(key)) {
return 'local'
}
// Otherwise, return the confirmed origin (defaults to 'remote' for synced data)
return this.rowOrigins.get(key) ?? 'remote'
}

/**
* Enriches a row with virtual properties using the "add-if-missing" pattern.
* If the row already has virtual properties (from an upstream collection),
* they are preserved. Otherwise, new values are computed.
*/
public enrichWithVirtualProps(
row: TOutput,
key: TKey,
): WithVirtualProps<TOutput, TKey> {
return enrichRowWithVirtualProps(
row,
key,
this.collection.id,
() => this.isRowSynced(key),
() => this.getRowOrigin(key),
)
}

/**
* Creates a change message with virtual properties.
* Uses the "add-if-missing" pattern so that pass-through from upstream
* collections works correctly.
*/
public enrichChangeMessage(
change: ChangeMessage<TOutput, TKey>,
): ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey> {
const enrichedValue = this.enrichWithVirtualProps(change.value, change.key)
const enrichedPreviousValue = change.previousValue
? this.enrichWithVirtualProps(change.previousValue, change.key)
: undefined

return {
...change,
value: enrichedValue,
previousValue: enrichedPreviousValue,
} as ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>
}

/**
* Get the current value for a key (virtual derived state)
*/
Expand Down Expand Up @@ -259,6 +339,9 @@ export class CollectionStateManager<
for (const transaction of activeTransactions) {
for (const mutation of transaction.mutations) {
if (this.isThisCollection(mutation.collection) && mutation.optimistic) {
// Track that this key has pending local changes for $origin tracking
this.pendingLocalChanges.add(mutation.key)

switch (mutation.type) {
case `insert`:
case `update`:
Expand Down Expand Up @@ -582,10 +665,18 @@ export class CollectionStateManager<
break
}

// Determine origin: 'local' if this key had pending local changes, 'remote' otherwise
const origin: VirtualOrigin = this.pendingLocalChanges.has(key)
? 'local'
: 'remote'

// Update synced data
switch (operation.type) {
case `insert`:
this.syncedData.set(key, operation.value)
this.rowOrigins.set(key, origin)
// Clear pending local changes now that sync has confirmed
this.pendingLocalChanges.delete(key)
break
case `update`: {
if (rowUpdateMode === `partial`) {
Expand All @@ -598,10 +689,16 @@ export class CollectionStateManager<
} else {
this.syncedData.set(key, operation.value)
}
this.rowOrigins.set(key, origin)
// Clear pending local changes now that sync has confirmed
this.pendingLocalChanges.delete(key)
break
}
case `delete`:
this.syncedData.delete(key)
// Clean up origin and pending tracking for deleted rows
this.rowOrigins.delete(key)
this.pendingLocalChanges.delete(key)
break
}
}
Expand Down Expand Up @@ -908,6 +1005,8 @@ export class CollectionStateManager<
this.syncedMetadata.clear()
this.optimisticUpserts.clear()
this.optimisticDeletes.clear()
this.rowOrigins.clear()
this.pendingLocalChanges.clear()
this.size = 0
this.pendingSyncedTransactions = []
this.syncedKeys.clear()
Expand Down
9 changes: 9 additions & 0 deletions packages/db/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ export { deepEquals } from './utils'
export * from './paced-mutations'
export * from './strategies/index.js'

// Virtual properties exports
export {
type VirtualRowProps,
type VirtualOrigin,
type WithVirtualProps,
type WithoutVirtualProps,
hasVirtualProps,
} from './virtual-props.js'

// Index system exports
export * from './indexes/base-index.js'
export * from './indexes/btree-index.js'
Expand Down
6 changes: 6 additions & 0 deletions packages/db/src/local-only.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ function createLocalOnlySync<T extends object, TKey extends string | number>(

// Apply initial data if provided
if (initialData && initialData.length > 0) {
// Mark initial data as local so $origin is 'local' for local-only collections
for (const item of initialData) {
const key = params.collection.getKeyFromItem(item)
params.collection._state.pendingLocalChanges.add(key)
}

begin()
initialData.forEach((item) => {
write({
Expand Down
28 changes: 24 additions & 4 deletions packages/db/src/query/builder/ref-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,38 @@ export interface RefProxy<T = any> {
readonly __type: T
}

/**
* Virtual properties available on all row ref proxies.
* These allow querying on sync status, origin, key, and collection ID.
*/
export type VirtualPropsRefProxy<
TKey extends string | number = string | number,
> = {
readonly $synced: RefLeaf<boolean>
readonly $origin: RefLeaf<'local' | 'remote'>
readonly $key: RefLeaf<TKey>
readonly $collectionId: RefLeaf<string>
}

/**
* Type for creating a RefProxy for a single row/type without namespacing
* Used in collection indexes and where clauses
*
* Includes virtual properties ($synced, $origin, $key, $collectionId) for
* querying on sync status and row metadata.
*/
export type SingleRowRefProxy<T> =
export type SingleRowRefProxy<
T,
TKey extends string | number = string | number,
> =
T extends Record<string, any>
? {
[K in keyof T]: T[K] extends Record<string, any>
? SingleRowRefProxy<T[K]> & RefProxy<T[K]>
? SingleRowRefProxy<T[K], TKey> & RefProxy<T[K]>
: RefLeaf<T[K]>
} & RefProxy<T>
: RefProxy<T>
} & RefProxy<T> &
VirtualPropsRefProxy<TKey>
: RefProxy<T> & VirtualPropsRefProxy<TKey>

/**
* Creates a proxy object that records property access paths for a single row
Expand Down
34 changes: 33 additions & 1 deletion packages/db/src/query/builder/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
Value,
} from '../ir.js'
import type { QueryBuilder } from './index.js'
import type { VirtualOrigin } from '../../virtual-props.js'

/**
* Context - The central state container for query builder operations
Expand Down Expand Up @@ -471,6 +472,32 @@ type NonUndefined<T> = T extends undefined ? never : T
// Helper type to extract non-null type
type NonNull<T> = T extends null ? never : T

/**
* Virtual properties available on all Ref types in query builders.
* These allow querying on sync status, origin, key, and collection ID.
*
* @example
* ```typescript
* // Filter by sync status
* .where(({ user }) => eq(user.$synced, true))
*
* // Filter by origin
* .where(({ order }) => eq(order.$origin, 'local'))
*
* // Access key in select
* .select(({ user }) => ({
* key: user.$key,
* collectionId: user.$collectionId,
* }))
* ```
*/
type VirtualPropsRef = {
readonly $synced: RefLeaf<boolean>
readonly $origin: RefLeaf<VirtualOrigin>
readonly $key: RefLeaf<string | number>
readonly $collectionId: RefLeaf<string>
}

/**
* Ref - The user-facing ref interface for the query builder
*
Expand All @@ -482,12 +509,16 @@ type NonNull<T> = T extends null ? never : T
* When spread in select clauses, it correctly produces the underlying data type
* without Ref wrappers, enabling clean spread operations.
*
* Includes virtual properties ($synced, $origin, $key, $collectionId) for
* querying on sync status and row metadata.
*
* Example usage:
* ```typescript
* // Clean interface - no internal properties visible
* const users: Ref<{ id: number; profile?: { bio: string } }> = { ... }
* users.id // Ref<number> - clean display
* users.profile?.bio // Ref<string> - nested optional access works
* users.$synced // RefLeaf<boolean> - virtual property access
*
* // Spread operations work cleanly:
* select(({ user }) => ({ ...user })) // Returns User type, not Ref types
Expand All @@ -513,7 +544,8 @@ export type Ref<T = any> = {
IsPlainObject<T[K]> extends true
? Ref<T[K]>
: RefLeaf<T[K]>
} & RefLeaf<T>
} & RefLeaf<T> &
VirtualPropsRef

/**
* Ref - The user-facing ref type with clean IDE display
Expand Down
Loading
Loading