Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-isnull-predicate-subset.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/electric-db-collection': patch
---

Fix isNull predicate causing LiveQuery to never become ready when offline. Reorder predicate checks in `isWhereSubsetInternal` so OR superset handling runs before AND subset decomposition, allowing `and(eq, isNull)` to match structurally equal disjuncts. Also separate `forceDisconnectAndRefresh` error handling into its own try-catch with correct error attribution.
34 changes: 18 additions & 16 deletions packages/db/src/query/predicate-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ function isWhereSubsetInternal(
)
}

// Handle OR in subset: (A OR B) ⊆ C only if both A ⊆ C and B ⊆ C.
// Must be checked before OR superset so that or(A, B) ⊆ or(C, D)
// decomposes the subset first: A ⊆ or(C, D) AND B ⊆ or(C, D).
if (subset.type === `func` && subset.name === `or`) {
return subset.args.every((arg) =>
isWhereSubsetInternal(arg as BasicExpression<boolean>, superset),
)
}

// Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B.
// Must be checked before decomposing AND subsets so that and(A, B) can
// match a structurally equal disjunct via areExpressionsEqual.
if (superset.type === `func` && superset.name === `or`) {
return superset.args.some((arg) =>
isWhereSubsetInternal(subset, arg as BasicExpression<boolean>),
)
}

// Handle subset being an AND: (A AND B) implies both A and B
if (subset.type === `func` && subset.name === `and`) {
// For (A AND B) ⊆ C, since (A AND B) implies A, we check if any conjunct implies C
Expand All @@ -111,22 +129,6 @@ function isWhereSubsetInternal(
}
}

// Handle OR in subset: (A OR B) is subset of C only if both A and B are subsets of C
if (subset.type === `func` && subset.name === `or`) {
return subset.args.every((arg) =>
isWhereSubsetInternal(arg as BasicExpression<boolean>, superset),
)
}

// Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B
// (A OR B) as superset means data can satisfy A or B
// If subset is contained in any disjunct, it's contained in the union
if (superset.type === `func` && superset.name === `or`) {
return superset.args.some((arg) =>
isWhereSubsetInternal(subset, arg as BasicExpression<boolean>),
)
}

// Handle comparison operators on the same field
if (subset.type === `func` && superset.type === `func`) {
const subsetFunc = subset as Func
Expand Down
65 changes: 65 additions & 0 deletions packages/db/tests/query/predicate-utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,71 @@ describe(`isWhereSubset`, () => {
})
})

describe(`AND subset with OR superset`, () => {
it(`should recognize and(eq, isNull) as subset of or(and(eq, isNull), and(eq, isNull))`, () => {
const projectX = `4e164373-31b4-4b42-95c9-9c395cfb4916`
const projectY = `2fd4c147-2547-4b02-9554-9cd067187409`

const queryX = and(
eq(ref(`project_id`), val(projectX)),
func(`isNull`, ref(`soft_deleted_at`)),
)
const queryY = and(
eq(ref(`project_id`), val(projectY)),
func(`isNull`, ref(`soft_deleted_at`)),
)

const unionPredicate = or(queryX, queryY)

expect(isWhereSubset(queryX, unionPredicate)).toBe(true)
expect(isWhereSubset(queryY, unionPredicate)).toBe(true)
})

it(`should recognize and(A, B) as subset of or(and(A, B), and(C, D))`, () => {
const subsetExpr = and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20)))
const supersetExpr = or(
and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))),
and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))),
)
expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(true)
})

it(`should return false when and(A, B) matches no disjunct`, () => {
const subsetExpr = and(eq(ref(`id`), val(3)), gt(ref(`age`), val(20)))
const supersetExpr = or(
and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))),
and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))),
)
expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(false)
})
})

describe(`isNull predicates`, () => {
it(`should return true for identical isNull expressions`, () => {
const a = func(`isNull`, ref(`deleted_at`))
const b = func(`isNull`, ref(`deleted_at`))
expect(isWhereSubset(a, b)).toBe(true)
})

it(`should return false for isNull on different fields`, () => {
const a = func(`isNull`, ref(`deleted_at`))
const b = func(`isNull`, ref(`created_at`))
expect(isWhereSubset(a, b)).toBe(false)
})

it(`should return true for and(eq, isNull) subset of identical and(eq, isNull)`, () => {
const subset = and(
eq(ref(`project_id`), val(`abc`)),
func(`isNull`, ref(`soft_deleted_at`)),
)
const superset = and(
eq(ref(`project_id`), val(`abc`)),
func(`isNull`, ref(`soft_deleted_at`)),
)
expect(isWhereSubset(subset, superset)).toBe(true)
})
})

