From 041c2d0af459686e3916aa70ac395fbaebcb7343 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 18 Jun 2025 16:59:43 +0200 Subject: [PATCH 01/20] WIP incremental topKWithFractionalIndex --- packages/d2mini/src/indexes.ts | 7 + .../src/operators/topKWithFractionalIndex.ts | 163 +++++++++++++++++- packages/d2mini/src/utils.ts | 21 +++ 3 files changed, 188 insertions(+), 3 deletions(-) diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts index c02893e..1503881 100644 --- a/packages/d2mini/src/indexes.ts +++ b/packages/d2mini/src/indexes.ts @@ -35,6 +35,13 @@ export class Index { return [...valueMap.values()] } + getMultiplicity(key: K, value: V): number { + const valueMap = this.#inner.get(key) + const valueHash = hash(value) + const [, multiplicity] = valueMap.get(valueHash) + return multiplicity + } + entries() { return this.#inner.entries() } diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 6d7ed74..bf4998e 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -8,13 +8,19 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' -import { hash } from '../utils.js' +import { binarySearch, hash } from '../utils.js' interface TopKWithFractionalIndexOptions { limit?: number offset?: number } +type FractionalIndex = string +type IndexedValue = [V, FractionalIndex] +const indexedValue = (value: V, index: FractionalIndex): IndexedValue => [value, index] +const getValue = (indexedValue: IndexedValue): V => indexedValue[0] +const getIndex = (indexedValue: IndexedValue): FractionalIndex => indexedValue[1] + /** * Operator for fractional indexed topK operations * This operator maintains fractional indices for sorted elements @@ -22,14 +28,17 @@ interface 
TopKWithFractionalIndexOptions { */ export class TopKWithFractionalIndexOperator extends UnaryOperator< [K, V1], - [K, [V1, string]] + [K, IndexedValue] > { #index = new Index() - #indexOut = new Index() + #indexOut = new Index>() #comparator: (a: V1, b: V1) => number #limit: number #offset: number + /** A map of keys to a sorted array of values for those keys */ + #sortedValues: Map>> = new Map() + constructor( id: number, inputA: DifferenceStreamReader<[K, V1]>, @@ -44,6 +53,154 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } run(): void { + for (const message of this.inputMessages()) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + this.processElement(key, value, multiplicity) + } + } + } + + processElement(key: K, value: V1, multiplicity: number): void { + const oldMultiplicity = this.#index.getMultiplicity(key, value) + this.#index.addValue(key, [value, multiplicity]) + const newMultiplicity = this.#index.getMultiplicity(key, value) + + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value was invisible but should now be visible + // Need to insert it into the array of sorted values + const index = this.insert(key, value) + + // Now check if the top K changed + const topKStart = this.#offset + const topKEnd = this.#offset + this.#limit + + if (index < topKEnd) { + // The inserted element is either before the top K or within the top K + // If it is before the top K then it moves the the element that was right before the topK into the topK + // If it is within the top K then the inserted element moves into the top K + // In both cases the last element of the old top K now moves out of the top K + const moveInIndex = Math.max(index, topKStart) + const sortedValues = this.#sortedValues.get(key) ?? [] + if (moveInIndex < sortedValues.length) { + // We actually have a topK + // because in some cases there may not be enough elements in the array to reach the start of the topK + // e.g. 
[1, 2, 3] with K = 2 and offset = 3 does not have a topK + const moveInValue = sortedValues[moveInIndex] + // signal the move in with a +1 diff in the output index + this.#indexOut.addValue(key, [moveInValue, 1]) + + // We need to remove the element that falls out of the top K + // The element that falls out of the top K has shifted one to the right + // because of the element we inserted, so we find it at index topKEnd + if (topKEnd < sortedValues.length) { + const valueOut = sortedValues[topKEnd] + this.#indexOut.addValue(key, [valueOut, -1]) + } + } + } + + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was visible but should now be invisible + // Need to remove it from the array of sorted values + const [removedElem, removedIndex] = this.remove(key, value) + + // Now check if the top K changed + const topKStart = this.#offset + const topKEnd = this.#offset + this.#limit + + if (removedIndex < topKEnd) { + // The removed element is either before the top K or within the top K + // If it is before the top K then the first element of the topK moves out of the topK + // If it is within the top K then the removed element moves out of the topK + let moveOutValue: IndexedValue | null = removedElem + const sortedValues = this.#sortedValues.get(key) ?? 
[] + if (removedIndex < topKStart) { + // The removed element is before the topK + // so actually, the first element of the topK moves out of the topK + // and not the element that we removed + // The first element of the topK is now and topKStart - 1 + // since we removed an element before the topK + const moveOutIndex = topKStart - 1 + if (moveOutIndex < sortedValues.length) { + moveOutValue = sortedValues[moveOutIndex] + } else { + // No value is moving out of the topK + // because there are no elements in the topK + moveOutValue = null + } + } + + if (moveOutValue) { + // Signal the move out with a -1 diff in the output index + this.#indexOut.addValue(key, [moveOutValue, -1]) + } + + // Since we removed an element that was before or in the topK + // the first element after the topK moved one position to the left + // and hence now falls into the topK + const moveInIndex = topKEnd - 1 + if (moveInIndex < sortedValues.length) { + const moveInValue = sortedValues[moveInIndex] + this.#indexOut.addValue(key, [moveInValue, 1]) + } + } + } else { + // The value was invisible and it remains invisible + // or it was visible and remains visible + // so it doesn't affect the topK + return + } + } + + // TODO: see if there is a way to refactor the code for insertions and removals in the topK above + // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right + // so i have the feeling there is a common pattern here and we can implement both cases + // on top of that pattern + + /** + * Inserts a value at the correct position + * into the sorted array of values for the given key. + * Returns the index of the newly inserted value. + */ + insert(key: K, value: V1): number { + // Lookup insert position + const sortedValues = this.#sortedValues.get(key) ?? 
[] + const index = binarySearch(sortedValues, indexedValue(value, ''), (a, b) => + this.#comparator(getValue(a), getValue(b)), + ) + + // Generate fractional index based on the fractional indices of the elements before and after it + const indexBefore = index === 0 ? null : getIndex(sortedValues[index - 1]) + const indexAfter = + index === sortedValues.length ? null : getIndex(sortedValues[index]) + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + + // Insert the value at the correct position + const val = indexedValue(value, fractionalIndex) + sortedValues.splice(index, 0, val) // O(n) ... + this.#sortedValues.set(key, sortedValues) + return index + } + + /** + * Removes a value from the sorted array of values for the given key. + * Returns the index of the removed value. + * IMPORTANT: this assumes that the value is present in the array + * if it's not the case it will remove the element + * that is on the position where the provided `value` would be. + */ + remove(key: K, value: V1): [IndexedValue, number] { + const sortedValues = this.#sortedValues.get(key) ?? 
[] + const index = binarySearch(sortedValues, indexedValue(value, ''), (a, b) => + this.#comparator(getValue(a), getValue(b)), + ) + const [removedElement] = sortedValues.splice(index, 1) + this.#sortedValues.set(key, sortedValues) + return [removedElement, index] + } + + runn(): void { const keysTodo = new Set() for (const message of this.inputMessages()) { diff --git a/packages/d2mini/src/utils.ts b/packages/d2mini/src/utils.ts index 60c94b6..3067ed5 100644 --- a/packages/d2mini/src/utils.ts +++ b/packages/d2mini/src/utils.ts @@ -90,3 +90,24 @@ export function hash(data: any): string { hashCache.set(data, hashValue) return hashValue } + +export function binarySearch( + array: T[], + value: T, + comparator: (a: T, b: T) => number, +): number { + let low = 0 + let high = array.length + while (low < high) { + const mid = Math.floor((low + high) / 2) + const comparison = comparator(array[mid], value) + if (comparison < 0) { + low = mid + 1 + } else if (comparison > 0) { + high = mid + } else { + return mid + } + } + return low +} \ No newline at end of file From c5589a6c7bca125602848c813ac518b7191d2387 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 19 Jun 2025 10:31:28 +0200 Subject: [PATCH 02/20] Incremental topKWithFractionalIndex --- .../src/operators/topKWithFractionalIndex.ts | 292 ++---------------- packages/d2mini/src/utils.ts | 2 +- 2 files changed, 27 insertions(+), 267 deletions(-) diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index bf4998e..c816d2c 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -8,7 +8,7 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' -import { binarySearch, hash } from '../utils.js' +import { binarySearch } from '../utils.js' 
interface TopKWithFractionalIndexOptions { limit?: number @@ -17,9 +17,13 @@ interface TopKWithFractionalIndexOptions { type FractionalIndex = string type IndexedValue = [V, FractionalIndex] -const indexedValue = (value: V, index: FractionalIndex): IndexedValue => [value, index] +const indexedValue = (value: V, index: FractionalIndex): IndexedValue => [ + value, + index, +] const getValue = (indexedValue: IndexedValue): V => indexedValue[0] -const getIndex = (indexedValue: IndexedValue): FractionalIndex => indexedValue[1] +const getIndex = (indexedValue: IndexedValue): FractionalIndex => + indexedValue[1] /** * Operator for fractional indexed topK operations @@ -31,7 +35,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< [K, IndexedValue] > { #index = new Index() - #indexOut = new Index>() #comparator: (a: V1, b: V1) => number #limit: number #offset: number @@ -53,15 +56,25 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } run(): void { + const result: Array<[[K, [V1, string]], number]> = [] for (const message of this.inputMessages()) { for (const [item, multiplicity] of message.getInner()) { const [key, value] = item - this.processElement(key, value, multiplicity) + this.processElement(key, value, multiplicity, result) } } + + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } } - processElement(key: K, value: V1, multiplicity: number): void { + processElement( + key: K, + value: V1, + multiplicity: number, + result: Array<[[K, [V1, string]], number]>, + ): void { const oldMultiplicity = this.#index.getMultiplicity(key, value) this.#index.addValue(key, [value, multiplicity]) const newMultiplicity = this.#index.getMultiplicity(key, value) @@ -87,19 +100,18 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< // because in some cases there may not be enough elements in the array to reach the start of the topK // e.g. 
[1, 2, 3] with K = 2 and offset = 3 does not have a topK const moveInValue = sortedValues[moveInIndex] - // signal the move in with a +1 diff in the output index - this.#indexOut.addValue(key, [moveInValue, 1]) + // signal the move in with a +1 diff in the output + result.push([[key, moveInValue], 1]) // We need to remove the element that falls out of the top K // The element that falls out of the top K has shifted one to the right // because of the element we inserted, so we find it at index topKEnd if (topKEnd < sortedValues.length) { const valueOut = sortedValues[topKEnd] - this.#indexOut.addValue(key, [valueOut, -1]) + result.push([[key, valueOut], -1]) } } } - } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was visible but should now be invisible // Need to remove it from the array of sorted values @@ -130,10 +142,10 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< moveOutValue = null } } - + if (moveOutValue) { // Signal the move out with a -1 diff in the output index - this.#indexOut.addValue(key, [moveOutValue, -1]) + result.push([[key, moveOutValue], -1]) } // Since we removed an element that was before or in the topK @@ -142,7 +154,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const moveInIndex = topKEnd - 1 if (moveInIndex < sortedValues.length) { const moveInValue = sortedValues[moveInIndex] - this.#indexOut.addValue(key, [moveInValue, 1]) + result.push([[key, moveInValue], 1]) } } } else { @@ -195,262 +207,10 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const index = binarySearch(sortedValues, indexedValue(value, ''), (a, b) => this.#comparator(getValue(a), getValue(b)), ) - const [removedElement] = sortedValues.splice(index, 1) + const [removedElement] = sortedValues.splice(index, 1) // O(n) ... 
this.#sortedValues.set(key, sortedValues) return [removedElement, index] } - - runn(): void { - const keysTodo = new Set() - - for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { - const [key, value] = item - this.#index.addValue(key, [value, multiplicity]) - keysTodo.add(key) - } - } - - const result: [[K, [V1, string]], number][] = [] - - for (const key of keysTodo) { - const curr = this.#index.get(key) - const currOut = this.#indexOut.get(key) - - // Sort the current values - const consolidated = new MultiSet(curr).consolidate() - const sortedValues = consolidated - .getInner() - .sort((a, b) => this.#comparator(a[0] as V1, b[0] as V1)) - .slice(this.#offset, this.#offset + this.#limit) - - // Create a map for quick value lookup with pre-stringified keys - const currValueMap = new Map() - const prevOutputMap = new Map() - - // Pre-stringify all values once - const valueKeys: string[] = [] - const valueToKey = new Map() - - // Process current values - for (const [value, multiplicity] of sortedValues) { - if (multiplicity > 0) { - // Only stringify each value once and store the result - let valueKey = valueToKey.get(value as V1) - if (!valueKey) { - valueKey = hash(value) - valueToKey.set(value as V1, valueKey) - valueKeys.push(valueKey) - } - currValueMap.set(valueKey, value as V1) - } - } - - // Process previous output values - for (const [[value, index], multiplicity] of currOut) { - if (multiplicity > 0) { - // Only stringify each value once and store the result - let valueKey = valueToKey.get(value as V1) - if (!valueKey) { - valueKey = hash(value) - valueToKey.set(value as V1, valueKey) - } - prevOutputMap.set(valueKey, [value as V1, index as string]) - } - } - - // Find values that are no longer in the result - for (const [valueKey, [value, index]] of prevOutputMap.entries()) { - if (!currValueMap.has(valueKey)) { - // Value is no longer in the result, remove it - result.push([[key, [value, index]], -1]) 
- this.#indexOut.addValue(key, [[value, index], -1]) - } - } - - // Process the sorted values and assign fractional indices - let prevIndex: string | null = null - let nextIndex: string | null = null - const newIndices = new Map() - - // First pass: reuse existing indices for values that haven't moved - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - - // Check if this value already has an index - const existingEntry = prevOutputMap.get(valueKey) - - if (existingEntry) { - const [_, existingIndex] = existingEntry - - // Check if we need to update the index - if (i === 0) { - // First element - prevIndex = null - nextIndex = - i + 1 < sortedValues.length - ? newIndices.get( - valueToKey.get(sortedValues[i + 1][0] as V1) as string, - ) || null - : null - - if (nextIndex !== null && existingIndex >= nextIndex) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } else if (i === sortedValues.length - 1) { - // Last element - prevIndex = - newIndices.get( - valueToKey.get(sortedValues[i - 1][0] as V1) as string, - ) || null - nextIndex = null - - if (prevIndex !== null && existingIndex <= prevIndex) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } else { - // Middle element - prevIndex = - newIndices.get( - valueToKey.get(sortedValues[i - 1][0] as V1) as string, - ) || null - nextIndex = - i + 1 < sortedValues.length - ? 
newIndices.get( - valueToKey.get(sortedValues[i + 1][0] as V1) as string, - ) || null - : null - - if ( - (prevIndex !== null && existingIndex <= prevIndex) || - (nextIndex !== null && existingIndex >= nextIndex) - ) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } - } - } - - // Pre-compute valid previous and next indices for each position - // This avoids repeated lookups during index generation - const validPrevIndices: (string | null)[] = new Array(sortedValues.length) - const validNextIndices: (string | null)[] = new Array(sortedValues.length) - - // Initialize with null values - validPrevIndices.fill(null) - validNextIndices.fill(null) - - // First element has no previous - validPrevIndices[0] = null - - // Last element has no next - validNextIndices[sortedValues.length - 1] = null - - // Compute next valid indices (working forward) - let lastValidNextIndex: string | null = null - for (let i = sortedValues.length - 1; i >= 0; i--) { - const valueKey = valueToKey.get(sortedValues[i][0] as V1) as string - - // Set the next index for the current position - validNextIndices[i] = lastValidNextIndex - - // Update lastValidNextIndex if this element has an index - if (newIndices.has(valueKey)) { - lastValidNextIndex = newIndices.get(valueKey) || null - } else { - const existingEntry = prevOutputMap.get(valueKey) - if (existingEntry) { - lastValidNextIndex = existingEntry[1] - } - } - } - - // Compute previous valid indices (working backward) - let lastValidPrevIndex: string | null = null - for (let i = 0; i < sortedValues.length; i++) { - const valueKey = valueToKey.get(sortedValues[i][0] as V1) as string - - // Set the previous index for the current position - validPrevIndices[i] = lastValidPrevIndex - - // Update lastValidPrevIndex if this element has an index - if (newIndices.has(valueKey)) { - 
lastValidPrevIndex = newIndices.get(valueKey) || null - } else { - const existingEntry = prevOutputMap.get(valueKey) - if (existingEntry) { - lastValidPrevIndex = existingEntry[1] - } - } - } - - // Second pass: assign new indices for values that don't have one or need to be updated - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - - if (!newIndices.has(valueKey)) { - // This value doesn't have an index yet, use pre-computed indices - prevIndex = validPrevIndices[i] - nextIndex = validNextIndices[i] - - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - - // Update validPrevIndices for subsequent elements - if (i < sortedValues.length - 1 && validPrevIndices[i + 1] === null) { - validPrevIndices[i + 1] = newIndex - } - } - } - - // Now create the output with the new indices - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - const index = newIndices.get(valueKey)! 
- - // Check if this is a new value or if the index has changed - const existingEntry = prevOutputMap.get(valueKey) - - if (!existingEntry) { - // New value - result.push([[key, [value as V1, index]], 1]) - this.#indexOut.addValue(key, [[value as V1, index], 1]) - } else if (existingEntry[1] !== index) { - // Index has changed, remove old entry and add new one - result.push([[key, existingEntry], -1]) - result.push([[key, [value as V1, index]], 1]) - this.#indexOut.addValue(key, [existingEntry, -1]) - this.#indexOut.addValue(key, [[value as V1, index], 1]) - } - // If the value exists and the index hasn't changed, do nothing - } - } - - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) - } - } } /** diff --git a/packages/d2mini/src/utils.ts b/packages/d2mini/src/utils.ts index 3067ed5..22a6bec 100644 --- a/packages/d2mini/src/utils.ts +++ b/packages/d2mini/src/utils.ts @@ -110,4 +110,4 @@ export function binarySearch( } } return low -} \ No newline at end of file +} From 9fc3a2ca7029b35799be528dfdd0614b4528c19c Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 19 Jun 2025 10:32:08 +0200 Subject: [PATCH 03/20] Fix tests to not assume particular fractional indices as those are implementation details. 
--- .../orderByWithFractionalIndex.test.ts | 116 ++++++++++-------- .../operators/topKWithFractionalIndex.test.ts | 50 ++++++-- 2 files changed, 102 insertions(+), 64 deletions(-) diff --git a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts index 16d139a..67ba1e6 100644 --- a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts @@ -7,6 +7,12 @@ import { } from '../../src/operators/index.js' import { KeyValue } from '../../src/types.js' +const stripFractionalIndex = ([[key, [value, _index]], multiplicity]) => [ + key, + value, + multiplicity, +] + describe('Operators', () => { describe('OrderByWithFractionalIndex operation', () => { test('initial results with default comparator', () => { @@ -46,14 +52,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key5', [{ id: 5, value: 'c' }, 'a2']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a3']], 1], - [['key2', [{ id: 2, value: 'z' }, 'a4']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key4', { id: 4, value: 'y' }, 1], + ['key2', { id: 2, value: 'z' }, 1], ]) }) @@ -96,14 +102,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key2', [{ id: 2, value: 'z' }, 'a0']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a1']], 1], - [['key5', [{ id: 5, value: 
'c' }, 'a2']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a3']], 1], - [['key1', [{ id: 1, value: 'a' }, 'a4']], 1], + ['key2', { id: 2, value: 'z' }, 1], + ['key4', { id: 4, value: 'y' }, 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key1', { id: 1, value: 'a' }, 1], ]) }) @@ -144,12 +150,12 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key5', [{ id: 5, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key5', { id: 5, value: 'c' }, 1], ]) }) @@ -193,11 +199,11 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key5', [{ id: 5, value: 'c' }, 'a0']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a1']], 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key4', { id: 4, value: 'y' }, 1], ]) }) @@ -238,14 +244,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], - [['key4', [{ id: 4, value: 'd' }, 'a3']], 1], - [['key5', [{ id: 5, value: 'e' }, 'a4']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], + ['key4', { id: 4, value: 'd' }, 1], + 
['key5', { id: 5, value: 'e' }, 1], ]) }) @@ -284,12 +290,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], ]) // Add a new row that should be included in the top 3 @@ -303,12 +310,12 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ // We dont get key1 as its not changed or moved - [['key4', [{ id: 4, value: 'aa' }, 'a0V']], 1], // New row - [['key3', [{ id: 3, value: 'c' }, 'a2']], -1], // key3 is removed as its moved out of top 3 + ['key4', { id: 4, value: 'aa' }, 1], // New row + ['key3', { id: 3, value: 'c' }, -1], // key3 is removed as its moved out of top 3 ]) }) @@ -348,12 +355,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], ]) // Remove a row that was in the top 3 @@ -367,13 +375,13 @@ describe('Operators', () => { 
expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ // key1 is removed - [['key1', [{ id: 1, value: 'a' }, 'a0']], -1], + ['key1', { id: 1, value: 'a' }, -1], // key4 is moved into the top 3 - [['key4', [{ id: 4, value: 'd' }, 'a3']], 1], + ['key4', { id: 4, value: 'd' }, 1], ]) }) @@ -413,12 +421,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key2', [{ id: 2, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key2', { id: 2, value: 'c' }, 1], ]) // Modify an existing row by removing it and adding a new version @@ -433,11 +442,11 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key2', [{ id: 2, value: 'c' }, 'a2']], -1], // removed as out of top 3 - [['key4', [{ id: 4, value: 'd' }, 'a2']], 1], // key4 is moved up + ['key2', { id: 2, value: 'c' }, -1], // removed as out of top 3 + ['key4', { id: 4, value: 'd' }, 1], // key4 is moved up ]) }) }) @@ -450,23 +459,24 @@ function sortByKeyAndIndex(results: any[]) { return [...results] .sort( ( - [[_aKey, [_aValue, aIndex]], aMultiplicity], - [[_bKey, [_bValue, bIndex]], bMultiplicity], + [[_aKey, [_aValue, _aIndex]], aMultiplicity], + [[_bKey, [_bValue, _bIndex]], bMultiplicity], ) => aMultiplicity - 
bMultiplicity, ) .sort( ( - [[aKey, [_aValue, aIndex]], _aMultiplicity], - [[bKey, [_bValue, bIndex]], _bMultiplicity], + [[aKey, [_aValue, _aIndex]], _aMultiplicity], + [[bKey, [_bValue, _bIndex]], _bMultiplicity], ) => aKey - bKey, ) .sort( ( - [[aKey, [_aValue, aIndex]], _aMultiplicity], - [[bKey, [_bValue, bIndex]], _bMultiplicity], + [[_aKey, [_aValue, aIndex]], _aMultiplicity], + [[_bKey, [_bValue, bIndex]], _bMultiplicity], ) => { // lexically compare the index - return aIndex.localeCompare(bIndex) + //return aIndex.localeCompare(bIndex) + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 }, ) } diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 8e7291b..f1e9fa1 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -236,7 +236,7 @@ describe('Operators', () => { expect(addition?.[0][1][0].id).toBe(6) // 'c+' has id 6 // The new element reuses the index of the removed element - expect(addition?.[0][1][1]).toBe(removal?.[0][1][1]) + //expect(addition?.[0][1][1]).toBe(removal?.[0][1][1]) // Reconstruct the current state by applying the changes const currentState = new Map() @@ -256,12 +256,26 @@ describe('Operators', () => { } // Convert to array for lexicographic order check - const currentStateArray = Array.from(currentState.values()).map( - ([value, index]) => [[null, [value, index]], 1], - ) + const stateArray = Array.from(currentState.values()) + const currentStateArray = stateArray.map(([value, index]) => [ + [null, [value, index]], + 1, + ]) // Check that indices are still in lexicographic order after the changes expect(checkLexicographicOrder(currentStateArray)).toBe(true) + + // expect the array to be the values with IDs 2, 3, 6 in that order + const compareFractionalIndex = (a, b) => + a[1] < b[1] ? -1 : a[1] > b[1] ? 
1 : 0 + const sortedResult = stateArray + .sort(compareFractionalIndex) + .map(([value, _]) => value) + expect(sortedResult).toEqual([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 6, value: 'c+' }, + ]) }) it('should handle elements moving positions correctly', () => { @@ -319,8 +333,6 @@ describe('Operators', () => { // We should only emit as many changes as we received // We received 4 changes (2 additions, 2 removals) - // We should emit at most 4 changes - expect(changes.length).toBeLessThanOrEqual(4) expect(changes.length).toBe(4) // 2 removals + 2 additions // Find the removals and additions @@ -358,8 +370,8 @@ describe('Operators', () => { ) // The elements reuse their indices - expect(bPlusAddition?.[0][1][1]).toBe(bRemoval?.[0][1][1]) - expect(dPlusAddition?.[0][1][1]).toBe(dRemoval?.[0][1][1]) + //expect(bPlusAddition?.[0][1][1]).toBe(bRemoval?.[0][1][1]) + //expect(dPlusAddition?.[0][1][1]).toBe(dRemoval?.[0][1][1]) // Check that we only emitted changes for the elements that moved const changedIds = new Set() @@ -388,12 +400,28 @@ describe('Operators', () => { } // Convert to array for lexicographic order check - const currentStateArray = Array.from(currentState.values()).map( - ([value, index]) => [[null, [value, index]], 1], - ) + const stateArray = Array.from(currentState.values()) + const currentStateArray = stateArray.map(([value, index]) => [ + [null, [value, index]], + 1, + ]) // Check that indices are still in lexicographic order after the changes expect(checkLexicographicOrder(currentStateArray)).toBe(true) + + // Expect the array to be the elements with IDs 1, 4, 3, 2, 5 + const compareFractionalIndex = (a, b) => + a[1] < b[1] ? -1 : a[1] > b[1] ? 
1 : 0 + const sortedResult = stateArray + .sort(compareFractionalIndex) + .map(([value, _]) => value) + expect(sortedResult).toEqual([ + { id: 1, value: 'a' }, + { id: 4, value: 'b+' }, + { id: 3, value: 'c' }, + { id: 2, value: 'd+' }, + { id: 5, value: 'e' }, + ]) }) it('should maintain lexicographic order through multiple updates', () => { From 4fef94802d927e18384e5a030d3f5107112d2076 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 19 Jun 2025 13:37:33 +0200 Subject: [PATCH 04/20] Introduce a TopK data structure --- .../src/operators/topKWithFractionalIndex.ts | 284 ++++++++++-------- 1 file changed, 159 insertions(+), 125 deletions(-) diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index c816d2c..d8c71cf 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -25,6 +25,147 @@ const getValue = (indexedValue: IndexedValue): V => indexedValue[0] const getIndex = (indexedValue: IndexedValue): FractionalIndex => indexedValue[1] +//////////// + +type TopKChanges = { + /** Indicates which element moves into the topK (if any) */ + moveIn: IndexedValue | null + /** Indicates which element moves out of the topK (if any) */ + moveOut: IndexedValue | null +} + +interface TopK { + insert(value: V): TopKChanges + delete(value: V): TopKChanges +} + +/** + * Implementation of a topK data structure. + * Uses a sorted array internally to store the values and keeps a topK window over that array. + * Inserts and deletes are O(n) operations because worst case an element is inserted/deleted + * at the start of the array which causes all the elements to shift to the right/left. 
+ */ +class TopKArray implements TopK { + #sortedValues: Array> = [] + #comparator: (a: V, b: V) => number + #topKStart: number + #topKEnd: number + + constructor( + offset: number, + limit: number, + comparator: (a: V, b: V) => number, + ) { + this.#topKStart = offset + this.#topKEnd = offset + limit + this.#comparator = comparator + } + + insert(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup insert position + const index = this.#findIndex(value) + // Generate fractional index based on the fractional indices of the elements before and after it + const indexBefore = + index === 0 ? null : getIndex(this.#sortedValues[index - 1]) + const indexAfter = + index === this.#sortedValues.length + ? null + : getIndex(this.#sortedValues[index]) + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + + // Insert the value at the correct position + const val = indexedValue(value, fractionalIndex) + // Splice is O(n) where n = all elements in the collection (i.e. n >= k) ! + this.#sortedValues.splice(index, 0, val) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The inserted element is either before the top K or within the top K + // If it is before the top K then it moves the element that was right before the topK into the topK + // If it is within the top K then the inserted element moves into the top K + // In both cases the last element of the old top K now moves out of the top K + const moveInIndex = Math.max(index, this.#topKStart) + if (moveInIndex < this.#sortedValues.length) { + // We actually have a topK + // because in some cases there may not be enough elements in the array to reach the start of the topK + // e.g. 
[1, 2, 3] with K = 2 and offset = 3 does not have a topK + result.moveIn = this.#sortedValues[moveInIndex] + + // We need to remove the element that falls out of the top K + // The element that falls out of the top K has shifted one to the right + // because of the element we inserted, so we find it at index topKEnd + if (this.#topKEnd < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[this.#topKEnd] + } + } + } + + return result + } + + /** + * Deletes a value that may or may not be in the topK. + * IMPORTANT: this assumes that the value is present in the collection + * if it's not the case it will remove the element + * that is on the position where the provided `value` would be. + */ + delete(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup delete position + const index = this.#findIndex(value) + // Remove the value at that position + const [removedElem] = this.#sortedValues.splice(index, 1) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The removed element is either before the top K or within the top K + // If it is before the top K then the first element of the topK moves out of the topK + // If it is within the top K then the removed element moves out of the topK + result.moveOut = removedElem + if (index < this.#topKStart) { + // The removed element is before the topK + // so actually, the first element of the topK moves out of the topK + // and not the element that we removed + // The first element of the topK is now at index topKStart - 1 + // since we removed an element before the topK + const moveOutIndex = this.#topKStart - 1 + if (moveOutIndex < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[moveOutIndex] + } else { + // No value is moving out of the topK + // because there are no elements in the topK + result.moveOut = null + } + } + + // Since we removed an element that was before or in the topK + // the first element after the topK moved one 
position to the left + // and thus falls into the topK now + const moveInIndex = this.#topKEnd - 1 + if (moveInIndex < this.#sortedValues.length) { + result.moveIn = this.#sortedValues[moveInIndex] + } + } + + return result + } + + // TODO: see if there is a way to refactor the code for insert and delete in the topK above + // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right + // so i have the feeling there is a common pattern here and we can implement both cases using that pattern + + #findIndex(value: V): number { + return binarySearch(this.#sortedValues, indexedValue(value, ''), (a, b) => + this.#comparator(getValue(a), getValue(b)), + ) + } +} + +/////////// + /** * Operator for fractional indexed topK operations * This operator maintains fractional indices for sorted elements @@ -35,12 +176,12 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< [K, IndexedValue] > { #index = new Index() - #comparator: (a: V1, b: V1) => number - #limit: number - #offset: number - /** A map of keys to a sorted array of values for those keys */ - #sortedValues: Map>> = new Map() + /** + * topK data structure that supports insertions and deletions + * and returns changes to the topK. + */ + #topK: TopKArray constructor( id: number, @@ -50,9 +191,9 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< options: TopKWithFractionalIndexOptions, ) { super(id, inputA, output) - this.#comparator = comparator - this.#limit = options.limit ?? Infinity - this.#offset = options.offset ?? 0 + const limit = options.limit ?? Infinity + const offset = options.offset ?? 
0 + this.#topK = new TopKArray(offset, limit, comparator) } run(): void { @@ -79,137 +220,30 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< this.#index.addValue(key, [value, multiplicity]) const newMultiplicity = this.#index.getMultiplicity(key, value) + let res: TopKChanges = { moveIn: null, moveOut: null } if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value was invisible but should now be visible // Need to insert it into the array of sorted values - const index = this.insert(key, value) - - // Now check if the top K changed - const topKStart = this.#offset - const topKEnd = this.#offset + this.#limit - - if (index < topKEnd) { - // The inserted element is either before the top K or within the top K - // If it is before the top K then it moves the the element that was right before the topK into the topK - // If it is within the top K then the inserted element moves into the top K - // In both cases the last element of the old top K now moves out of the top K - const moveInIndex = Math.max(index, topKStart) - const sortedValues = this.#sortedValues.get(key) ?? [] - if (moveInIndex < sortedValues.length) { - // We actually have a topK - // because in some cases there may not be enough elements in the array to reach the start of the topK - // e.g. 
[1, 2, 3] with K = 2 and offset = 3 does not have a topK - const moveInValue = sortedValues[moveInIndex] - // signal the move in with a +1 diff in the output - result.push([[key, moveInValue], 1]) - - // We need to remove the element that falls out of the top K - // The element that falls out of the top K has shifted one to the right - // because of the element we inserted, so we find it at index topKEnd - if (topKEnd < sortedValues.length) { - const valueOut = sortedValues[topKEnd] - result.push([[key, valueOut], -1]) - } - } - } + res = this.#topK.insert(value) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was visible but should now be invisible // Need to remove it from the array of sorted values - const [removedElem, removedIndex] = this.remove(key, value) - - // Now check if the top K changed - const topKStart = this.#offset - const topKEnd = this.#offset + this.#limit - - if (removedIndex < topKEnd) { - // The removed element is either before the top K or within the top K - // If it is before the top K then the first element of the topK moves out of the topK - // If it is within the top K then the removed element moves out of the topK - let moveOutValue: IndexedValue | null = removedElem - const sortedValues = this.#sortedValues.get(key) ?? 
[] - if (removedIndex < topKStart) { - // The removed element is before the topK - // so actually, the first element of the topK moves out of the topK - // and not the element that we removed - // The first element of the topK is now and topKStart - 1 - // since we removed an element before the topK - const moveOutIndex = topKStart - 1 - if (moveOutIndex < sortedValues.length) { - moveOutValue = sortedValues[moveOutIndex] - } else { - // No value is moving out of the topK - // because there are no elements in the topK - moveOutValue = null - } - } - - if (moveOutValue) { - // Signal the move out with a -1 diff in the output index - result.push([[key, moveOutValue], -1]) - } - - // Since we removed an element that was before or in the topK - // the first element after the topK moved one position to the left - // and hence now falls into the topK - const moveInIndex = topKEnd - 1 - if (moveInIndex < sortedValues.length) { - const moveInValue = sortedValues[moveInIndex] - result.push([[key, moveInValue], 1]) - } - } + res = this.#topK.delete(value) } else { // The value was invisible and it remains invisible // or it was visible and remains visible // so it doesn't affect the topK - return } - } - - // TODO: see if there is a way to refactor the code for insertions and removals in the topK above - // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right - // so i have the feeling there is a common pattern here and we can implement both cases - // on top of that pattern - /** - * Inserts a value at the correct position - * into the sorted array of values for the given key. - * Returns the index of the newly inserted value. - */ - insert(key: K, value: V1): number { - // Lookup insert position - const sortedValues = this.#sortedValues.get(key) ?? 
[] - const index = binarySearch(sortedValues, indexedValue(value, ''), (a, b) => - this.#comparator(getValue(a), getValue(b)), - ) - - // Generate fractional index based on the fractional indices of the elements before and after it - const indexBefore = index === 0 ? null : getIndex(sortedValues[index - 1]) - const indexAfter = - index === sortedValues.length ? null : getIndex(sortedValues[index]) - const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + if (res.moveIn) { + result.push([[key, res.moveIn], 1]) + } - // Insert the value at the correct position - const val = indexedValue(value, fractionalIndex) - sortedValues.splice(index, 0, val) // O(n) ... - this.#sortedValues.set(key, sortedValues) - return index - } + if (res.moveOut) { + result.push([[key, res.moveOut], -1]) + } - /** - * Removes a value from the sorted array of values for the given key. - * Returns the index of the removed value. - * IMPORTANT: this assumes that the value is present in the array - * if it's not the case it will remove the element - * that is on the position where the provided `value` would be. - */ - remove(key: K, value: V1): [IndexedValue, number] { - const sortedValues = this.#sortedValues.get(key) ?? [] - const index = binarySearch(sortedValues, indexedValue(value, ''), (a, b) => - this.#comparator(getValue(a), getValue(b)), - ) - const [removedElement] = sortedValues.splice(index, 1) // O(n) ... 
- this.#sortedValues.set(key, sortedValues) - return [removedElement, index] + return } } From de145fd2a8905a895be6013bad947fdd10221bc5 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 19 Jun 2025 17:31:05 +0200 Subject: [PATCH 05/20] B+ tree variant of topKWithFractionalIndex --- packages/d2mini/package.json | 3 +- .../src/operators/topKWithFractionalIndex.ts | 274 ++++++++++++++++-- .../operators/topKWithFractionalIndex.test.ts | 130 ++++++++- pnpm-lock.yaml | 8 + 4 files changed, 385 insertions(+), 30 deletions(-) diff --git a/packages/d2mini/package.json b/packages/d2mini/package.json index bfe11b8..62c0109 100644 --- a/packages/d2mini/package.json +++ b/packages/d2mini/package.json @@ -50,6 +50,7 @@ }, "dependencies": { "fractional-indexing": "^3.2.0", - "murmurhash-js": "^1.0.0" + "murmurhash-js": "^1.0.0", + "sorted-btree": "^1.8.1" } } diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index d8c71cf..2b80436 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -8,25 +8,15 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' -import { binarySearch } from '../utils.js' +import { binarySearch, hash } from '../utils.js' +import BTree from 'sorted-btree' interface TopKWithFractionalIndexOptions { limit?: number offset?: number + useTree?: boolean } -type FractionalIndex = string -type IndexedValue = [V, FractionalIndex] -const indexedValue = (value: V, index: FractionalIndex): IndexedValue => [ - value, - index, -] -const getValue = (indexedValue: IndexedValue): V => indexedValue[0] -const getIndex = (indexedValue: IndexedValue): FractionalIndex => - indexedValue[1] - -//////////// - type TopKChanges = { /** Indicates which element moves into the topK (if any) */ moveIn: 
IndexedValue | null @@ -34,6 +24,10 @@ type TopKChanges = { moveOut: IndexedValue | null } +/** + * A topK data structure that supports insertions and deletions + * and returns changes to the topK. + */ interface TopK { insert(value: V): TopKChanges delete(value: V): TopKChanges @@ -164,7 +158,186 @@ class TopKArray implements TopK { } } -/////////// +/** + * Implementation of a topK data structure that uses a B+ tree. + * The tree allows for logarithmic time insertions and deletions. + */ +class TopKTree implements TopK { + #comparator: (a: V, b: V) => number + // topK is a window at position [topKStart, topKEnd[ + // i.e. `topKStart` is inclusive and `topKEnd` is exclusive + #topKStart: number + #topKEnd: number + + #tree: BTree> + #topKFirstElem: IndexedValue | null = null // inclusive + #topKLastElem: IndexedValue | null = null // inclusive + + constructor( + offset: number, + limit: number, + comparator: (a: V, b: V) => number, + ) { + this.#topKStart = offset + this.#topKEnd = offset + limit + this.#comparator = comparator + this.#tree = new BTree(undefined, comparator) + } + + /** + * Insert a *new* value. + * Ignores the value if it is already present. + */ + insert(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Get the elements before and after the value + const [, indexedValueBefore] = this.#tree.nextLowerPair(value) ?? [ + null, + null, + ] + const [, indexedValueAfter] = this.#tree.nextHigherPair(value) ?? [ + null, + null, + ] + + const indexBefore = indexedValueBefore ? getIndex(indexedValueBefore) : null + const indexAfter = indexedValueAfter ? 
getIndex(indexedValueAfter) : null + + // Generate a fractional index for the value + // based on the fractional indices of the elements before and after it + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + const insertedElem = indexedValue(value, fractionalIndex) + + // Insert the value into the tree + const inserted = this.#tree.set(value, insertedElem, false) + if (!inserted) { + // The value was already present in the tree + // ignore this insertions since we don't support overwrites! + return result + } + + if (this.#tree.size - 1 < this.#topKStart) { + // We don't have a topK yet + // so we don't need to do anything + return result + } + + if (this.#topKFirstElem) { + // We have a topK containing at least 1 element + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // The element was inserted before the topK + // so it moves the element that is right before the topK into the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextLowerPair(firstElem)! + this.#topKFirstElem = newFirstElem + result.moveIn = this.#topKFirstElem + } else if ( + !this.#topKLastElem || + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted within the topK + result.moveIn = insertedElem + } + + if ( + this.#topKLastElem && + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted before or within the topK + // the newly inserted element pushes the last element of the topK out of the topK + // so the one before that becomes the new last element of the topK + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextLowerPair(lastValue)! + this.#topKLastElem = newLastElem + result.moveOut = lastElem + } + } + + // If the tree has as many elements as the offset (i.e. 
#topKStart) + // then the insertion shifted the elements 1 position to the right + // and the last element in the tree is now the first element of the topK + if (this.#tree.size - 1 === this.#topKStart) { + const topKFirstKey = this.#tree.maxKey()! + this.#topKFirstElem = this.#tree.get(topKFirstKey)! + result.moveIn = this.#topKFirstElem + } + + // By inserting this new element we now have a complete topK + // store the last element of the topK + if (this.#tree.size === this.#topKEnd) { + const topKLastKey = this.#tree.maxKey()! + this.#topKLastElem = this.#tree.get(topKLastKey)! + } + + return result + } + + delete(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + const deletedElem = this.#tree.get(value) + const deleted = this.#tree.delete(value) + if (!deleted) { + return result + } + + if (!this.#topKFirstElem) { + // We didn't have a topK before the delete + // so we still can't have a topK after the delete + return result + } + + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // We deleted an element that was before the topK + // so the topK has shifted one position to the left + + // the old first element moves out of the topK + result.moveOut = this.#topKFirstElem + // the element that was right after the first element of the topK + // is now the new first element of the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextHigherPair(firstElem) ?? [ + null, + null, + ] + this.#topKFirstElem = newFirstElem + } else if ( + !this.#topKLastElem || + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was within the topK + // so we need to signal that that element is no longer in the topK + result.moveOut = deletedElem! 
+ } + + if ( + this.#topKLastElem && + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was before or within the topK + // So the first element after the topK moved one position to the left + // and thus falls into the topK now + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextHigherPair(lastValue) ?? [ + null, + null, + ] + this.#topKLastElem = newLastElem + if (newLastElem) { + result.moveIn = newLastElem + } + } + + return result + } +} /** * Operator for fractional indexed topK operations @@ -181,7 +354,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< * topK data structure that supports insertions and deletions * and returns changes to the topK. */ - #topK: TopKArray + #topK: TopK> constructor( id: number, @@ -193,7 +366,23 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< super(id, inputA, output) const limit = options.limit ?? Infinity const offset = options.offset ?? 0 - this.#topK = new TopKArray(offset, limit, comparator) + const compareTaggedValues = ( + a: HashTaggedValue, + b: HashTaggedValue, + ) => { + // First compare on the value + const valueComparison = comparator(getValue(a), getValue(b)) + if (valueComparison !== 0) { + return valueComparison + } + // If the values are equal, compare on the hash + const hashA = getHash(a) + const hashB = getHash(b) + return hashA < hashB ? -1 : hashA > hashB ? 1 : 0 + } + this.#topK = options.useTree + ? 
new TopKTree(offset, limit, compareTaggedValues) + : new TopKArray(offset, limit, compareTaggedValues) } run(): void { @@ -220,15 +409,17 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< this.#index.addValue(key, [value, multiplicity]) const newMultiplicity = this.#index.getMultiplicity(key, value) - let res: TopKChanges = { moveIn: null, moveOut: null } + let res: TopKChanges> = { moveIn: null, moveOut: null } if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value was invisible but should now be visible // Need to insert it into the array of sorted values - res = this.#topK.insert(value) + const taggedValue = tagValue(value) + res = this.#topK.insert(taggedValue) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was visible but should now be invisible // Need to remove it from the array of sorted values - res = this.#topK.delete(value) + const taggedValue = tagValue(value) + res = this.#topK.delete(taggedValue) } else { // The value was invisible and it remains invisible // or it was visible and remains visible @@ -236,11 +427,13 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } if (res.moveIn) { - result.push([[key, res.moveIn], 1]) + const valueWithoutHash = mapValue(res.moveIn, untagValue) + result.push([[key, valueWithoutHash], 1]) } if (res.moveOut) { - result.push([[key, res.moveOut], -1]) + const valueWithoutHash = mapValue(res.moveOut, untagValue) + result.push([[key, valueWithoutHash], -1]) } return @@ -291,3 +484,42 @@ export function topKWithFractionalIndex< return output } } + +// Abstraction for fractionally indexed values +type FractionalIndex = string +type IndexedValue = [V, FractionalIndex] + +function indexedValue(value: V, index: FractionalIndex): IndexedValue { + return [value, index] +} + +function getValue(indexedValue: IndexedValue): V { + return indexedValue[0] +} + +function getIndex(indexedValue: IndexedValue): FractionalIndex { + return indexedValue[1] +} + 
+function mapValue( + value: IndexedValue, + f: (value: V) => W, +): IndexedValue { + return [f(getValue(value)), getIndex(value)] +} + +// Abstraction for values tagged with a hash +type Hash = string +type HashTaggedValue = [V, Hash] + +function tagValue(value: V): HashTaggedValue { + return [value, hash(value)] +} + +function untagValue(hashTaggedValue: HashTaggedValue): V { + return hashTaggedValue[0] +} + +function getHash(hashTaggedValue: HashTaggedValue): Hash { + return hashTaggedValue[1] +} \ No newline at end of file diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index f1e9fa1..415daca 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -23,7 +23,9 @@ function checkLexicographicOrder(results: any[]) { const nextIndex = sortedByValue[i + 1].index // Indices should be in lexicographic order - expect(currentIndex < nextIndex).toBe(true) + if (!(currentIndex < nextIndex)) { + return false + } } return true @@ -56,15 +58,24 @@ function verifyOrder(results: any[], expectedOrder: string[]) { expect(sortedByIndex).toEqual(expectedOrder) } +// Tests: +// - insert into array when there is not topK +// - insert into array when there is no topK and the element becomes the first element of the topK +// - insert into array when there is no topK and the element pushes the last element of the array into the topK +// - topK with limit 1 such that the first is also the last element + describe('Operators', () => { - describe('TopKWithFractionalIndex operation', () => { + describe.each([ + ['with array', { useTree: false }], + ['with B+ tree', { useTree: true }], + ])('TopKWithFractionalIndex operation %s', (_, options) => { it('should assign fractional indices to sorted elements', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() 
const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), options), output((message) => { allMessages.push(message) }), @@ -160,6 +171,93 @@ describe('Operators', () => { expect(checkLexicographicOrder(currentStateArray)).toBe(true) }) + it('should support duplicate ordering keys', () => { + const graph = new D2() + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), options), + output((message) => { + allMessages.push(message) + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + graph.run() + + // Initial result should have all elements with fractional indices + const initialResult = allMessages[0].getInner() + expect(initialResult.length).toBe(5) + + // Check that indices are in lexicographic order + expect(checkLexicographicOrder(initialResult)).toBe(true) + + // Store the initial indices for later comparison + const initialIndices = new Map() + for (const [[_, [value, index]]] of initialResult) { + initialIndices.set(value.id, index) + } + + // Now let's add a new element with a value that is already in there + input.sendData(new MultiSet([[[null, { id: 6, value: 'c' }], 1]])) + graph.run() + + // TODO: when using the tree variant we are not actually inserting it + // because they key already exists, so we ignore it + // and thus there is no message being emitted here + // --> FIX it by wrapping the value with its hash and comparing on the value first and then on the hash + + // Check the changes + const changes = allMessages[1].getInner() + + // We should only 
emit as many changes as we received + expect(changes.length).toBe(1) // 1 addition + + // Find the addition + const [addition] = changes + + // Check that we added { id: 6, value: 'c' } + expect(addition?.[0][1][0]).toEqual({ id: 6, value: 'c' }) + + // Reconstruct the current state by applying the changes + const currentState = new Map() + for (const [[_, [value, index]]] of initialResult) { + currentState.set(JSON.stringify(value), [value, index]) + } + + // Apply the changes + for (const [[_, [value, index]], multiplicity] of changes) { + if (multiplicity < 0) { + // Remove + currentState.delete(JSON.stringify(value)) + } else { + // Add + currentState.set(JSON.stringify(value), [value, index]) + } + } + + // Convert to array for lexicographic order check + const currentStateArray = Array.from(currentState.values()).map( + ([value, index]) => [[null, [value, index]], 1], + ) + + // Check that indices are still in lexicographic order after the changes + expect(checkLexicographicOrder(currentStateArray)).toBe(true) + expect(currentStateArray.length).toBe(6) + }) + it('should handle limit and offset correctly', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() @@ -169,6 +267,7 @@ describe('Operators', () => { topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 3, offset: 1, + ...options, }), output((message) => { allMessages.push(message) @@ -284,7 +383,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), @@ -430,7 +532,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), 
output((message) => { allMessages.push(message) }), @@ -587,7 +692,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), @@ -716,7 +824,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), @@ -800,7 +911,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e0c21c5..b5aca52 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -87,6 +87,9 @@ importers: murmurhash-js: specifier: ^1.0.0 version: 1.0.0 + sorted-btree: + specifier: ^1.8.1 + version: 1.8.1 devDependencies: '@types/murmurhash-js': specifier: ^1.0.6 @@ -2338,6 +2341,9 @@ packages: resolution: {integrity: sha512-qMCMfhY040cVHT43K9BFygqYbUPFZKHOg7K73mtTWJRb8pyP3fzf4Ixd5SzdEJQ6MRUg/WBnOLxghZtKKurENQ==} engines: {node: '>=10'} + sorted-btree@1.8.1: + resolution: {integrity: sha512-395+XIP+wqNn3USkFSrNz7G3Ss/MXlZEqesxvzCRFwL14h6e8LukDHdLBePn5pwbm5OQ9vGu8mDyz2lLDIqamQ==} + source-map-js@1.2.1: resolution: {integrity: sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA==} engines: {node: '>=0.10.0'} @@ -4746,6 +4752,8 @@ snapshots: astral-regex: 2.0.0 is-fullwidth-code-point: 3.0.0 + sorted-btree@1.8.1: {} + source-map-js@1.2.1: {} spawndamnit@3.0.1: From 9b33a5239d5faeeaff4ad23d7a3ae2a8b5f6ece7 Mon Sep 17 00:00:00 2001 From: Kevin De 
Porre Date: Mon, 23 Jun 2025 11:24:50 +0200 Subject: [PATCH 06/20] Extend unit tests to test all insertion and deletion cases --- .../operators/topKWithFractionalIndex.test.ts | 185 +++++++++++++++--- 1 file changed, 155 insertions(+), 30 deletions(-) diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 415daca..30ad8ea 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { describe, it, expect } from 'vitest' import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { topKWithFractionalIndex } from '../../src/operators/topKWithFractionalIndex.js' @@ -58,12 +58,6 @@ function verifyOrder(results: any[], expectedOrder: string[]) { expect(sortedByIndex).toEqual(expectedOrder) } -// Tests: -// - insert into array when there is not topK -// - insert into array when there is no topK and the element becomes the first element of the topK -// - insert into array when there is no topK and the element pushes the last element of the array into the topK -// - topK with limit 1 such that the first is also the last element - describe('Operators', () => { describe.each([ ['with array', { useTree: false }], @@ -334,9 +328,6 @@ describe('Operators', () => { expect(removal?.[0][1][0].id).toBe(4) // 'd' has id 4 expect(addition?.[0][1][0].id).toBe(6) // 'c+' has id 6 - // The new element reuses the index of the removed element - //expect(addition?.[0][1][1]).toBe(removal?.[0][1][1]) - // Reconstruct the current state by applying the changes const currentState = new Map() for (const [[_, [value, index]]] of initialResult) { @@ -344,37 +335,171 @@ describe('Operators', () => { } // Apply the changes - for (const [[_, [value, index]], multiplicity] of changes) { - if 
(multiplicity < 0) { - // Remove - currentState.delete(JSON.stringify(value)) - } else { - // Add - currentState.set(JSON.stringify(value), [value, index]) + const applyChanges = (changes: any[]) => { + for (const [[_, [value, index]], multiplicity] of changes) { + if (multiplicity < 0) { + // Remove + currentState.delete(JSON.stringify(value)) + } else { + // Add + currentState.set(JSON.stringify(value), [value, index]) + } } } + applyChanges(changes) + // Convert to array for lexicographic order check - const stateArray = Array.from(currentState.values()) - const currentStateArray = stateArray.map(([value, index]) => [ - [null, [value, index]], - 1, + const checkCurrentState = (expectedResult) => { + const stateArray = Array.from(currentState.values()) + const currentStateArray = stateArray.map(([value, index]) => [ + [null, [value, index]], + 1, + ]) + + // Check that indices are still in lexicographic order after the changes + expect(checkLexicographicOrder(currentStateArray)).toBe(true) + + // expect the array to be the values with IDs 2, 3, 6 in that order + const compareFractionalIndex = (a, b) => + a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0 + const sortedResult = stateArray + .sort(compareFractionalIndex) + .map(([value, _]) => value) + expect(sortedResult).toEqual(expectedResult) + } + + checkCurrentState([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 6, value: 'c+' }, ]) - // Check that indices are still in lexicographic order after the changes - expect(checkLexicographicOrder(currentStateArray)).toBe(true) + // Now add an element that should be before the topK + input.sendData( + new MultiSet([ + [[null, { id: 7, value: '0' }], 1], // This should be before 'a' + ]), + ) + graph.run() - // expect the array to be the values with IDs 2, 3, 6 in that order - const compareFractionalIndex = (a, b) => - a[1] < b[1] ? -1 : a[1] > b[1] ? 
1 : 0 - const sortedResult = stateArray - .sort(compareFractionalIndex) - .map(([value, _]) => value) - expect(sortedResult).toEqual([ + // Check the changes + const changes2 = allMessages[2].getInner() + + // We received 1 change (1 addition) + // Since we have a limit, this will push out 1 element, so we'll emit 2 changes + // This is still optimal as we're only emitting the minimum necessary changes + expect(changes2.length).toBe(2) // 1 removal + 1 addition + + // Find the removal and addition + const removal2 = changes2.find(([_, multiplicity]) => multiplicity < 0) + const addition2 = changes2.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we removed 'c+' and added 'a' + expect(removal2?.[0][1][0].value).toBe('c+') + expect(addition2?.[0][1][0].value).toBe('a') + + // Check that the ids are correct + expect(removal2?.[0][1][0].id).toBe(6) // 'c+' has id 6 + expect(addition2?.[0][1][0].id).toBe(1) // 'a' has id 1 + + // Apply the changes + applyChanges(changes2) + + checkCurrentState([ + { id: 1, value: 'a' }, + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + ]) + + // Now add an element after the topK + input.sendData( + new MultiSet([ + [[null, { id: 8, value: 'h' }], 1], // This should be after 'e' + ]), + ) + graph.run() + + // Should not have emitted any changes + // since the element was added after the topK + // so it does not affect the topK + expect(allMessages.length).toBe(3) + + // Now remove an element before the topK + // This will cause the first element of the topK to move out of the topK + // and the element after the last element of the topK to move into the topK + input.sendData( + new MultiSet([ + [[null, { id: 7, value: '0' }], -1], // Remove '0' + ]), + ) + graph.run() + + const changes3 = allMessages[3].getInner() + + // Find the removal and addition + const removal3 = changes3.find(([_, multiplicity]) => multiplicity < 0) + const addition3 = changes3.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we 
removed 'a' and added 'c+' + expect(removal3?.[0][1][0].value).toBe('a') + expect(addition3?.[0][1][0].value).toBe('c+') + + // Check that the ids are correct + expect(removal3?.[0][1][0].id).toBe(1) // 'a' has id 1 + expect(addition3?.[0][1][0].id).toBe(6) // 'c+' has id 6 + + // Apply the changes + applyChanges(changes3) + + checkCurrentState([ { id: 2, value: 'b' }, { id: 3, value: 'c' }, { id: 6, value: 'c+' }, ]) + + // Now remove an element in the topK + // This causes the element after the last element of the topK to move into the topK + input.sendData( + new MultiSet([ + [[null, { id: 6, value: 'c+' }], -1], // Remove 'c+' + ]), + ) + graph.run() + + const changes4 = allMessages[4].getInner() + + // Find the removal and addition + const removal4 = changes4.find(([_, multiplicity]) => multiplicity < 0) + const addition4 = changes4.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we removed 'c+' and added 'c' + expect(removal4?.[0][1][0].value).toBe('c+') + expect(addition4?.[0][1][0].value).toBe('d') + + // Check that the ids are correct + expect(removal4?.[0][1][0].id).toBe(6) // 'c+' has id 6 + expect(addition4?.[0][1][0].id).toBe(4) // 'd' has id 4 + + // Apply the changes + applyChanges(changes4) + + checkCurrentState([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 4, value: 'd' }, + ]) + + // Now remove an element after the topK + input.sendData( + new MultiSet([ + [[null, { id: 8, value: 'h' }], -1], // Remove 'h' + ]), + ) + graph.run() + + // There should be no changes + expect(allMessages.length).toBe(5) }) it('should handle elements moving positions correctly', () => { From 5d41351689612da24047cb2a4717e1a0c76e8b1d Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 23 Jun 2025 11:25:27 +0200 Subject: [PATCH 07/20] Formatting --- .../d2mini/src/operators/topKWithFractionalIndex.ts | 2 +- .../tests/operators/topKWithFractionalIndex.test.ts | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git 
a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 2b80436..ca17a6a 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -522,4 +522,4 @@ function untagValue(hashTaggedValue: HashTaggedValue): V { function getHash(hashTaggedValue: HashTaggedValue): Hash { return hashTaggedValue[1] -} \ No newline at end of file +} diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 30ad8ea..1513d34 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -69,7 +69,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), options), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), @@ -171,7 +174,10 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), options), + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), output((message) => { allMessages.push(message) }), From 71d0dcb81bd93cfe95f7dc9c6adc5980d4e20e48 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 23 Jun 2025 12:01:42 +0200 Subject: [PATCH 08/20] Unit test for duplicate values --- .../operators/topKWithFractionalIndex.test.ts | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 1513d34..69a2f72 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ 
b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -214,11 +214,6 @@ describe('Operators', () => { input.sendData(new MultiSet([[[null, { id: 6, value: 'c' }], 1]])) graph.run() - // TODO: when using the tree variant we are not actually inserting it - // because they key already exists, so we ignore it - // and thus there is no message being emitted here - // --> FIX it by wrapping the value with its hash and comparing on the value first and then on the hash - // Check the changes const changes = allMessages[1].getInner() @@ -258,6 +253,52 @@ describe('Operators', () => { expect(currentStateArray.length).toBe(6) }) + it('should ignore duplicate values', () => { + const graph = new D2() + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithFractionalIndex( + (a, b) => a.value.localeCompare(b.value), + options, + ), + output((message) => { + allMessages.push(message) + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + const entryForC = [[null, { id: 3, value: 'c' }], 1] as [ + [null, { id: number; value: string }], + number, + ] + input.sendData( + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + entryForC, + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + graph.run() + + // Initial result should have all elements with fractional indices + const initialResult = allMessages[0].getInner() + expect(initialResult.length).toBe(5) + + // Now add entryForC again + input.sendData(new MultiSet([entryForC])) + graph.run() + + // Check that no message was emitted + // since there were no changes to the topK + expect(allMessages.length).toBe(1) + }) + it('should handle limit and offset correctly', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() From c605244bfa6b2531cfad713283a403f8603d437f Mon Sep 17 00:00:00 2001 From: Kevin De Porre 
Date: Tue, 24 Jun 2025 10:11:45 +0200 Subject: [PATCH 09/20] Expose useTree option also on sortBy operator --- packages/d2mini/src/operators/orderBy.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 6796f1e..0d1d71e 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -12,6 +12,10 @@ interface OrderByOptions { offset?: number } +interface OrderByWithFractionalIndexOptions { + useTree?: boolean +} + /** * Orders the elements and limits the number of results, with optional offset * This requires a keyed stream, and uses the `topK` operator to order all the elements. @@ -144,10 +148,11 @@ export function orderByWithFractionalIndex< valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, - options?: OrderByOptions, + options?: OrderByOptions & OrderByWithFractionalIndexOptions, ) { const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 + const useTree = options?.useTree ?? false const comparator = options?.comparator ?? 
((a, b) => { @@ -184,6 +189,7 @@ export function orderByWithFractionalIndex< topKWithFractionalIndex((a, b) => comparator(a[1], b[1]), { limit, offset, + useTree, }), map(([_, [[key], index]]) => [key, index] as KeyValue), innerJoin(stream), From 98bc39e27d251d65b153f054689becc61bb00dfc Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 10:33:02 +0200 Subject: [PATCH 10/20] Split array and B+ tree variants in separate operators --- .../src/operators/topKWithFractionalIndex.ts | 203 +------------- .../operators/topKWithFractionalIndexBTree.ts | 256 ++++++++++++++++++ .../operators/topKWithFractionalIndex.test.ts | 50 +--- 3 files changed, 282 insertions(+), 227 deletions(-) create mode 100644 packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index ca17a6a..f62a91f 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -9,15 +9,13 @@ import { MultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' import { binarySearch, hash } from '../utils.js' -import BTree from 'sorted-btree' -interface TopKWithFractionalIndexOptions { +export interface TopKWithFractionalIndexOptions { limit?: number offset?: number - useTree?: boolean } -type TopKChanges = { +export type TopKChanges = { /** Indicates which element moves into the topK (if any) */ moveIn: IndexedValue | null /** Indicates which element moves out of the topK (if any) */ @@ -28,7 +26,7 @@ type TopKChanges = { * A topK data structure that supports insertions and deletions * and returns changes to the topK. 
*/ -interface TopK { +export interface TopK { insert(value: V): TopKChanges delete(value: V): TopKChanges } @@ -158,187 +156,6 @@ class TopKArray implements TopK { } } -/** - * Implementation of a topK data structure that uses a B+ tree. - * The tree allows for logarithmic time insertions and deletions. - */ -class TopKTree implements TopK { - #comparator: (a: V, b: V) => number - // topK is a window at position [topKStart, topKEnd[ - // i.e. `topKStart` is inclusive and `topKEnd` is exclusive - #topKStart: number - #topKEnd: number - - #tree: BTree> - #topKFirstElem: IndexedValue | null = null // inclusive - #topKLastElem: IndexedValue | null = null // inclusive - - constructor( - offset: number, - limit: number, - comparator: (a: V, b: V) => number, - ) { - this.#topKStart = offset - this.#topKEnd = offset + limit - this.#comparator = comparator - this.#tree = new BTree(undefined, comparator) - } - - /** - * Insert a *new* value. - * Ignores the value if it is already present. - */ - insert(value: V): TopKChanges { - let result: TopKChanges = { moveIn: null, moveOut: null } - - // Get the elements before and after the value - const [, indexedValueBefore] = this.#tree.nextLowerPair(value) ?? [ - null, - null, - ] - const [, indexedValueAfter] = this.#tree.nextHigherPair(value) ?? [ - null, - null, - ] - - const indexBefore = indexedValueBefore ? getIndex(indexedValueBefore) : null - const indexAfter = indexedValueAfter ? getIndex(indexedValueAfter) : null - - // Generate a fractional index for the value - // based on the fractional indices of the elements before and after it - const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) - const insertedElem = indexedValue(value, fractionalIndex) - - // Insert the value into the tree - const inserted = this.#tree.set(value, insertedElem, false) - if (!inserted) { - // The value was already present in the tree - // ignore this insertions since we don't support overwrites! 
- return result - } - - if (this.#tree.size - 1 < this.#topKStart) { - // We don't have a topK yet - // so we don't need to do anything - return result - } - - if (this.#topKFirstElem) { - // We have a topK containing at least 1 element - if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { - // The element was inserted before the topK - // so it moves the element that is right before the topK into the topK - const firstElem = getValue(this.#topKFirstElem) - const [, newFirstElem] = this.#tree.nextLowerPair(firstElem)! - this.#topKFirstElem = newFirstElem - result.moveIn = this.#topKFirstElem - } else if ( - !this.#topKLastElem || - this.#comparator(value, getValue(this.#topKLastElem)) < 0 - ) { - // The element was inserted within the topK - result.moveIn = insertedElem - } - - if ( - this.#topKLastElem && - this.#comparator(value, getValue(this.#topKLastElem)) < 0 - ) { - // The element was inserted before or within the topK - // the newly inserted element pushes the last element of the topK out of the topK - // so the one before that becomes the new last element of the topK - const lastElem = this.#topKLastElem - const lastValue = getValue(lastElem) - const [, newLastElem] = this.#tree.nextLowerPair(lastValue)! - this.#topKLastElem = newLastElem - result.moveOut = lastElem - } - } - - // If the tree has as many elements as the offset (i.e. #topKStart) - // then the insertion shifted the elements 1 position to the right - // and the last element in the tree is now the first element of the topK - if (this.#tree.size - 1 === this.#topKStart) { - const topKFirstKey = this.#tree.maxKey()! - this.#topKFirstElem = this.#tree.get(topKFirstKey)! - result.moveIn = this.#topKFirstElem - } - - // By inserting this new element we now have a complete topK - // store the last element of the topK - if (this.#tree.size === this.#topKEnd) { - const topKLastKey = this.#tree.maxKey()! - this.#topKLastElem = this.#tree.get(topKLastKey)! 
- } - - return result - } - - delete(value: V): TopKChanges { - let result: TopKChanges = { moveIn: null, moveOut: null } - - const deletedElem = this.#tree.get(value) - const deleted = this.#tree.delete(value) - if (!deleted) { - return result - } - - if (!this.#topKFirstElem) { - // We didn't have a topK before the delete - // so we still can't have a topK after the delete - return result - } - - if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { - // We deleted an element that was before the topK - // so the topK has shifted one position to the left - - // the old first element moves out of the topK - result.moveOut = this.#topKFirstElem - // the element that was right after the first element of the topK - // is now the new first element of the topK - const firstElem = getValue(this.#topKFirstElem) - const [, newFirstElem] = this.#tree.nextHigherPair(firstElem) ?? [ - null, - null, - ] - this.#topKFirstElem = newFirstElem - } else if ( - !this.#topKLastElem || - // TODO: if on equal order the element is inserted *after* the already existing one - // then this check should become < 0 - this.#comparator(value, getValue(this.#topKLastElem)) <= 0 - ) { - // The element we deleted was within the topK - // so we need to signal that that element is no longer in the topK - result.moveOut = deletedElem! - } - - if ( - this.#topKLastElem && - // TODO: if on equal order the element is inserted *after* the already existing one - // then this check should become < 0 - this.#comparator(value, getValue(this.#topKLastElem)) <= 0 - ) { - // The element we deleted was before or within the topK - // So the first element after the topK moved one position to the left - // and thus falls into the topK now - const lastElem = this.#topKLastElem - const lastValue = getValue(lastElem) - const [, newLastElem] = this.#tree.nextHigherPair(lastValue) ?? 
[ - null, - null, - ] - this.#topKLastElem = newLastElem - if (newLastElem) { - result.moveIn = newLastElem - } - } - - return result - } -} - /** * Operator for fractional indexed topK operations * This operator maintains fractional indices for sorted elements @@ -380,9 +197,15 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const hashB = getHash(b) return hashA < hashB ? -1 : hashA > hashB ? 1 : 0 } - this.#topK = options.useTree - ? new TopKTree(offset, limit, compareTaggedValues) - : new TopKArray(offset, limit, compareTaggedValues) + this.#topK = this.createTopK(offset, limit, compareTaggedValues) + } + + protected createTopK( + offset: number, + limit: number, + comparator: (a: HashTaggedValue, b: HashTaggedValue) => number, + ): TopK> { + return new TopKArray(offset, limit, comparator) } run(): void { @@ -510,7 +333,7 @@ function mapValue( // Abstraction for values tagged with a hash type Hash = string -type HashTaggedValue = [V, Hash] +export type HashTaggedValue = [V, Hash] function tagValue(value: V): HashTaggedValue { return [value, hash(value)] diff --git a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts new file mode 100644 index 0000000..de7a1c8 --- /dev/null +++ b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts @@ -0,0 +1,256 @@ +import { IStreamBuilder, KeyValue, PipedOperator } from '../types.js' +import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { generateKeyBetween } from 'fractional-indexing' +import BTree from 'sorted-btree' +import { + HashTaggedValue, + TopK, + TopKChanges, + TopKWithFractionalIndexOperator, + TopKWithFractionalIndexOptions, +} from './topKWithFractionalIndex.js' + +/** + * Implementation of a topK data structure that uses a B+ tree. + * The tree allows for logarithmic time insertions and deletions. 
+ */ +class TopKTree implements TopK { + #comparator: (a: V, b: V) => number + // topK is a window at position [topKStart, topKEnd[ + // i.e. `topKStart` is inclusive and `topKEnd` is exclusive + #topKStart: number + #topKEnd: number + + #tree: BTree> + #topKFirstElem: IndexedValue | null = null // inclusive + #topKLastElem: IndexedValue | null = null // inclusive + + constructor( + offset: number, + limit: number, + comparator: (a: V, b: V) => number, + ) { + this.#topKStart = offset + this.#topKEnd = offset + limit + this.#comparator = comparator + this.#tree = new BTree(undefined, comparator) + } + + /** + * Insert a *new* value. + * Ignores the value if it is already present. + */ + insert(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Get the elements before and after the value + const [, indexedValueBefore] = this.#tree.nextLowerPair(value) ?? [ + null, + null, + ] + const [, indexedValueAfter] = this.#tree.nextHigherPair(value) ?? [ + null, + null, + ] + + const indexBefore = indexedValueBefore ? getIndex(indexedValueBefore) : null + const indexAfter = indexedValueAfter ? getIndex(indexedValueAfter) : null + + // Generate a fractional index for the value + // based on the fractional indices of the elements before and after it + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + const insertedElem = indexedValue(value, fractionalIndex) + + // Insert the value into the tree + const inserted = this.#tree.set(value, insertedElem, false) + if (!inserted) { + // The value was already present in the tree + // ignore this insertion since we don't support overwrites!
+ return result + } + + if (this.#tree.size - 1 < this.#topKStart) { + // We don't have a topK yet + // so we don't need to do anything + return result + } + + if (this.#topKFirstElem) { + // We have a topK containing at least 1 element + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // The element was inserted before the topK + // so it moves the element that is right before the topK into the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextLowerPair(firstElem)! + this.#topKFirstElem = newFirstElem + result.moveIn = this.#topKFirstElem + } else if ( + !this.#topKLastElem || + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted within the topK + result.moveIn = insertedElem + } + + if ( + this.#topKLastElem && + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted before or within the topK + // the newly inserted element pushes the last element of the topK out of the topK + // so the one before that becomes the new last element of the topK + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextLowerPair(lastValue)! + this.#topKLastElem = newLastElem + result.moveOut = lastElem + } + } + + // If the tree has as many elements as the offset (i.e. #topKStart) + // then the insertion shifted the elements 1 position to the right + // and the last element in the tree is now the first element of the topK + if (this.#tree.size - 1 === this.#topKStart) { + const topKFirstKey = this.#tree.maxKey()! + this.#topKFirstElem = this.#tree.get(topKFirstKey)! + result.moveIn = this.#topKFirstElem + } + + // By inserting this new element we now have a complete topK + // store the last element of the topK + if (this.#tree.size === this.#topKEnd) { + const topKLastKey = this.#tree.maxKey()! + this.#topKLastElem = this.#tree.get(topKLastKey)!
+ } + + return result + } + + /** + * Delete a value. + * Does nothing if the value is not present in the tree. + * Returns which element (if any) moved into and out of the topK window. + */ + delete(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + const deletedElem = this.#tree.get(value) + const deleted = this.#tree.delete(value) + if (!deleted) { + return result + } + + if (!this.#topKFirstElem) { + // We didn't have a topK before the delete + // so we still can't have a topK after the delete + return result + } + + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // We deleted an element that was before the topK + // so the topK has shifted one position to the left + + // the old first element moves out of the topK + result.moveOut = this.#topKFirstElem + // the element that was right after the first element of the topK + // is now the new first element of the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextHigherPair(firstElem) ?? [ + null, + null, + ] + this.#topKFirstElem = newFirstElem + } else if ( + !this.#topKLastElem || + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was within the topK + // so we need to signal that that element is no longer in the topK + result.moveOut = deletedElem! + + if (this.#comparator(value, getValue(this.#topKFirstElem)) === 0) { + // The deleted element was the first element of the topK + // so its successor (if any) becomes the new first element. + // Otherwise #topKFirstElem keeps pointing at a removed element + // and a later delete before the topK would report it as moved out again. + const [, newFirstElem] = this.#tree.nextHigherPair(value) ?? [ + null, + null, + ] + this.#topKFirstElem = newFirstElem + } + } + + if ( + this.#topKLastElem && + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was before or within the topK + // So the first element after the topK moved one position to the left + // and thus falls into the topK now + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextHigherPair(lastValue) ??
[ + null, + null, + ] + this.#topKLastElem = newLastElem + if (newLastElem) { + result.moveIn = newLastElem + } + } + + return result + } +} + +/** + * Operator for fractional indexed topK operations + * This operator maintains fractional indices for sorted elements + * and only updates indices when elements move position + */ +export class TopKWithFractionalIndexBTreeOperator< + K, + V1, +> extends TopKWithFractionalIndexOperator { + protected override createTopK( + offset: number, + limit: number, + comparator: (a: HashTaggedValue, b: HashTaggedValue) => number, + ): TopK> { + return new TopKTree(offset, limit, comparator) + } +} + +/** + * Limits the number of results based on a comparator, with optional offset. + * This works on a keyed stream, where the key is the first element of the tuple. + * The ordering is within a key group, i.e. elements are sorted within a key group + * and the limit + offset is applied to that sorted group. + * To order the entire stream, key by the same value for all elements such as null. + * + * Uses fractional indexing to minimize the number of changes when elements move positions. + * Each element is assigned a fractional index that is lexicographically sortable. + * When elements move, only the indices of the moved elements are updated, not all elements. + * + * @param comparator - A function that compares two elements + * @param options - An optional object containing limit and offset properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export function topKWithFractionalIndexBTree< + K extends T extends KeyValue ? K : never, + V1 extends T extends KeyValue ?
V : never, + T, +>( + comparator: (a: V1, b: V1) => number, + options?: TopKWithFractionalIndexOptions, +): PipedOperator> { + const opts = options || {} + + return ( + stream: IStreamBuilder, + ): IStreamBuilder> => { + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>(), + ) + // Instantiate the B+ tree operator: only its createTopK override builds a TopKTree, + // the base operator would never use the tree + const operator = new TopKWithFractionalIndexBTreeOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + output.writer, + comparator, + opts, + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 69a2f72..d6238c2 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect } from 'vitest' import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { topKWithFractionalIndex } from '../../src/operators/topKWithFractionalIndex.js' +import { topKWithFractionalIndexBTree } from '../../src/operators/topKWithFractionalIndexBTree.js' import { output } from '../../src/operators/index.js' // Helper function to check if indices are in lexicographic order @@ -60,19 +61,16 @@ function verifyOrder(results: any[], expectedOrder: string[]) { describe('Operators', () => { describe.each([ - ['with array', { useTree: false }], - ['with B+ tree', { useTree: true }], - ])('TopKWithFractionalIndex operation %s', (_, options) => { + ['with array', { topK: topKWithFractionalIndex }], + ['with B+ tree', { topK: topKWithFractionalIndexBTree }], + ])('TopKWithFractionalIndex operation %s', (_, { topK }) => { it('should assign fractional indices to sorted elements', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() const
allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -174,10 +172,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -259,10 +254,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -305,10 +297,9 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + topK((a, b) => a.value.localeCompare(b.value), { limit: 3, offset: 1, - ...options, }), output((message) => { allMessages.push(message) @@ -555,10 +546,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -704,10 +692,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -864,10 +849,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -996,10 +978,7 @@ describe('Operators', () => { const 
allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -1083,10 +1062,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex( - (a, b) => a.value.localeCompare(b.value), - options, - ), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), From 1e402df5164701e57ba7d067196bf0a8a2f69af5 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 10:44:13 +0200 Subject: [PATCH 11/20] Add missing imports --- .../d2mini/src/operators/topKWithFractionalIndex.ts | 12 ++++++------ .../src/operators/topKWithFractionalIndexBTree.ts | 4 ++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index f62a91f..3a438df 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -309,18 +309,18 @@ export function topKWithFractionalIndex< } // Abstraction for fractionally indexed values -type FractionalIndex = string -type IndexedValue = [V, FractionalIndex] +export type FractionalIndex = string +export type IndexedValue = [V, FractionalIndex] -function indexedValue(value: V, index: FractionalIndex): IndexedValue { +export function indexedValue(value: V, index: FractionalIndex): IndexedValue { return [value, index] } -function getValue(indexedValue: IndexedValue): V { +export function getValue(indexedValue: IndexedValue): V { return indexedValue[0] } -function getIndex(indexedValue: IndexedValue): FractionalIndex { +export function getIndex(indexedValue: IndexedValue): FractionalIndex { return indexedValue[1] } @@ -332,7 +332,7 @@ function mapValue( } // Abstraction for values tagged with a hash -type Hash = 
string +export type Hash = string export type HashTaggedValue = [V, Hash] function tagValue(value: V): HashTaggedValue { diff --git a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts index de7a1c8..ee14d00 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts @@ -4,7 +4,11 @@ import { StreamBuilder } from '../d2.js' import { generateKeyBetween } from 'fractional-indexing' import BTree from 'sorted-btree' import { + getIndex, + getValue, HashTaggedValue, + indexedValue, + IndexedValue, TopK, TopKChanges, TopKWithFractionalIndexOperator, From d1473b29f10506f15f8aa0703af9fd70d6778597 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 10:44:42 +0200 Subject: [PATCH 12/20] Add a orderBy operator that uses topK with B+ tree variant --- packages/d2mini/src/operators/orderBy.ts | 56 ++++++++++++++++++------ 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 0d1d71e..1e0b06b 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -2,6 +2,7 @@ import { IStreamBuilder } from '../types' import { KeyValue } from '../types.js' import { topK, topKWithIndex } from './topK.js' import { topKWithFractionalIndex } from './topKWithFractionalIndex.js' +import { topKWithFractionalIndexBTree } from './topKWithFractionalIndexBTree.js' import { map } from './map.js' import { innerJoin } from './join.js' import { consolidate } from './consolidate.js' @@ -132,19 +133,11 @@ export function orderByWithIndex< } } -/** - * Orders the elements and limits the number of results, with optional offset and - * annotates the value with a fractional index. - * This requires a keyed stream, and uses the `topKWithFractionalIndex` operator to order all the elements. 
- * - * @param valueExtractor - A function that extracts the value to order by from the element - * @param options - An optional object containing comparator, limit and offset properties - * @returns A piped operator that orders the elements and limits the number of results - */ -export function orderByWithFractionalIndex< +function orderByWithFractionalIndexBase< T extends KeyValue, Ve = unknown, ->( + >( + topK: typeof topKWithFractionalIndex, valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, @@ -152,7 +145,6 @@ export function orderByWithFractionalIndex< ) { const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 - const useTree = options?.useTree ?? false const comparator = options?.comparator ?? ((a, b) => { @@ -186,10 +178,9 @@ export function orderByWithFractionalIndex< ], ] as KeyValue, ), - topKWithFractionalIndex((a, b) => comparator(a[1], b[1]), { + topK((a, b) => comparator(a[1], b[1]), { limit, offset, - useTree, }), map(([_, [[key], index]]) => [key, index] as KeyValue), innerJoin(stream), @@ -200,3 +191,40 @@ export function orderByWithFractionalIndex< ) } } + +/** + * Orders the elements and limits the number of results, with optional offset and + * annotates the value with a fractional index. + * This requires a keyed stream, and uses the `topKWithFractionalIndex` operator to order all the elements. + * + * @param valueExtractor - A function that extracts the value to order by from the element + * @param options - An optional object containing comparator, limit and offset properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export function orderByWithFractionalIndex< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? 
V : never, + ) => Ve, + options?: OrderByOptions & OrderByWithFractionalIndexOptions, +) { + return orderByWithFractionalIndexBase(topKWithFractionalIndex, valueExtractor, options) +} + +export function orderByWithFractionalIndexBTree< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? V : never, + ) => Ve, + options?: OrderByOptions & OrderByWithFractionalIndexOptions, +) { + return orderByWithFractionalIndexBase( + topKWithFractionalIndexBTree, + valueExtractor, + options, + ) +} From bffd28b6b1514cbbe8c1c709ef40e997fe8bcc56 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 10:44:55 +0200 Subject: [PATCH 13/20] Formatting --- packages/d2mini/src/operators/orderBy.ts | 8 ++++++-- packages/d2mini/src/operators/topKWithFractionalIndex.ts | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 1e0b06b..318608b 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -136,7 +136,7 @@ export function orderByWithIndex< function orderByWithFractionalIndexBase< T extends KeyValue, Ve = unknown, - >( +>( topK: typeof topKWithFractionalIndex, valueExtractor: ( value: T extends KeyValue ? 
V : never, @@ -210,7 +210,11 @@ export function orderByWithFractionalIndex< ) => Ve, options?: OrderByOptions & OrderByWithFractionalIndexOptions, ) { - return orderByWithFractionalIndexBase(topKWithFractionalIndex, valueExtractor, options) + return orderByWithFractionalIndexBase( + topKWithFractionalIndex, + valueExtractor, + options, + ) } export function orderByWithFractionalIndexBTree< diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 3a438df..7c49f86 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -312,7 +312,10 @@ export function topKWithFractionalIndex< export type FractionalIndex = string export type IndexedValue = [V, FractionalIndex] -export function indexedValue(value: V, index: FractionalIndex): IndexedValue { +export function indexedValue( + value: V, + index: FractionalIndex, +): IndexedValue { return [value, index] } From 2640de6009d9c4c1126357f3f5196007632a42ba Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 11:13:41 +0200 Subject: [PATCH 14/20] Trigger CI From 69551eaaad31e9ea71ea9402364c24ade096498e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 11:15:13 +0200 Subject: [PATCH 15/20] Remove useTree option --- packages/d2mini/src/operators/orderBy.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 318608b..8161086 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -13,10 +13,6 @@ interface OrderByOptions { offset?: number } -interface OrderByWithFractionalIndexOptions { - useTree?: boolean -} - /** * Orders the elements and limits the number of results, with optional offset * This requires a keyed stream, and uses the `topK` operator to order all the elements. 
@@ -141,7 +137,7 @@ function orderByWithFractionalIndexBase< valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, - options?: OrderByOptions & OrderByWithFractionalIndexOptions, + options?: OrderByOptions, ) { const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 @@ -208,7 +204,7 @@ export function orderByWithFractionalIndex< valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, - options?: OrderByOptions & OrderByWithFractionalIndexOptions, + options?: OrderByOptions, ) { return orderByWithFractionalIndexBase( topKWithFractionalIndex, @@ -224,7 +220,7 @@ export function orderByWithFractionalIndexBTree< valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, - options?: OrderByOptions & OrderByWithFractionalIndexOptions, + options?: OrderByOptions, ) { return orderByWithFractionalIndexBase( topKWithFractionalIndexBTree, From edad4be476cd9441081e3a5f5625aa5087a8b0cd Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Jun 2025 11:21:46 +0200 Subject: [PATCH 16/20] Changeset --- .changeset/chilly-icons-design.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/chilly-icons-design.md diff --git a/.changeset/chilly-icons-design.md b/.changeset/chilly-icons-design.md new file mode 100644 index 0000000..1c02663 --- /dev/null +++ b/.changeset/chilly-icons-design.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/d2mini': patch +--- + +Introduce topKWithFractionalIndexBTree and orderByWithFractionalIndexBTree operators. These variants use a B+ tree which should be efficient for big collections (logarithmic time). 
From 4e2263832e66ec2e16693c7497969c5fa7e8c71a Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 09:15:33 +0200 Subject: [PATCH 17/20] Dynamically import B+ tree library --- .../operators/topKWithFractionalIndexBTree.ts | 45 ++++++++++++++++++- .../operators/topKWithFractionalIndex.test.ts | 13 ++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts index ee14d00..4b61676 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts @@ -2,7 +2,6 @@ import { IStreamBuilder, KeyValue, PipedOperator } from '../types.js' import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import { generateKeyBetween } from 'fractional-indexing' -import BTree from 'sorted-btree' import { getIndex, getValue, @@ -15,6 +14,33 @@ import { TopKWithFractionalIndexOptions, } from './topKWithFractionalIndex.js' +interface BTree { + nextLowerPair(key: Key): [Key, Value] | undefined + nextHigherPair(key: Key): [Key, Value] | undefined + set(key: Key, value: Value, overwrite?: boolean): boolean + maxKey(): Key | undefined + get(key: Key, defaultValue?: Value): Value | undefined + delete(key: Key): boolean + size: number +} + +interface BTreeClass { + new ( + entries?: [Key, Value][], + compare?: (a: Key, b: Key) => number, + maxNodeSize?: number, + ): BTree +} + +let BTree: BTreeClass + +export async function loadBTree() { + if (!BTree) { + const { default: BTreeClass } = await import('sorted-btree') + BTree = BTreeClass + } +} + /** * Implementation of a topK data structure that uses a B+ tree. * The tree allows for logarithmic time insertions and deletions. 
@@ -35,6 +61,12 @@ class TopKTree implements TopK { limit: number, comparator: (a: V, b: V) => number, ) { + if (!BTree) { + throw new Error( + 'B+ tree not loaded. You need to call loadBTree() before using TopKTree.', + ) + } + this.#topKStart = offset this.#topKEnd = offset + limit this.#comparator = comparator @@ -210,6 +242,11 @@ export class TopKWithFractionalIndexBTreeOperator< limit: number, comparator: (a: HashTaggedValue, b: HashTaggedValue) => number, ): TopK> { + if (!BTree) { + throw new Error( + 'B+ tree not loaded. You need to call loadBTree() before using TopKWithFractionalIndexBTreeOperator.', + ) + } return new TopKTree(offset, limit, comparator) } } @@ -239,6 +276,12 @@ export function topKWithFractionalIndexBTree< ): PipedOperator> { const opts = options || {} + if (!BTree) { + throw new Error( + 'B+ tree not loaded. You need to call loadBTree() before using topKWithFractionalIndexBTree.', + ) + } + return ( stream: IStreamBuilder, ): IStreamBuilder> => { diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index d6238c2..4f0a5dc 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -1,8 +1,11 @@ -import { describe, it, expect } from 'vitest' +import { describe, it, expect, beforeAll } from 'vitest' import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { topKWithFractionalIndex } from '../../src/operators/topKWithFractionalIndex.js' -import { topKWithFractionalIndexBTree } from '../../src/operators/topKWithFractionalIndexBTree.js' +import { + loadBTree, + topKWithFractionalIndexBTree, +} from '../../src/operators/topKWithFractionalIndexBTree.js' import { output } from '../../src/operators/index.js' // Helper function to check if indices are in lexicographic order @@ -59,11 +62,15 @@ function verifyOrder(results: any[], 
expectedOrder: string[]) { expect(sortedByIndex).toEqual(expectedOrder) } +beforeAll(async () => { + await loadBTree() +}) + describe('Operators', () => { describe.each([ ['with array', { topK: topKWithFractionalIndex }], ['with B+ tree', { topK: topKWithFractionalIndexBTree }], - ])('TopKWithFractionalIndex operation %s', (_, { topK }) => { + ])('TopKWithFractionalIndex operator %s', (_, { topK }) => { it('should assign fractional indices to sorted elements', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() From 845bbe6a716360f23ecdcb23d4681d33fce3abea Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 09:16:07 +0200 Subject: [PATCH 18/20] Also run orderByWithFractionalIndex tests for both the array and B+ tree variant --- .../orderByWithFractionalIndex.test.ts | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts index 67ba1e6..3df53bd 100644 --- a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts @@ -1,11 +1,13 @@ -import { describe, test, expect } from 'vitest' +import { describe, test, expect, beforeAll } from 'vitest' import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { orderByWithFractionalIndex, + orderByWithFractionalIndexBTree, output, } from '../../src/operators/index.js' import { KeyValue } from '../../src/types.js' +import { loadBTree } from '../../src/operators/topKWithFractionalIndexBTree.js' const stripFractionalIndex = ([[key, [value, _index]], multiplicity]) => [ key, @@ -13,8 +15,15 @@ const stripFractionalIndex = ([[key, [value, _index]], multiplicity]) => [ multiplicity, ] +beforeAll(async () => { + await loadBTree() +}) + describe('Operators', () => { - 
describe('OrderByWithFractionalIndex operation', () => { + describe.each([ + ['with array', { orderBy: orderByWithFractionalIndex }], + ['with B+ tree', { orderBy: orderByWithFractionalIndexBTree }], + ])('OrderByWithFractionalIndex operator %s', (_, { orderBy }) => { test('initial results with default comparator', () => { const graph = new D2() const input = graph.newInput< @@ -29,7 +38,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value), + orderBy((item) => item.value), output((message) => { latestMessage = message }), @@ -77,7 +86,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { + orderBy((item) => item.value, { comparator: (a, b) => b.localeCompare(a), // reverse order }), output((message) => { @@ -127,7 +136,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3 }), output((message) => { latestMessage = message }), @@ -173,7 +182,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { + orderBy((item) => item.value, { limit: 2, offset: 2, }), @@ -221,7 +230,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.id), + orderBy((item) => item.id), output((message) => { latestMessage = message }), @@ -269,7 +278,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3 }), output((message) => { latestMessage = message }), @@ -333,7 +342,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 
3 }), output((message) => { latestMessage = message }), @@ -399,7 +408,7 @@ describe('Operators', () => { let latestMessage: any = null input.pipe( - orderByWithFractionalIndex((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3 }), output((message) => { latestMessage = message }), From 37eb83f805d6aa3c00726ade1685e871191a129e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 09:32:07 +0200 Subject: [PATCH 19/20] Split tree version of orderBy into its own file to enable tree shaking the sorted-btree dependency. --- packages/d2mini/src/operators/orderBy.ts | 21 ++----------------- packages/d2mini/src/operators/orderByBTree.ts | 19 +++++++++++++++++ .../orderByWithFractionalIndex.test.ts | 2 +- 3 files changed, 22 insertions(+), 20 deletions(-) create mode 100644 packages/d2mini/src/operators/orderByBTree.ts diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 8161086..681ab5d 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -2,12 +2,11 @@ import { IStreamBuilder } from '../types' import { KeyValue } from '../types.js' import { topK, topKWithIndex } from './topK.js' import { topKWithFractionalIndex } from './topKWithFractionalIndex.js' -import { topKWithFractionalIndexBTree } from './topKWithFractionalIndexBTree.js' import { map } from './map.js' import { innerJoin } from './join.js' import { consolidate } from './consolidate.js' -interface OrderByOptions { +export interface OrderByOptions { comparator?: (a: Ve, b: Ve) => number limit?: number offset?: number @@ -129,7 +128,7 @@ export function orderByWithIndex< } } -function orderByWithFractionalIndexBase< +export function orderByWithFractionalIndexBase< T extends KeyValue, Ve = unknown, >( @@ -212,19 +211,3 @@ export function orderByWithFractionalIndex< options, ) } - -export function orderByWithFractionalIndexBTree< - T extends KeyValue, - Ve = unknown, ->( - 
valueExtractor: ( - value: T extends KeyValue ? V : never, - ) => Ve, - options?: OrderByOptions, -) { - return orderByWithFractionalIndexBase( - topKWithFractionalIndexBTree, - valueExtractor, - options, - ) -} diff --git a/packages/d2mini/src/operators/orderByBTree.ts b/packages/d2mini/src/operators/orderByBTree.ts new file mode 100644 index 0000000..35a7848 --- /dev/null +++ b/packages/d2mini/src/operators/orderByBTree.ts @@ -0,0 +1,19 @@ +import { KeyValue } from '../types.js' +import { OrderByOptions, orderByWithFractionalIndexBase } from './orderBy.js' +import { topKWithFractionalIndexBTree } from './topKWithFractionalIndexBTree.js' + +export function orderByWithFractionalIndexBTree< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? V : never, + ) => Ve, + options?: OrderByOptions, +) { + return orderByWithFractionalIndexBase( + topKWithFractionalIndexBTree, + valueExtractor, + options, + ) +} diff --git a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts index 3df53bd..ed5576f 100644 --- a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts @@ -3,9 +3,9 @@ import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { orderByWithFractionalIndex, - orderByWithFractionalIndexBTree, output, } from '../../src/operators/index.js' +import { orderByWithFractionalIndexBTree } from '../../src/operators/orderByBTree.js' import { KeyValue } from '../../src/types.js' import { loadBTree } from '../../src/operators/topKWithFractionalIndexBTree.js' From 5fc79d9f7704a5f745fe75682723fb96e05385d3 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 10:10:26 +0200 Subject: [PATCH 20/20] Improved changeset --- .changeset/chilly-icons-design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/.changeset/chilly-icons-design.md b/.changeset/chilly-icons-design.md index 1c02663..04bd2f8 100644 --- a/.changeset/chilly-icons-design.md +++ b/.changeset/chilly-icons-design.md @@ -2,4 +2,4 @@ '@electric-sql/d2mini': patch --- -Introduce topKWithFractionalIndexBTree and orderByWithFractionalIndexBTree operators. These variants use a B+ tree which should be efficient for big collections (logarithmic time). +Introduce topKWithFractionalIndexBTree and orderByWithFractionalIndexBTree operators. These variants use a B+ tree which is more efficient on big collections as its time complexity is logarithmic.