describe(`different fields`, () => {
it(`should return false for different fields with no relationship`, () => {
expect(
Expand Down
19 changes: 19 additions & 0 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,25 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({

const { cursor, where, orderBy, limit } = opts

// When the stream is already up-to-date, it may be in a long-poll wait.
// Forcing a disconnect-and-refresh ensures requestSnapshot gets a response
// from a fresh server round-trip rather than waiting for the current poll to end.
// If the refresh fails (e.g., PauseLock held during subscriber processing in
// join pipelines), we fall through to requestSnapshot which still works.
if (stream.isUpToDate) {
try {
await stream.forceDisconnectAndRefresh()
} catch (error) {
if (handleSnapshotError(error, `forceDisconnectAndRefresh`)) {
return
}
debug(
`${logPrefix}forceDisconnectAndRefresh failed, proceeding to requestSnapshot: %o`,
error,
)
}
}

try {
if (cursor) {
const whereCurrentOpts: LoadSubsetOptions = {
Expand Down
66 changes: 65 additions & 1 deletion packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import type { StandardSchemaV1 } from '@standard-schema/spec'
const mockSubscribe = vi.fn()
const mockRequestSnapshot = vi.fn()
const mockFetchSnapshot = vi.fn()
const mockForceDisconnectAndRefresh = vi.fn()
const mockStream = {
subscribe: mockSubscribe,
requestSnapshot: mockRequestSnapshot,
fetchSnapshot: mockFetchSnapshot,
forceDisconnectAndRefresh: mockForceDisconnectAndRefresh,
isUpToDate: false,
}

vi.mock(`@electric-sql/client`, async () => {
Expand Down Expand Up @@ -56,6 +59,8 @@ describe(`Electric Integration`, () => {

// Reset mock requestSnapshot
mockRequestSnapshot.mockResolvedValue(undefined)
mockForceDisconnectAndRefresh.mockResolvedValue(undefined)
mockStream.isUpToDate = false

// Create collection with Electric configuration
const config = {
Expand Down Expand Up @@ -2376,7 +2381,7 @@ describe(`Electric Integration`, () => {
// In on-demand mode, calling loadSubset should request a snapshot
await testCollection._sync.loadSubset({ limit: 10 })

// Verify requestSnapshot was called
expect(mockForceDisconnectAndRefresh).not.toHaveBeenCalled()
expect(mockRequestSnapshot).toHaveBeenCalledWith(
expect.objectContaining({
limit: 10,
Expand All @@ -2385,6 +2390,65 @@ describe(`Electric Integration`, () => {
)
})

it(`should refresh the stream before requesting on-demand snapshots when already up-to-date`, async () => {
vi.clearAllMocks()

const config = {
id: `on-demand-refresh-before-snapshot-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
syncMode: `on-demand` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

mockStream.isUpToDate = true

await testCollection._sync.loadSubset({ limit: 10 })

expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1)
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
const refreshCall =
mockForceDisconnectAndRefresh.mock.invocationCallOrder[0]!
const snapshotCall = mockRequestSnapshot.mock.invocationCallOrder[0]!
expect(refreshCall).toBeLessThan(snapshotCall)
})

it(`should fall through to requestSnapshot when forceDisconnectAndRefresh fails`, async () => {
vi.clearAllMocks()

const config = {
id: `on-demand-refresh-fallthrough-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
syncMode: `on-demand` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

mockStream.isUpToDate = true
mockForceDisconnectAndRefresh.mockImplementationOnce(async () => {
throw new Error(`PauseLock held`)
})

await testCollection._sync.loadSubset({ limit: 10 })

expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1)
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
})

it(`should fetch snapshots in progressive mode when loadSubset is called before sync completes`, async () => {
vi.clearAllMocks()

Expand Down
Loading