From e0d072ecb3067b10e4e0daa594e3997cdec57856 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 9 Dec 2025 14:21:34 +0000 Subject: [PATCH 01/15] feat: Add groupedTopKWithFractionalIndex operator This commit introduces a new operator that allows for topK operations to be performed independently on groups of data. Co-authored-by: kevin.de.porre --- .../groupedTopKWithFractionalIndex.ts | 282 +++++++++ packages/db-ivm/src/operators/index.ts | 1 + .../src/operators/topKWithFractionalIndex.ts | 77 +-- .../groupedTopKWithFractionalIndex.test.ts | 558 ++++++++++++++++++ 4 files changed, 880 insertions(+), 38 deletions(-) create mode 100644 packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts create mode 100644 packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts diff --git a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts new file mode 100644 index 000000000..7ca607a64 --- /dev/null +++ b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts @@ -0,0 +1,282 @@ +import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { MultiSet } from '../multiset.js' +import { + TopKArray, + createKeyedComparator, +} from './topKWithFractionalIndex.js' +import type { DifferenceStreamReader } from '../graph.js' +import type { IStreamBuilder, PipedOperator } from '../types.js' +import type { + IndexedValue, + TopK, + TopKChanges, + TopKMoveChanges, +} from './topKWithFractionalIndex.js' + +export interface GroupedTopKWithFractionalIndexOptions { + limit?: number + offset?: number + setSizeCallback?: (getSize: () => number) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void, + ) => void + /** + * Function to extract a group key from the element's key and value. + * Elements with the same group key will be sorted and limited together. + */ + groupKeyFn: (key: K, value: T) => unknown +} + +/** + * State for a single group in the grouped topK operator. + * Each group maintains its own multiplicity index and topK data structure. + */ +type GroupState = { + /** Maps element keys to their multiplicities within this group */ + multiplicities: Map + /** The topK data structure for this group */ + topK: TopK<[K, T]> +} + +/** + * Operator for grouped fractional indexed topK operations. + * This operator maintains separate topK windows for each group, + * allowing per-group limits and ordering. + * + * The input is a keyed stream [K, T] and outputs [K, IndexedValue]. + * Elements are grouped by the groupKeyFn, and each group maintains + * its own sorted collection with independent limit/offset. + */ +export class GroupedTopKWithFractionalIndexOperator< + K extends string | number, + T, +> extends UnaryOperator<[K, T], [K, IndexedValue]> { + #groupStates: Map> = new Map() + #groupKeyFn: (key: K, value: T) => unknown + #comparator: (a: [K, T], b: [K, T]) => number + #offset: number + #limit: number + + constructor( + id: number, + inputA: DifferenceStreamReader<[K, T]>, + output: DifferenceStreamWriter<[K, IndexedValue]>, + comparator: (a: T, b: T) => number, + options: GroupedTopKWithFractionalIndexOptions, + ) { + super(id, inputA, output) + this.#groupKeyFn = options.groupKeyFn + this.#limit = options.limit ?? Infinity + this.#offset = options.offset ?? 0 + this.#comparator = createKeyedComparator(comparator) + options.setSizeCallback?.(() => this.#getTotalSize()) + options.setWindowFn?.(this.#moveTopK.bind(this)) + } + + /** + * Creates a new TopK data structure for a group. + * Can be overridden in subclasses to use different implementations (e.g., B+ tree). + */ + protected createTopK( + offset: number, + limit: number, + comparator: (a: [K, T], b: [K, T]) => number, + ): TopK<[K, T]> { + return new TopKArray(offset, limit, comparator) + } + + #getTotalSize(): number { + let size = 0 + for (const state of this.#groupStates.values()) { + size += state.topK.size + } + return size + } + + #getOrCreateGroupState(groupKey: unknown): GroupState { + let state = this.#groupStates.get(groupKey) + if (!state) { + state = { + multiplicities: new Map(), + topK: this.createTopK(this.#offset, this.#limit, this.#comparator), + } + this.#groupStates.set(groupKey, state) + } + return state + } + + #updateMultiplicity( + state: GroupState, + key: K, + multiplicity: number, + ): { oldMultiplicity: number; newMultiplicity: number } { + if (multiplicity === 0) { + const current = state.multiplicities.get(key) ?? 0 + return { oldMultiplicity: current, newMultiplicity: current } + } + + const oldMultiplicity = state.multiplicities.get(key) ?? 0 + const newMultiplicity = oldMultiplicity + multiplicity + if (newMultiplicity === 0) { + state.multiplicities.delete(key) + } else { + state.multiplicities.set(key, newMultiplicity) + } + return { oldMultiplicity, newMultiplicity } + } + + #cleanupGroupIfEmpty(groupKey: unknown, state: GroupState): void { + if (state.multiplicities.size === 0 && state.topK.size === 0) { + this.#groupStates.delete(groupKey) + } + } + + /** + * Moves the topK window for all groups based on the provided offset and limit. + * Any changes to the topK are sent to the output. + */ + #moveTopK({ offset, limit }: { offset?: number; limit?: number }): void { + if (offset !== undefined) { + this.#offset = offset + } + if (limit !== undefined) { + this.#limit = limit + } + + const result: Array<[[K, IndexedValue], number]> = [] + let hasChanges = false + + for (const state of this.#groupStates.values()) { + if (!(state.topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of GroupedTopK with fractional index`, + ) + } + + const diff: TopKMoveChanges<[K, T]> = state.topK.move({ + offset: this.#offset, + limit: this.#limit, + }) + + diff.moveIns.forEach((moveIn) => this.#handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => this.#handleMoveOut(moveOut, result)) + + if (diff.changes) { + hasChanges = true + } + } + + if (hasChanges) { + this.output.sendData(new MultiSet(result)) + } + } + + run(): void { + const result: Array<[[K, IndexedValue], number]> = [] + for (const message of this.inputMessages()) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + this.#processElement(key, value, multiplicity, result) + } + } + + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } + } + + #processElement( + key: K, + value: T, + multiplicity: number, + result: Array<[[K, IndexedValue], number]>, + ): void { + const groupKey = this.#groupKeyFn(key, value) + const state = this.#getOrCreateGroupState(groupKey) + + const { oldMultiplicity, newMultiplicity } = this.#updateMultiplicity( + state, + key, + multiplicity, + ) + + let res: TopKChanges<[K, T]> = { + moveIn: null, + moveOut: null, + } + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value was invisible but should now be visible + // Need to insert it into the array of sorted values + res = state.topK.insert([key, value]) + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was visible but should now be invisible + // Need to remove it from the array of sorted values + res = state.topK.delete([key, value]) + } + // else: The value was invisible and remains invisible, + // or was visible and remains visible - no topK change + + this.#handleMoveIn(res.moveIn, result) + this.#handleMoveOut(res.moveOut, result) + + // Cleanup empty groups to prevent memory leaks + this.#cleanupGroupIfEmpty(groupKey, state) + } + + #handleMoveIn( + moveIn: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, + ): void { + if (moveIn) { + const [[key, value], index] = moveIn + result.push([[key, [value, index]], 1]) + } + } + + #handleMoveOut( + moveOut: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, + ): void { + if (moveOut) { + const [[key, value], index] = moveOut + result.push([[key, [value, index]], -1]) + } + } +} + +/** + * Limits the number of results per group based on a comparator, with optional offset. + * Uses fractional indexing to minimize the number of changes when elements move positions. + * Each element is assigned a fractional index that is lexicographically sortable. + * When elements move, only the indices of the moved elements are updated, not all elements. + * + * This operator groups elements by the provided groupKeyFn and applies the limit/offset + * independently to each group. + * + * @param comparator - A function that compares two elements for ordering + * @param options - Configuration including groupKeyFn, limit, and offset + * @returns A piped operator that orders elements per group and limits results per group + */ +export function groupedTopKWithFractionalIndex( + comparator: (a: T, b: T) => number, + options: GroupedTopKWithFractionalIndexOptions, +): PipedOperator<[K, T], [K, IndexedValue]> { + return ( + stream: IStreamBuilder<[K, T]>, + ): IStreamBuilder<[K, IndexedValue]> => { + const output = new StreamBuilder<[K, IndexedValue]>( + stream.graph, + new DifferenceStreamWriter<[K, IndexedValue]>(), + ) + const operator = new GroupedTopKWithFractionalIndexOperator( + stream.graph.getNextOperatorId(), + stream.connectReader(), + output.writer, + comparator, + options, + ) + stream.graph.addOperator(operator) + return output + } +} diff --git a/packages/db-ivm/src/operators/index.ts b/packages/db-ivm/src/operators/index.ts index 7cd72f1e8..3f8ab8cb8 100644 --- a/packages/db-ivm/src/operators/index.ts +++ b/packages/db-ivm/src/operators/index.ts @@ -14,6 +14,7 @@ export * from './distinct.js' export * from './keying.js' export * from './topK.js' export * from './topKWithFractionalIndex.js' +export * from './groupedTopKWithFractionalIndex.js' export * from './orderBy.js' export * from './filterBy.js' export { groupBy, groupByOperators } from './groupBy.js' diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 3609703e5..b36d1b377 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -42,13 +42,32 @@ export interface TopK { delete: (value: V) => TopKChanges } +// Abstraction for fractionally indexed values +export type FractionalIndex = string +export type IndexedValue = [V, FractionalIndex] + +export function indexedValue( + value: V, + index: FractionalIndex, +): IndexedValue { + return [value, index] +} + +export function getValue(indexedVal: IndexedValue): V { + return indexedVal[0] +} + +export function getIndex(indexedVal: IndexedValue): FractionalIndex { + return indexedVal[1] +} + /** * Implementation of a topK data structure. * Uses a sorted array internally to store the values and keeps a topK window over that array. * Inserts and deletes are O(n) operations because worst case an element is inserted/deleted * at the start of the array which causes all the elements to shift to the right/left. */ -class TopKArray implements TopK { +export class TopKArray implements TopK { #sortedValues: Array> = [] #comparator: (a: V, b: V) => number #topKStart: number @@ -230,6 +249,25 @@ class TopKArray implements TopK { } } +/** + * Creates a comparator for [key, value] tuples that first compares values, + * then uses the row key as a stable tie-breaker. + */ +export function createKeyedComparator( + comparator: (a: T, b: T) => number, +): (a: [K, T], b: [K, T]) => number { + return ([aKey, aVal], [bKey, bVal]) => { + // First compare on the value + const valueComparison = comparator(aVal, bVal) + if (valueComparison !== 0) { + return valueComparison + } + // If the values are equal, use the row key as tie-breaker + // This provides stable, deterministic ordering since keys are string | number + return compareKeys(aKey, bKey) + } +} + /** * Operator for fractional indexed topK operations * This operator maintains fractional indices for sorted elements @@ -421,40 +459,3 @@ export function topKWithFractionalIndex( } } -// Abstraction for fractionally indexed values -export type FractionalIndex = string -export type IndexedValue = [V, FractionalIndex] - -export function indexedValue( - value: V, - index: FractionalIndex, -): IndexedValue { - return [value, index] -} - -export function getValue(indexedVal: IndexedValue): V { - return indexedVal[0] -} - -export function getIndex(indexedVal: IndexedValue): FractionalIndex { - return indexedVal[1] -} - -/** - * Creates a comparator for [key, value] tuples that first compares values, - * then uses the row key as a stable tie-breaker. - */ -function createKeyedComparator( - comparator: (a: T, b: T) => number, -): (a: [K, T], b: [K, T]) => number { - return ([aKey, aVal], [bKey, bVal]) => { - // First compare on the value - const valueComparison = comparator(aVal, bVal) - if (valueComparison !== 0) { - return valueComparison - } - // If the values are equal, use the row key as tie-breaker - // This provides stable, deterministic ordering since keys are string | number - return compareKeys(aKey, bKey) - } -} diff --git a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts new file mode 100644 index 000000000..a5a2848a8 --- /dev/null +++ b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts @@ -0,0 +1,558 @@ +import { describe, expect, it } from 'vitest' +import { D2 } from '../../src/d2.js' +import { MultiSet } from '../../src/multiset.js' +import { groupedTopKWithFractionalIndex } from '../../src/operators/groupedTopKWithFractionalIndex.js' +import { output } from '../../src/operators/index.js' +import { MessageTracker, compareFractionalIndex } from '../test-utils.js' + +describe(`Operators`, () => { + describe(`GroupedTopKWithFractionalIndex operator`, () => { + it(`should maintain separate topK per group`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data - 3 items per group + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 4 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 2 }], 1], + [[`g2-c`, { id: `g2-c`, group: `group2`, value: 6 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Each group should have limit 2, so 4 total results + expect(result.sortedResults.length).toBe(4) + + // Group by group key and verify each group's results + const groupedValues = new Map>() + for (const [_key, [value, _index]] of result.sortedResults) { + const group = value.group + const list = groupedValues.get(group) ?? [] + list.push(value.value) + groupedValues.set(group, list) + } + + // Sort values within each group for consistent comparison + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + // group1 should have values 1, 3 (top 2 by ascending value) + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + // group2 should have values 2, 4 (top 2 by ascending value) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + it(`should handle incremental updates within a group`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + // Initial should have 2 items (limit 2): values 1 and 3 + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + const initialValues = initialResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(initialValues).toEqual([1, 3]) + + const initialMessageCount = initialResult.messageCount + + // Insert a better value (0) which should evict value 3 + input.sendData( + new MultiSet([ + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 0 }], 1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult(compareFractionalIndex) + // Should have 2 new messages: add 0, remove 3 + expect(updateResult.messageCount - initialMessageCount).toBe(2) + + // Check final state (cumulative) + const finalValues = updateResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(finalValues).toEqual([0, 1]) + }) + + it(`should handle removal of elements from topK`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + const initialMessageCount = tracker.getResult().messageCount + + // Remove the element with value 1 (which is in topK) + input.sendData( + new MultiSet([ + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], -1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult(compareFractionalIndex) + // Should have 2 new messages: remove 1, add 5 + expect(updateResult.messageCount - initialMessageCount).toBe(2) + + // Final state should have values 3 and 5 + const finalValues = updateResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(finalValues).toEqual([3, 5]) + }) + + it(`should handle multiple groups independently`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data for two groups + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 10 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 20 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 5 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 15 }], 1], + ]), + ) + graph.run() + + tracker.reset() + + // Update only group1 - add a better value + input.sendData( + new MultiSet([ + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 5 }], 1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult() + // Only group1 should be affected + const affectedGroups = new Set( + updateResult.messages.map(([[_key, [value, _idx]], _mult]) => value.group), + ) + expect(affectedGroups.size).toBe(1) + expect(affectedGroups.has(`group1`)).toBe(true) + expect(affectedGroups.has(`group2`)).toBe(false) + }) + + it(`should support offset within groups`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + offset: 1, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data - 4 items per group + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 4 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // With offset 1 and limit 2, should get values 2 and 3 + const values = result.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(values).toEqual([2, 3]) + }) + + it(`should use groupKeyFn to extract group from key with delimiter`, () => { + const graph = new D2() + // Use keys with format "group:itemId" + const input = graph.newInput< + [string, { id: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + // Extract group from key "group:itemId" + groupKeyFn: (key, _value) => key.split(`:`)[0], + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`group1:a`, { id: `g1-a`, value: 5 }], 1], + [[`group1:b`, { id: `g1-b`, value: 1 }], 1], + [[`group1:c`, { id: `g1-c`, value: 3 }], 1], + [[`group2:a`, { id: `g2-a`, value: 4 }], 1], + [[`group2:b`, { id: `g2-b`, value: 2 }], 1], + [[`group2:c`, { id: `g2-c`, value: 6 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Group results by group extracted from key + const groupedValues = new Map>() + for (const [key, [value, _index]] of result.sortedResults) { + const group = key.split(`:`)[0]! + const list = groupedValues.get(group) ?? [] + list.push(value.value) + groupedValues.set(group, list) + } + + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + it(`should support infinite limit (no limit)`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + // No limit specified - defaults to Infinity + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // All 3 items should be in the result + expect(result.sortedResults.length).toBe(3) + const values = result.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(values).toEqual([1, 3, 5]) + }) + + it(`should maintain fractional indices correctly within groups`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 3, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 3 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 5 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Check fractional indices are in correct order + const indexedItems = result.sortedResults.map(([key, [value, index]]) => ({ + key, + value: value.value, + index, + })) + + // Sort by value to verify fractional indices match sort order + indexedItems.sort((a, b) => a.value - b.value) + + // Verify indices are in lexicographic order + for (let i = 0; i < indexedItems.length - 1; i++) { + expect(indexedItems[i]!.index < indexedItems[i + 1]!.index).toBe(true) + } + }) + + it(`should handle setSizeCallback correctly`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + let getSize: (() => number) | undefined + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + setSizeCallback: (fn) => { + getSize = fn + }, + }), + output(() => {}), + ) + + graph.finalize() + + expect(getSize).toBeDefined() + expect(getSize!()).toBe(0) // Initially empty + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 4 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 2 }], 1], + ]), + ) + graph.run() + + // group1 has 2 items in topK, group2 has 2 items + expect(getSize!()).toBe(4) + }) + + it(`should handle moving window with setWindowFn`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + let windowFn: ((options: { offset?: number; limit?: number }) => void) | undefined + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + offset: 0, + groupKeyFn: (_key, value) => value.group, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 4 }], 1], + ]), + ) + graph.run() + + // Initial: values 1, 2 + const initialResult = tracker.getResult(compareFractionalIndex) + const initialValues = initialResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(initialValues).toEqual([1, 2]) + + // Move window to offset 1 + windowFn!({ offset: 1 }) + graph.run() + + // Now should have values 2, 3 + const movedResult = tracker.getResult(compareFractionalIndex) + const movedValues = movedResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(movedValues).toEqual([2, 3]) + }) + + it(`should cleanup empty groups`, () => { + const graph = new D2() + const input = graph.newInput< + [string, { id: string; group: string; value: number }] + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Add items to two groups + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 2 }], 1], + ]), + ) + graph.run() + + expect(tracker.getResult().sortedResults.length).toBe(2) + + // Remove all items from group1 + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], -1], + ]), + ) + graph.run() + + // Should have only group2 left in materialized results + const updateResult = tracker.getResult(compareFractionalIndex) + expect(updateResult.sortedResults.length).toBe(1) + expect(updateResult.sortedResults[0]![1][0].group).toBe(`group2`) + }) + }) +}) From 506cc8d5e14082fa6ce88cec7d6e24c9987e4cbc Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:24:04 +0000 Subject: [PATCH 02/15] ci: apply automated fixes --- .../groupedTopKWithFractionalIndex.ts | 5 +- .../src/operators/topKWithFractionalIndex.ts | 1 - .../groupedTopKWithFractionalIndex.test.ts | 74 +++++++++---------- 3 files changed, 35 insertions(+), 45 deletions(-) diff --git a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts index 7ca607a64..83fc4afb0 100644 --- a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts @@ -1,10 +1,7 @@ import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' -import { - TopKArray, - createKeyedComparator, -} from './topKWithFractionalIndex.js' +import { TopKArray, createKeyedComparator } from './topKWithFractionalIndex.js' import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' import type { diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index b36d1b377..b2c3bb299 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -458,4 +458,3 @@ export function topKWithFractionalIndex( return output } } - diff --git a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts index a5a2848a8..867c3195c 100644 --- a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts @@ -9,9 +9,8 @@ describe(`Operators`, () => { describe(`GroupedTopKWithFractionalIndex operator`, () => { it(`should maintain separate topK per group`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -69,9 +68,8 @@ describe(`Operators`, () => { it(`should handle incremental updates within a group`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -129,9 +127,8 @@ describe(`Operators`, () => { it(`should handle removal of elements from topK`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -181,9 +178,8 @@ describe(`Operators`, () => { it(`should handle multiple groups independently`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -224,7 +220,9 @@ describe(`Operators`, () => { const updateResult = tracker.getResult() // Only group1 should be affected const affectedGroups = new Set( - updateResult.messages.map(([[_key, [value, _idx]], _mult]) => value.group), + updateResult.messages.map( + ([[_key, [value, _idx]], _mult]) => value.group, + ), ) expect(affectedGroups.size).toBe(1) expect(affectedGroups.has(`group1`)).toBe(true) @@ -233,9 +231,8 @@ describe(`Operators`, () => { it(`should support offset within groups`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -276,9 +273,7 @@ describe(`Operators`, () => { it(`should use groupKeyFn to extract group from key with delimiter`, () => { const graph = new D2() // Use keys with format "group:itemId" - const input = graph.newInput< - [string, { id: string; value: number }] - >() + const input = graph.newInput<[string, { id: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; value: number }, string]] >() @@ -330,9 +325,8 @@ describe(`Operators`, () => { it(`should support infinite limit (no limit)`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -370,9 +364,8 @@ describe(`Operators`, () => { it(`should maintain fractional indices correctly within groups`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() @@ -401,11 +394,13 @@ describe(`Operators`, () => { const result = tracker.getResult(compareFractionalIndex) // Check fractional indices are in correct order - const indexedItems = result.sortedResults.map(([key, [value, index]]) => ({ - key, - value: value.value, - index, - })) + const indexedItems = result.sortedResults.map( + ([key, [value, index]]) => ({ + key, + value: value.value, + index, + }), + ) // Sort by value to verify fractional indices match sort order indexedItems.sort((a, b) => a.value - b.value) @@ -418,9 +413,8 @@ describe(`Operators`, () => { it(`should handle setSizeCallback correctly`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() let getSize: (() => number) | undefined input.pipe( @@ -456,13 +450,14 @@ describe(`Operators`, () => { it(`should handle moving window with setWindowFn`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() - let windowFn: ((options: { offset?: number; limit?: number }) => void) | undefined + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined input.pipe( groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { @@ -511,9 +506,8 @@ describe(`Operators`, () => { it(`should cleanup empty groups`, () => { const graph = new D2() - const input = graph.newInput< - [string, { id: string; group: string; value: number }] - >() + const input = + graph.newInput<[string, { id: string; group: string; value: number }]>() const tracker = new MessageTracker< [string, [{ id: string; group: string; value: number }, string]] >() From eff90a7abc710241727dfa67744419cb6523d91a Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 9 Dec 2025 14:44:15 +0000 Subject: [PATCH 03/15] refactor: Extract TopKState helper class for shared topK logic Introduce TopKState to encapsulate the common state and operations for managing a single topK window (multiplicity tracking and topK data structure). Both TopKWithFractionalIndexOperator (single instance) and GroupedTopKWithFractionalIndexOperator (one instance per group) now use this helper class, eliminating code duplication. Also extract handleMoveIn and handleMoveOut as standalone functions that can be reused by both operators. --- .../groupedTopKWithFractionalIndex.ts | 140 ++++--------- .../src/operators/topKWithFractionalIndex.ts | 192 ++++++++++-------- 2 files changed, 151 insertions(+), 181 deletions(-) diff --git a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts index 83fc4afb0..c78695d62 100644 --- a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts @@ -1,15 +1,14 @@ import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' -import { TopKArray, createKeyedComparator } from './topKWithFractionalIndex.js' +import { + TopKArray, + TopKState, + createKeyedComparator, +} from './topKWithFractionalIndex.js' import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' -import type { - IndexedValue, - TopK, - TopKChanges, - TopKMoveChanges, -} from './topKWithFractionalIndex.js' +import type { IndexedValue, TopK } from './topKWithFractionalIndex.js' export interface GroupedTopKWithFractionalIndexOptions { limit?: number @@ -25,17 +24,6 @@ export interface GroupedTopKWithFractionalIndexOptions { groupKeyFn: (key: K, value: T) => unknown } -/** - * State for a single group in the grouped topK operator. - * Each group maintains its own multiplicity index and topK data structure. - */ -type GroupState = { - /** Maps element keys to their multiplicities within this group */ - multiplicities: Map - /** The topK data structure for this group */ - topK: TopK<[K, T]> -} - /** * Operator for grouped fractional indexed topK operations. * This operator maintains separate topK windows for each group, @@ -49,7 +37,7 @@ export class GroupedTopKWithFractionalIndexOperator< K extends string | number, T, > extends UnaryOperator<[K, T], [K, IndexedValue]> { - #groupStates: Map> = new Map() + #groupStates: Map> = new Map() #groupKeyFn: (key: K, value: T) => unknown #comparator: (a: [K, T], b: [K, T]) => number #offset: number @@ -86,45 +74,23 @@ export class GroupedTopKWithFractionalIndexOperator< #getTotalSize(): number { let size = 0 for (const state of this.#groupStates.values()) { - size += state.topK.size + size += state.size } return size } - #getOrCreateGroupState(groupKey: unknown): GroupState { + #getOrCreateGroupState(groupKey: unknown): TopKState { let state = this.#groupStates.get(groupKey) if (!state) { - state = { - multiplicities: new Map(), - topK: this.createTopK(this.#offset, this.#limit, this.#comparator), - } + const topK = this.createTopK(this.#offset, this.#limit, this.#comparator) + state = new TopKState(topK) this.#groupStates.set(groupKey, state) } return state } - #updateMultiplicity( - state: GroupState, - key: K, - multiplicity: number, - ): { oldMultiplicity: number; newMultiplicity: number } { - if (multiplicity === 0) { - const current = state.multiplicities.get(key) ?? 0 - return { oldMultiplicity: current, newMultiplicity: current } - } - - const oldMultiplicity = state.multiplicities.get(key) ?? 0 - const newMultiplicity = oldMultiplicity + multiplicity - if (newMultiplicity === 0) { - state.multiplicities.delete(key) - } else { - state.multiplicities.set(key, newMultiplicity) - } - return { oldMultiplicity, newMultiplicity } - } - - #cleanupGroupIfEmpty(groupKey: unknown, state: GroupState): void { - if (state.multiplicities.size === 0 && state.topK.size === 0) { + #cleanupGroupIfEmpty(groupKey: unknown, state: TopKState): void { + if (state.isEmpty) { this.#groupStates.delete(groupKey) } } @@ -145,19 +111,10 @@ export class GroupedTopKWithFractionalIndexOperator< let hasChanges = false for (const state of this.#groupStates.values()) { - if (!(state.topK instanceof TopKArray)) { - throw new Error( - `Cannot move B+-tree implementation of GroupedTopK with fractional index`, - ) - } + const diff = state.move({ offset: this.#offset, limit: this.#limit }) - const diff: TopKMoveChanges<[K, T]> = state.topK.move({ - offset: this.#offset, - limit: this.#limit, - }) - - diff.moveIns.forEach((moveIn) => this.#handleMoveIn(moveIn, result)) - diff.moveOuts.forEach((moveOut) => this.#handleMoveOut(moveOut, result)) + diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result)) if (diff.changes) { hasChanges = true @@ -192,53 +149,38 @@ export class GroupedTopKWithFractionalIndexOperator< const groupKey = this.#groupKeyFn(key, value) const state = this.#getOrCreateGroupState(groupKey) - const { oldMultiplicity, newMultiplicity } = this.#updateMultiplicity( - state, - key, - multiplicity, - ) - - let res: TopKChanges<[K, T]> = { - moveIn: null, - moveOut: null, - } - if (oldMultiplicity <= 0 && newMultiplicity > 0) { - // The value was invisible but should now be visible - // Need to insert it into the array of sorted values - res = state.topK.insert([key, value]) - } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { - // The value was visible but should now be invisible - // Need to remove it from the array of sorted values - res = state.topK.delete([key, value]) - } - // else: The value was invisible and remains invisible, - // or was visible and remains visible - no topK change - - this.#handleMoveIn(res.moveIn, result) - this.#handleMoveOut(res.moveOut, result) + const changes = state.processElement(key, value, multiplicity) + handleMoveIn(changes.moveIn, result) + handleMoveOut(changes.moveOut, result) // Cleanup empty groups to prevent memory leaks this.#cleanupGroupIfEmpty(groupKey, state) } +} - #handleMoveIn( - moveIn: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, - ): void { - if (moveIn) { - const [[key, value], index] = moveIn - result.push([[key, [value, index]], 1]) - } +/** + * Handles a moveIn change by adding it to the result array. + */ +function handleMoveIn( + moveIn: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveIn) { + const [[key, value], index] = moveIn + result.push([[key, [value, index]], 1]) } +} - #handleMoveOut( - moveOut: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, - ): void { - if (moveOut) { - const [[key, value], index] = moveOut - result.push([[key, [value, index]], -1]) - } +/** + * Handles a moveOut change by adding it to the result array. + */ +function handleMoveOut( + moveOut: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveOut) { + const [[key, value], index] = moveOut + result.push([[key, [value, index]], -1]) } } diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index b2c3bb299..de39645e4 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -42,6 +42,84 @@ export interface TopK { delete: (value: V) => TopKChanges } +/** + * Helper class that manages the state for a single topK window. + * Encapsulates the multiplicity tracking and topK data structure, + * providing a clean interface for processing elements and moving the window. + * + * This class is used by both TopKWithFractionalIndexOperator (single instance) + * and GroupedTopKWithFractionalIndexOperator (one instance per group). + */ +export class TopKState { + #multiplicities: Map = new Map() + #topK: TopK<[K, T]> + + constructor(topK: TopK<[K, T]>) { + this.#topK = topK + } + + get size(): number { + return this.#topK.size + } + + get isEmpty(): boolean { + return this.#multiplicities.size === 0 && this.#topK.size === 0 + } + + /** + * Process an element update (insert or delete based on multiplicity change). + * Returns the changes to the topK window. + */ + processElement(key: K, value: T, multiplicity: number): TopKChanges<[K, T]> { + const { oldMultiplicity, newMultiplicity } = this.#updateMultiplicity( + key, + multiplicity, + ) + + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value was invisible but should now be visible + return this.#topK.insert([key, value]) + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was visible but should now be invisible + return this.#topK.delete([key, value]) + } + // The value was invisible and remains invisible, + // or was visible and remains visible - no topK change + return { moveIn: null, moveOut: null } + } + + /** + * Move the topK window. Only works with TopKArray implementation. + */ + move(options: { offset?: number; limit?: number }): TopKMoveChanges<[K, T]> { + if (!(this.#topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of TopK with fractional index`, + ) + } + return this.#topK.move(options) + } + + #updateMultiplicity( + key: K, + multiplicity: number, + ): { oldMultiplicity: number; newMultiplicity: number } { + if (multiplicity === 0) { + const current = this.#multiplicities.get(key) ?? 0 + return { oldMultiplicity: current, newMultiplicity: current } + } + + const oldMultiplicity = this.#multiplicities.get(key) ?? 0 + const newMultiplicity = oldMultiplicity + multiplicity + if (newMultiplicity === 0) { + this.#multiplicities.delete(key) + } else { + this.#multiplicities.set(key, newMultiplicity) + } + return { oldMultiplicity, newMultiplicity } + } +} + // Abstraction for fractionally indexed values export type FractionalIndex = string export type IndexedValue = [V, FractionalIndex] @@ -277,14 +355,7 @@ export class TopKWithFractionalIndexOperator< K extends string | number, T, > extends UnaryOperator<[K, T], [K, IndexedValue]> { - #index: Map = new Map() // maps keys to their multiplicity - - /** - * topK data structure that supports insertions and deletions - * and returns changes to the topK. - * Elements are stored as [key, value] tuples for stable tie-breaking. - */ - #topK: TopK<[K, T]> + #state: TopKState constructor( id: number, @@ -296,12 +367,9 @@ export class TopKWithFractionalIndexOperator< super(id, inputA, output) const limit = options.limit ?? Infinity const offset = options.offset ?? 0 - this.#topK = this.createTopK( - offset, - limit, - createKeyedComparator(comparator), - ) - options.setSizeCallback?.(() => this.#topK.size) + const topK = this.createTopK(offset, limit, createKeyedComparator(comparator)) + this.#state = new TopKState(topK) + options.setSizeCallback?.(() => this.#state.size) options.setWindowFn?.(this.moveTopK.bind(this)) } @@ -318,18 +386,11 @@ export class TopKWithFractionalIndexOperator< * Any changes to the topK are sent to the output. */ moveTopK({ offset, limit }: { offset?: number; limit?: number }) { - if (!(this.#topK instanceof TopKArray)) { - throw new Error( - `Cannot move B+-tree implementation of TopK with fractional index`, - ) - } - const result: Array<[[K, IndexedValue], number]> = [] + const diff = this.#state.move({ offset, limit }) - const diff = this.#topK.move({ offset, limit }) - - diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) - diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) + diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result)) if (diff.changes) { // There are changes to the topK @@ -359,68 +420,35 @@ export class TopKWithFractionalIndexOperator< multiplicity: number, result: Array<[[K, IndexedValue], number]>, ): void { - const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity) - - let res: TopKChanges<[K, T]> = { - moveIn: null, - moveOut: null, - } - if (oldMultiplicity <= 0 && newMultiplicity > 0) { - // The value was invisible but should now be visible - // Need to insert it into the array of sorted values - res = this.#topK.insert([key, value]) - } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { - // The value was visible but should now be invisible - // Need to remove it from the array of sorted values - res = this.#topK.delete([key, value]) - } else { - // The value was invisible and it remains invisible - // or it was visible and remains visible - // so it doesn't affect the topK - } - - this.handleMoveIn(res.moveIn, result) - this.handleMoveOut(res.moveOut, result) - - return - } - - private handleMoveIn( - moveIn: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, - ) { - if (moveIn) { - const [[key, value], index] = moveIn - result.push([[key, [value, index]], 1]) - } - } - - private handleMoveOut( - moveOut: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, - ) { - if (moveOut) { - const [[key, value], index] = moveOut - result.push([[key, [value, index]], -1]) - } + const changes = this.#state.processElement(key, value, multiplicity) + handleMoveIn(changes.moveIn, result) + handleMoveOut(changes.moveOut, result) } +} - private getMultiplicity(key: K): number { - return this.#index.get(key) ?? 0 +/** + * Handles a moveIn change by adding it to the result array. + */ +function handleMoveIn( + moveIn: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveIn) { + const [[key, value], index] = moveIn + result.push([[key, [value, index]], 1]) } +} - private addKey( - key: K, - multiplicity: number, - ): { oldMultiplicity: number; newMultiplicity: number } { - const oldMultiplicity = this.getMultiplicity(key) - const newMultiplicity = oldMultiplicity + multiplicity - if (newMultiplicity === 0) { - this.#index.delete(key) - } else { - this.#index.set(key, newMultiplicity) - } - return { oldMultiplicity, newMultiplicity } +/** + * Handles a moveOut change by adding it to the result array. + */ +function handleMoveOut( + moveOut: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveOut) { + const [[key, value], index] = moveOut + result.push([[key, [value, index]], -1]) } } From 6193beb2f9ba0a7d3671c3fa82791b9cae68f3b8 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:46:39 +0000 Subject: [PATCH 04/15] ci: apply automated fixes --- packages/db-ivm/src/operators/topKWithFractionalIndex.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index de39645e4..8ca008f94 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -367,7 +367,11 @@ export class TopKWithFractionalIndexOperator< super(id, inputA, output) const limit = options.limit ?? Infinity const offset = options.offset ?? 0 - const topK = this.createTopK(offset, limit, createKeyedComparator(comparator)) + const topK = this.createTopK( + offset, + limit, + createKeyedComparator(comparator), + ) this.#state = new TopKState(topK) options.setSizeCallback?.(() => this.#state.size) options.setWindowFn?.(this.moveTopK.bind(this)) From 88dbcf6ecb670a9315a3ec72ff12c2b13b2afcf3 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 10 Dec 2025 10:19:34 +0100 Subject: [PATCH 05/15] Refactoring of the topK operator and grouped topK operator --- .../groupedTopKWithFractionalIndex.ts | 37 +- packages/db-ivm/src/operators/topKArray.ts | 255 ++++++++++++ packages/db-ivm/src/operators/topKState.ts | 106 +++++ .../src/operators/topKWithFractionalIndex.ts | 362 +----------------- .../operators/topKWithFractionalIndexBTree.ts | 8 +- 5 files changed, 370 insertions(+), 398 deletions(-) create mode 100644 packages/db-ivm/src/operators/topKArray.ts create mode 100644 packages/db-ivm/src/operators/topKState.ts diff --git a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts index c78695d62..6668bd729 100644 --- a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts @@ -1,14 +1,11 @@ import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' -import { - TopKArray, - TopKState, - createKeyedComparator, -} from './topKWithFractionalIndex.js' +import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js' +import { TopKArray, createKeyedComparator } from './topKArray.js' +import type { IndexedValue, TopK} from './topKArray.js'; import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' -import type { IndexedValue, TopK } from './topKWithFractionalIndex.js' export interface GroupedTopKWithFractionalIndexOptions { limit?: number @@ -111,7 +108,7 @@ export class GroupedTopKWithFractionalIndexOperator< let hasChanges = false for (const state of this.#groupStates.values()) { - const diff = state.move({ offset: this.#offset, limit: this.#limit }) + const diff = state.move({ offset: this.#offset, limit: this.#limit }) // TODO: think we should just pass offset and limit diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result)) diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result)) @@ -158,32 +155,6 @@ export class GroupedTopKWithFractionalIndexOperator< } } -/** - * Handles a moveIn change by adding it to the result array. - */ -function handleMoveIn( - moveIn: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, -): void { - if (moveIn) { - const [[key, value], index] = moveIn - result.push([[key, [value, index]], 1]) - } -} - -/** - * Handles a moveOut change by adding it to the result array. - */ -function handleMoveOut( - moveOut: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, -): void { - if (moveOut) { - const [[key, value], index] = moveOut - result.push([[key, [value, index]], -1]) - } -} - /** * Limits the number of results per group based on a comparator, with optional offset. * Uses fractional indexing to minimize the number of changes when elements move positions. diff --git a/packages/db-ivm/src/operators/topKArray.ts b/packages/db-ivm/src/operators/topKArray.ts new file mode 100644 index 000000000..e78bd5bce --- /dev/null +++ b/packages/db-ivm/src/operators/topKArray.ts @@ -0,0 +1,255 @@ +import { generateKeyBetween } from 'fractional-indexing' +import { binarySearch, compareKeys, diffHalfOpen } from '../utils.js' +import type { HRange } from '../utils.js' + +// Abstraction for fractionally indexed values +export type FractionalIndex = string +export type IndexedValue = [V, FractionalIndex] + +export function indexedValue( + value: V, + index: FractionalIndex, +): IndexedValue { + return [value, index] +} + +export function getValue(indexedVal: IndexedValue): V { + return indexedVal[0] +} + +export function getIndex(indexedVal: IndexedValue): FractionalIndex { + return indexedVal[1] +} + +/** + * Creates a comparator for [key, value] tuples that first compares values, + * then uses the row key as a stable tie-breaker. + */ +export function createKeyedComparator( + comparator: (a: T, b: T) => number, +): (a: [K, T], b: [K, T]) => number { + return ([aKey, aVal], [bKey, bVal]) => { + // First compare on the value + const valueComparison = comparator(aVal, bVal) + if (valueComparison !== 0) { + return valueComparison + } + // If the values are equal, use the row key as tie-breaker + // This provides stable, deterministic ordering since keys are string | number + return compareKeys(aKey, bKey) + } +} + +export type TopKChanges = { + /** Indicates which element moves into the topK (if any) */ + moveIn: IndexedValue | null + /** Indicates which element moves out of the topK (if any) */ + moveOut: IndexedValue | null +} + +export type TopKMoveChanges = { + /** Flag that marks whether there were any changes to the topK */ + changes: boolean + /** Indicates which elements move into the topK (if any) */ + moveIns: Array> + /** Indicates which elements move out of the topK (if any) */ + moveOuts: Array> +} + +/** + * A topK data structure that supports insertions and deletions + * and returns changes to the topK. + */ +export interface TopK { + size: number + insert: (value: V) => TopKChanges + delete: (value: V) => TopKChanges +} + +/** + * Implementation of a topK data structure. + * Uses a sorted array internally to store the values and keeps a topK window over that array. + * Inserts and deletes are O(n) operations because worst case an element is inserted/deleted + * at the start of the array which causes all the elements to shift to the right/left. + */ +export class TopKArray implements TopK { + #sortedValues: Array> = [] + #comparator: (a: V, b: V) => number + #topKStart: number + #topKEnd: number + + constructor( + offset: number, + limit: number, + comparator: (a: V, b: V) => number, + ) { + this.#topKStart = offset + this.#topKEnd = offset + limit + this.#comparator = comparator + } + + get size(): number { + const offset = this.#topKStart + const limit = this.#topKEnd - this.#topKStart + const available = this.#sortedValues.length - offset + return Math.max(0, Math.min(limit, available)) + } + + /** + * Moves the topK window + */ + move({ + offset, + limit, + }: { + offset?: number + limit?: number + }): TopKMoveChanges { + const oldOffset = this.#topKStart + const oldLimit = this.#topKEnd - this.#topKStart + + // `this.#topKEnd` can be `Infinity` if it has no limit + // but `diffHalfOpen` expects a finite range + // so we restrict it to the size of the topK if topKEnd is infinite + const oldRange: HRange = [ + this.#topKStart, + this.#topKEnd === Infinity ? this.#topKStart + this.size : this.#topKEnd, + ] + + this.#topKStart = offset ?? oldOffset + this.#topKEnd = this.#topKStart + (limit ?? oldLimit) // can be `Infinity` if limit is `Infinity` + + // Also handle `Infinity` in the newRange + const newRange: HRange = [ + this.#topKStart, + this.#topKEnd === Infinity + ? Math.max(this.#topKStart + this.size, oldRange[1]) // since the new limit is Infinity we need to take everything (so we need to take the biggest (finite) topKEnd) + : this.#topKEnd, + ] + const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) + + const moveIns: Array> = [] + onlyInB.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveIns.push(value) + } + }) + + const moveOuts: Array> = [] + onlyInA.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveOuts.push(value) + } + }) + + // It could be that there are changes (i.e. moveIns or moveOuts) + // but that the collection is lazy so we don't have the data yet that needs to move in/out + // so `moveIns` and `moveOuts` will be empty but `changes` will be true + // this will tell the caller that it needs to run the graph to load more data + return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 } + } + + insert(value: V): TopKChanges { + const result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup insert position + const index = this.#findIndex(value) + // Generate fractional index based on the fractional indices of the elements before and after it + const indexBefore = + index === 0 ? null : getIndex(this.#sortedValues[index - 1]!) + const indexAfter = + index === this.#sortedValues.length + ? null + : getIndex(this.#sortedValues[index]!) + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + + // Insert the value at the correct position + const val = indexedValue(value, fractionalIndex) + // Splice is O(n) where n = all elements in the collection (i.e. n >= k) ! + this.#sortedValues.splice(index, 0, val) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The inserted element is either before the top K or within the top K + // If it is before the top K then it moves the element that was right before the topK into the topK + // If it is within the top K then the inserted element moves into the top K + // In both cases the last element of the old top K now moves out of the top K + const moveInIndex = Math.max(index, this.#topKStart) + if (moveInIndex < this.#sortedValues.length) { + // We actually have a topK + // because in some cases there may not be enough elements in the array to reach the start of the topK + // e.g. [1, 2, 3] with K = 2 and offset = 3 does not have a topK + result.moveIn = this.#sortedValues[moveInIndex]! + + // We need to remove the element that falls out of the top K + // The element that falls out of the top K has shifted one to the right + // because of the element we inserted, so we find it at index topKEnd + if (this.#topKEnd < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[this.#topKEnd]! + } + } + } + + return result + } + + /** + * Deletes a value that may or may not be in the topK. + * IMPORTANT: this assumes that the value is present in the collection + * if it's not the case it will remove the element + * that is on the position where the provided `value` would be. + */ + delete(value: V): TopKChanges { + const result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup delete position + const index = this.#findIndex(value) + // Remove the value at that position + const [removedElem] = this.#sortedValues.splice(index, 1) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The removed element is either before the top K or within the top K + // If it is before the top K then the first element of the topK moves out of the topK + // If it is within the top K then the removed element moves out of the topK + result.moveOut = removedElem! + if (index < this.#topKStart) { + // The removed element is before the topK + // so actually, the first element of the topK moves out of the topK + // and not the element that we removed + // The first element of the topK is now at index topKStart - 1 + // since we removed an element before the topK + const moveOutIndex = this.#topKStart - 1 + if (moveOutIndex < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[moveOutIndex]! + } else { + // No value is moving out of the topK + // because there are no elements in the topK + result.moveOut = null + } + } + + // Since we removed an element that was before or in the topK + // the first element after the topK moved one position to the left + // and thus falls into the topK now + const moveInIndex = this.#topKEnd - 1 + if (moveInIndex < this.#sortedValues.length) { + result.moveIn = this.#sortedValues[moveInIndex]! + } + } + + return result + } + + // TODO: see if there is a way to refactor the code for insert and delete in the topK above + // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right + // so i have the feeling there is a common pattern here and we can implement both cases using that pattern + + #findIndex(value: V): number { + return binarySearch(this.#sortedValues, indexedValue(value, ``), (a, b) => + this.#comparator(getValue(a), getValue(b)), + ) + } +} diff --git a/packages/db-ivm/src/operators/topKState.ts b/packages/db-ivm/src/operators/topKState.ts new file mode 100644 index 000000000..60239cd05 --- /dev/null +++ b/packages/db-ivm/src/operators/topKState.ts @@ -0,0 +1,106 @@ +import { TopKArray } from './topKArray.js' +import type { IndexedValue, TopK, TopKChanges, TopKMoveChanges } from './topKArray.js'; + +/** + * Helper class that manages the state for a single topK window. + * Encapsulates the multiplicity tracking and topK data structure, + * providing a clean interface for processing elements and moving the window. + * + * This class is used by both TopKWithFractionalIndexOperator (single instance) + * and GroupedTopKWithFractionalIndexOperator (one instance per group). + */ +export class TopKState { + #multiplicities: Map = new Map() + #topK: TopK<[K, T]> + + constructor(topK: TopK<[K, T]>) { + this.#topK = topK + } + + get size(): number { + return this.#topK.size + } + + get isEmpty(): boolean { + return this.#multiplicities.size === 0 && this.#topK.size === 0 + } + + /** + * Process an element update (insert or delete based on multiplicity change). + * Returns the changes to the topK window. + */ + processElement(key: K, value: T, multiplicity: number): TopKChanges<[K, T]> { + const { oldMultiplicity, newMultiplicity } = this.#updateMultiplicity( + key, + multiplicity, + ) + + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value was invisible but should now be visible + return this.#topK.insert([key, value]) + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was visible but should now be invisible + return this.#topK.delete([key, value]) + } + // The value was invisible and remains invisible, + // or was visible and remains visible - no topK change + return { moveIn: null, moveOut: null } + } + + /** + * Move the topK window. Only works with TopKArray implementation. + */ + move(options: { offset?: number; limit?: number }): TopKMoveChanges<[K, T]> { + if (!(this.#topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of TopK with fractional index`, + ) + } + return this.#topK.move(options) + } + + #updateMultiplicity( + key: K, + multiplicity: number, + ): { oldMultiplicity: number; newMultiplicity: number } { + if (multiplicity === 0) { + const current = this.#multiplicities.get(key) ?? 0 + return { oldMultiplicity: current, newMultiplicity: current } + } + + const oldMultiplicity = this.#multiplicities.get(key) ?? 0 + const newMultiplicity = oldMultiplicity + multiplicity + if (newMultiplicity === 0) { + this.#multiplicities.delete(key) + } else { + this.#multiplicities.set(key, newMultiplicity) + } + return { oldMultiplicity, newMultiplicity } + } +} + +/** + * Handles a moveIn change by adding it to the result array. + */ +export function handleMoveIn( + moveIn: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveIn) { + const [[key, value], index] = moveIn + result.push([[key, [value, index]], 1]) + } +} + +/** + * Handles a moveOut change by adding it to the result array. + */ +export function handleMoveOut( + moveOut: IndexedValue<[K, T]> | null, + result: Array<[[K, IndexedValue], number]>, +): void { + if (moveOut) { + const [[key, value], index] = moveOut + result.push([[key, [value, index]], -1]) + } +} \ No newline at end of file diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 8ca008f94..d9b6a7adc 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -1,9 +1,9 @@ -import { generateKeyBetween } from 'fractional-indexing' import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' -import { binarySearch, compareKeys, diffHalfOpen } from '../utils.js' -import type { HRange } from '../utils.js' +import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js' +import { TopKArray, createKeyedComparator } from './topKArray.js' +import type { IndexedValue, TopK} from './topKArray.js'; import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' @@ -16,336 +16,6 @@ export interface TopKWithFractionalIndexOptions { ) => void } -export type TopKChanges = { - /** Indicates which element moves into the topK (if any) */ - moveIn: IndexedValue | null - /** Indicates which element moves out of the topK (if any) */ - moveOut: IndexedValue | null -} - -export type TopKMoveChanges = { - /** Flag that marks whether there were any changes to the topK */ - changes: boolean - /** Indicates which elements move into the topK (if any) */ - moveIns: Array> - /** Indicates which elements move out of the topK (if any) */ - moveOuts: Array> -} - -/** - * A topK data structure that supports insertions and deletions - * and returns changes to the topK. - */ -export interface TopK { - size: number - insert: (value: V) => TopKChanges - delete: (value: V) => TopKChanges -} - -/** - * Helper class that manages the state for a single topK window. - * Encapsulates the multiplicity tracking and topK data structure, - * providing a clean interface for processing elements and moving the window. - * - * This class is used by both TopKWithFractionalIndexOperator (single instance) - * and GroupedTopKWithFractionalIndexOperator (one instance per group). - */ -export class TopKState { - #multiplicities: Map = new Map() - #topK: TopK<[K, T]> - - constructor(topK: TopK<[K, T]>) { - this.#topK = topK - } - - get size(): number { - return this.#topK.size - } - - get isEmpty(): boolean { - return this.#multiplicities.size === 0 && this.#topK.size === 0 - } - - /** - * Process an element update (insert or delete based on multiplicity change). - * Returns the changes to the topK window. - */ - processElement(key: K, value: T, multiplicity: number): TopKChanges<[K, T]> { - const { oldMultiplicity, newMultiplicity } = this.#updateMultiplicity( - key, - multiplicity, - ) - - if (oldMultiplicity <= 0 && newMultiplicity > 0) { - // The value was invisible but should now be visible - return this.#topK.insert([key, value]) - } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { - // The value was visible but should now be invisible - return this.#topK.delete([key, value]) - } - // The value was invisible and remains invisible, - // or was visible and remains visible - no topK change - return { moveIn: null, moveOut: null } - } - - /** - * Move the topK window. Only works with TopKArray implementation. - */ - move(options: { offset?: number; limit?: number }): TopKMoveChanges<[K, T]> { - if (!(this.#topK instanceof TopKArray)) { - throw new Error( - `Cannot move B+-tree implementation of TopK with fractional index`, - ) - } - return this.#topK.move(options) - } - - #updateMultiplicity( - key: K, - multiplicity: number, - ): { oldMultiplicity: number; newMultiplicity: number } { - if (multiplicity === 0) { - const current = this.#multiplicities.get(key) ?? 0 - return { oldMultiplicity: current, newMultiplicity: current } - } - - const oldMultiplicity = this.#multiplicities.get(key) ?? 0 - const newMultiplicity = oldMultiplicity + multiplicity - if (newMultiplicity === 0) { - this.#multiplicities.delete(key) - } else { - this.#multiplicities.set(key, newMultiplicity) - } - return { oldMultiplicity, newMultiplicity } - } -} - -// Abstraction for fractionally indexed values -export type FractionalIndex = string -export type IndexedValue = [V, FractionalIndex] - -export function indexedValue( - value: V, - index: FractionalIndex, -): IndexedValue { - return [value, index] -} - -export function getValue(indexedVal: IndexedValue): V { - return indexedVal[0] -} - -export function getIndex(indexedVal: IndexedValue): FractionalIndex { - return indexedVal[1] -} - -/** - * Implementation of a topK data structure. - * Uses a sorted array internally to store the values and keeps a topK window over that array. - * Inserts and deletes are O(n) operations because worst case an element is inserted/deleted - * at the start of the array which causes all the elements to shift to the right/left. - */ -export class TopKArray implements TopK { - #sortedValues: Array> = [] - #comparator: (a: V, b: V) => number - #topKStart: number - #topKEnd: number - - constructor( - offset: number, - limit: number, - comparator: (a: V, b: V) => number, - ) { - this.#topKStart = offset - this.#topKEnd = offset + limit - this.#comparator = comparator - } - - get size(): number { - const offset = this.#topKStart - const limit = this.#topKEnd - this.#topKStart - const available = this.#sortedValues.length - offset - return Math.max(0, Math.min(limit, available)) - } - - /** - * Moves the topK window - */ - move({ - offset, - limit, - }: { - offset?: number - limit?: number - }): TopKMoveChanges { - const oldOffset = this.#topKStart - const oldLimit = this.#topKEnd - this.#topKStart - - // `this.#topKEnd` can be `Infinity` if it has no limit - // but `diffHalfOpen` expects a finite range - // so we restrict it to the size of the topK if topKEnd is infinite - const oldRange: HRange = [ - this.#topKStart, - this.#topKEnd === Infinity ? this.#topKStart + this.size : this.#topKEnd, - ] - - this.#topKStart = offset ?? oldOffset - this.#topKEnd = this.#topKStart + (limit ?? oldLimit) // can be `Infinity` if limit is `Infinity` - - // Also handle `Infinity` in the newRange - const newRange: HRange = [ - this.#topKStart, - this.#topKEnd === Infinity - ? Math.max(this.#topKStart + this.size, oldRange[1]) // since the new limit is Infinity we need to take everything (so we need to take the biggest (finite) topKEnd) - : this.#topKEnd, - ] - const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) - - const moveIns: Array> = [] - onlyInB.forEach((index) => { - const value = this.#sortedValues[index] - if (value) { - moveIns.push(value) - } - }) - - const moveOuts: Array> = [] - onlyInA.forEach((index) => { - const value = this.#sortedValues[index] - if (value) { - moveOuts.push(value) - } - }) - - // It could be that there are changes (i.e. moveIns or moveOuts) - // but that the collection is lazy so we don't have the data yet that needs to move in/out - // so `moveIns` and `moveOuts` will be empty but `changes` will be true - // this will tell the caller that it needs to run the graph to load more data - return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 } - } - - insert(value: V): TopKChanges { - const result: TopKChanges = { moveIn: null, moveOut: null } - - // Lookup insert position - const index = this.#findIndex(value) - // Generate fractional index based on the fractional indices of the elements before and after it - const indexBefore = - index === 0 ? null : getIndex(this.#sortedValues[index - 1]!) - const indexAfter = - index === this.#sortedValues.length - ? null - : getIndex(this.#sortedValues[index]!) - const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) - - // Insert the value at the correct position - const val = indexedValue(value, fractionalIndex) - // Splice is O(n) where n = all elements in the collection (i.e. n >= k) ! - this.#sortedValues.splice(index, 0, val) - - // Check if the topK changed - if (index < this.#topKEnd) { - // The inserted element is either before the top K or within the top K - // If it is before the top K then it moves the element that was right before the topK into the topK - // If it is within the top K then the inserted element moves into the top K - // In both cases the last element of the old top K now moves out of the top K - const moveInIndex = Math.max(index, this.#topKStart) - if (moveInIndex < this.#sortedValues.length) { - // We actually have a topK - // because in some cases there may not be enough elements in the array to reach the start of the topK - // e.g. [1, 2, 3] with K = 2 and offset = 3 does not have a topK - result.moveIn = this.#sortedValues[moveInIndex]! - - // We need to remove the element that falls out of the top K - // The element that falls out of the top K has shifted one to the right - // because of the element we inserted, so we find it at index topKEnd - if (this.#topKEnd < this.#sortedValues.length) { - result.moveOut = this.#sortedValues[this.#topKEnd]! - } - } - } - - return result - } - - /** - * Deletes a value that may or may not be in the topK. - * IMPORTANT: this assumes that the value is present in the collection - * if it's not the case it will remove the element - * that is on the position where the provided `value` would be. - */ - delete(value: V): TopKChanges { - const result: TopKChanges = { moveIn: null, moveOut: null } - - // Lookup delete position - const index = this.#findIndex(value) - // Remove the value at that position - const [removedElem] = this.#sortedValues.splice(index, 1) - - // Check if the topK changed - if (index < this.#topKEnd) { - // The removed element is either before the top K or within the top K - // If it is before the top K then the first element of the topK moves out of the topK - // If it is within the top K then the removed element moves out of the topK - result.moveOut = removedElem! - if (index < this.#topKStart) { - // The removed element is before the topK - // so actually, the first element of the topK moves out of the topK - // and not the element that we removed - // The first element of the topK is now at index topKStart - 1 - // since we removed an element before the topK - const moveOutIndex = this.#topKStart - 1 - if (moveOutIndex < this.#sortedValues.length) { - result.moveOut = this.#sortedValues[moveOutIndex]! - } else { - // No value is moving out of the topK - // because there are no elements in the topK - result.moveOut = null - } - } - - // Since we removed an element that was before or in the topK - // the first element after the topK moved one position to the left - // and thus falls into the topK now - const moveInIndex = this.#topKEnd - 1 - if (moveInIndex < this.#sortedValues.length) { - result.moveIn = this.#sortedValues[moveInIndex]! - } - } - - return result - } - - // TODO: see if there is a way to refactor the code for insert and delete in the topK above - // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right - // so i have the feeling there is a common pattern here and we can implement both cases using that pattern - - #findIndex(value: V): number { - return binarySearch(this.#sortedValues, indexedValue(value, ``), (a, b) => - this.#comparator(getValue(a), getValue(b)), - ) - } -} - -/** - * Creates a comparator for [key, value] tuples that first compares values, - * then uses the row key as a stable tie-breaker. - */ -export function createKeyedComparator( - comparator: (a: T, b: T) => number, -): (a: [K, T], b: [K, T]) => number { - return ([aKey, aVal], [bKey, bVal]) => { - // First compare on the value - const valueComparison = comparator(aVal, bVal) - if (valueComparison !== 0) { - return valueComparison - } - // If the values are equal, use the row key as tie-breaker - // This provides stable, deterministic ordering since keys are string | number - return compareKeys(aKey, bKey) - } -} - /** * Operator for fractional indexed topK operations * This operator maintains fractional indices for sorted elements @@ -430,32 +100,6 @@ export class TopKWithFractionalIndexOperator< } } -/** - * Handles a moveIn change by adding it to the result array. - */ -function handleMoveIn( - moveIn: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, -): void { - if (moveIn) { - const [[key, value], index] = moveIn - result.push([[key, [value, index]], 1]) - } -} - -/** - * Handles a moveOut change by adding it to the result array. - */ -function handleMoveOut( - moveOut: IndexedValue<[K, T]> | null, - result: Array<[[K, IndexedValue], number]>, -): void { - if (moveOut) { - const [[key, value], index] = moveOut - result.push([[key, [value, index]], -1]) - } -} - /** * Limits the number of results based on a comparator, with optional offset. * Uses fractional indexing to minimize the number of changes when elements move positions. diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index 8114325dc..fee7f7da7 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -3,15 +3,11 @@ import { DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import { TopKWithFractionalIndexOperator, - getIndex, - getValue, - indexedValue, } from './topKWithFractionalIndex.js' +import { getIndex, getValue, indexedValue } from './topKArray.js' +import type { IndexedValue, TopK, TopKChanges } from './topKArray.js'; import type { IStreamBuilder, PipedOperator } from '../types.js' import type { - IndexedValue, - TopK, - TopKChanges, TopKWithFractionalIndexOptions, } from './topKWithFractionalIndex.js' From fef17b6ad000d5da27e4b17c836c651d661b6db1 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:43:57 +0000 Subject: [PATCH 06/15] ci: apply automated fixes --- .../src/operators/groupedTopKWithFractionalIndex.ts | 2 +- packages/db-ivm/src/operators/topKState.ts | 9 +++++++-- .../db-ivm/src/operators/topKWithFractionalIndex.ts | 2 +- .../src/operators/topKWithFractionalIndexBTree.ts | 10 +++------- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts index 6668bd729..47e7a309b 100644 --- a/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts @@ -3,7 +3,7 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js' import { TopKArray, createKeyedComparator } from './topKArray.js' -import type { IndexedValue, TopK} from './topKArray.js'; +import type { IndexedValue, TopK } from './topKArray.js' import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' diff --git a/packages/db-ivm/src/operators/topKState.ts b/packages/db-ivm/src/operators/topKState.ts index 60239cd05..e5dadd213 100644 --- a/packages/db-ivm/src/operators/topKState.ts +++ b/packages/db-ivm/src/operators/topKState.ts @@ -1,5 +1,10 @@ import { TopKArray } from './topKArray.js' -import type { IndexedValue, TopK, TopKChanges, TopKMoveChanges } from './topKArray.js'; +import type { + IndexedValue, + TopK, + TopKChanges, + TopKMoveChanges, +} from './topKArray.js' /** * Helper class that manages the state for a single topK window. @@ -103,4 +108,4 @@ export function handleMoveOut( const [[key, value], index] = moveOut result.push([[key, [value, index]], -1]) } -} \ No newline at end of file +} diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index d9b6a7adc..740c3b840 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -3,7 +3,7 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js' import { TopKArray, createKeyedComparator } from './topKArray.js' -import type { IndexedValue, TopK} from './topKArray.js'; +import type { IndexedValue, TopK } from './topKArray.js' import type { DifferenceStreamReader } from '../graph.js' import type { IStreamBuilder, PipedOperator } from '../types.js' diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index fee7f7da7..3ca29ac1c 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -1,15 +1,11 @@ import { generateKeyBetween } from 'fractional-indexing' import { DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { - TopKWithFractionalIndexOperator, -} from './topKWithFractionalIndex.js' +import { TopKWithFractionalIndexOperator } from './topKWithFractionalIndex.js' import { getIndex, getValue, indexedValue } from './topKArray.js' -import type { IndexedValue, TopK, TopKChanges } from './topKArray.js'; +import type { IndexedValue, TopK, TopKChanges } from './topKArray.js' import type { IStreamBuilder, PipedOperator } from '../types.js' -import type { - TopKWithFractionalIndexOptions, -} from './topKWithFractionalIndex.js' +import type { TopKWithFractionalIndexOptions } from './topKWithFractionalIndex.js' interface BTree { nextLowerPair: (key: Key) => [Key, Value] | undefined From f4e1cd6e2b13e2a29ef1bd1dfd6c212f7996c6b6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 10 Dec 2025 11:40:20 +0100 Subject: [PATCH 07/15] Improve unit tests --- .../groupedTopKWithFractionalIndex.test.ts | 80 ++++++------------- 1 file changed, 23 insertions(+), 57 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts index 867c3195c..f1615dccb 100644 --- a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts @@ -218,15 +218,30 @@ describe(`Operators`, () => { graph.run() const updateResult = tracker.getResult() - // Only group1 should be affected - const affectedGroups = new Set( - updateResult.messages.map( - ([[_key, [value, _idx]], _mult]) => value.group, - ), + + // Should have exactly 2 messages: one removal and one addition + expect(updateResult.messages.length).toBe(2) + + // Find the removal message (multiplicity -1) and addition message (multiplicity 1) + const removalMessage = updateResult.messages.find( + ([_item, mult]) => mult === -1, ) - expect(affectedGroups.size).toBe(1) - expect(affectedGroups.has(`group1`)).toBe(true) - expect(affectedGroups.has(`group2`)).toBe(false) + const additionMessage = updateResult.messages.find( + ([_item, mult]) => mult === 1, + ) + + expect(removalMessage).toBeDefined() + expect(additionMessage).toBeDefined() + + // Check that removal is for value 20 (g1-b) + const [_removalKey, [removalValue, _removalIdx]] = removalMessage![0] + expect(removalValue.value).toBe(20) + expect(removalValue.id).toBe(`g1-b`) + + // Check that addition is for value 5 (g1-c) + const [_additionKey, [additionValue, _additionIdx]] = additionMessage![0] + expect(additionValue.value).toBe(5) + expect(additionValue.id).toBe(`g1-c`) }) it(`should support offset within groups`, () => { @@ -362,55 +377,6 @@ describe(`Operators`, () => { expect(values).toEqual([1, 3, 5]) }) - it(`should maintain fractional indices correctly within groups`, () => { - const graph = new D2() - const input = - graph.newInput<[string, { id: string; group: string; value: number }]>() - const tracker = new MessageTracker< - [string, [{ id: string; group: string; value: number }, string]] - >() - - input.pipe( - groupedTopKWithFractionalIndex((a, b) => a.value - b.value, { - limit: 3, - groupKeyFn: (_key, value) => value.group, - }), - output((message) => { - tracker.addMessage(message) - }), - ) - - graph.finalize() - - input.sendData( - new MultiSet([ - [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], - [[`g1-b`, { id: `g1-b`, group: `group1`, value: 3 }], 1], - [[`g1-c`, { id: `g1-c`, group: `group1`, value: 5 }], 1], - ]), - ) - graph.run() - - const result = tracker.getResult(compareFractionalIndex) - - // Check fractional indices are in correct order - const indexedItems = result.sortedResults.map( - ([key, [value, index]]) => ({ - key, - value: value.value, - index, - }), - ) - - // Sort by value to verify fractional indices match sort order - indexedItems.sort((a, b) => a.value - b.value) - - // Verify indices are in lexicographic order - for (let i = 0; i < indexedItems.length - 1; i++) { - expect(indexedItems[i]!.index < indexedItems[i + 1]!.index).toBe(true) - } - }) - it(`should handle setSizeCallback correctly`, () => { const graph = new D2() const input = From 6e03bd8dded21cbb44490afef82db90a4554bf71 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:42:53 +0000 Subject: [PATCH 08/15] ci: apply automated fixes --- .../operators/groupedTopKWithFractionalIndex.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts index f1615dccb..c5890d22c 100644 --- a/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedTopKWithFractionalIndex.test.ts @@ -218,10 +218,10 @@ describe(`Operators`, () => { graph.run() const updateResult = tracker.getResult() - + // Should have exactly 2 messages: one removal and one addition expect(updateResult.messages.length).toBe(2) - + // Find the removal message (multiplicity -1) and addition message (multiplicity 1) const removalMessage = updateResult.messages.find( ([_item, mult]) => mult === -1, @@ -229,15 +229,15 @@ describe(`Operators`, () => { const additionMessage = updateResult.messages.find( ([_item, mult]) => mult === 1, ) - + expect(removalMessage).toBeDefined() expect(additionMessage).toBeDefined() - + // Check that removal is for value 20 (g1-b) const [_removalKey, [removalValue, _removalIdx]] = removalMessage![0] expect(removalValue.value).toBe(20) expect(removalValue.id).toBe(`g1-b`) - + // Check that addition is for value 5 (g1-c) const [_additionKey, [additionValue, _additionIdx]] = additionMessage![0] expect(additionValue.value).toBe(5) From fdd892bd0d9cb7fc02ef6661ac9a46fcaa461523 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 10 Dec 2025 10:51:39 +0000 Subject: [PATCH 09/15] feat: Add groupedOrderBy operator This commit introduces the `groupedOrderBy` operator, which allows for ordering and limiting elements within distinct groups. It also includes necessary exports and tests for this new functionality. Co-authored-by: kevin.de.porre --- .changeset/grouped-orderby-operator.md | 5 + .../db-ivm/src/operators/groupedOrderBy.ts | 95 +++ packages/db-ivm/src/operators/index.ts | 1 + .../groupedOrderByWithFractionalIndex.test.ts | 685 ++++++++++++++++++ packages/rxdb-db-collection/src/rxdb.ts | 4 +- 5 files changed, 788 insertions(+), 2 deletions(-) create mode 100644 .changeset/grouped-orderby-operator.md create mode 100644 packages/db-ivm/src/operators/groupedOrderBy.ts create mode 100644 packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts diff --git a/.changeset/grouped-orderby-operator.md b/.changeset/grouped-orderby-operator.md new file mode 100644 index 000000000..73f0f9415 --- /dev/null +++ b/.changeset/grouped-orderby-operator.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Add `groupedOrderByWithFractionalIndex` operator. This operator groups elements by a provided `groupKeyFn` and applies ordering and limits independently to each group. Each group maintains its own sorted collection with independent limit/offset, which is useful for hierarchical data projections where child collections need to enforce limits within each parent's slice of the stream rather than across the entire dataset. diff --git a/packages/db-ivm/src/operators/groupedOrderBy.ts b/packages/db-ivm/src/operators/groupedOrderBy.ts new file mode 100644 index 000000000..901e05c7c --- /dev/null +++ b/packages/db-ivm/src/operators/groupedOrderBy.ts @@ -0,0 +1,95 @@ +import { groupedTopKWithFractionalIndex } from './groupedTopKWithFractionalIndex.js' +import { consolidate } from './consolidate.js' +import type { IStreamBuilder, KeyValue } from '../types.js' + +export interface GroupedOrderByOptions { + comparator?: (a: Ve, b: Ve) => number + limit?: number + offset?: number +} + +export interface GroupedOrderByWithFractionalIndexOptions< + Ve, + KeyType = unknown, + ValueType = unknown, +> extends GroupedOrderByOptions { + setSizeCallback?: (getSize: () => number) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void, + ) => void + /** + * Function to extract a group key from the element's key and value. + * Elements with the same group key will be sorted and limited together. + */ + groupKeyFn: (key: KeyType, value: ValueType) => unknown +} + +/** + * Orders the elements per group and limits the number of results per group, with optional offset and + * annotates the value with a fractional index. + * This requires a keyed stream, and uses the `groupedTopKWithFractionalIndex` operator to order elements within each group. + * + * Elements are grouped by the provided groupKeyFn, and each group maintains its own sorted collection + * with independent limit/offset. + * + * @param valueExtractor - A function that extracts the value to order by from the element + * @param options - Configuration including groupKeyFn, comparator, limit, and offset + * @returns A piped operator that orders the elements per group and limits the number of results per group + */ +export function groupedOrderByWithFractionalIndex< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? V : never, + ) => Ve, + options: GroupedOrderByWithFractionalIndexOptions< + Ve, + T extends KeyValue ? K : never, + T extends KeyValue ? V : never + >, +) { + type KeyType = T extends KeyValue ? K : never + type ValueType = T extends KeyValue ? V : never + + const limit = options.limit ?? Infinity + const offset = options.offset ?? 0 + const setSizeCallback = options.setSizeCallback + const setWindowFn = options.setWindowFn + const groupKeyFn = options.groupKeyFn + const comparator = + options.comparator ?? + ((a, b) => { + // Default to JS like ordering + if (a === b) return 0 + if (a < b) return -1 + return 1 + }) + + return ( + stream: IStreamBuilder, + ): IStreamBuilder<[KeyType, [ValueType, string]]> => { + // Cast to the expected key type for groupedTopKWithFractionalIndex + type StreamKey = KeyType extends string | number ? KeyType : string | number + + return stream.pipe( + groupedTopKWithFractionalIndex( + (a: ValueType, b: ValueType) => + comparator(valueExtractor(a), valueExtractor(b)), + { + limit, + offset, + setSizeCallback, + setWindowFn, + groupKeyFn: groupKeyFn as ( + key: StreamKey, + value: ValueType, + ) => unknown, + }, + ) as ( + stream: IStreamBuilder, + ) => IStreamBuilder<[KeyType, [ValueType, string]]>, + consolidate(), + ) + } +} diff --git a/packages/db-ivm/src/operators/index.ts b/packages/db-ivm/src/operators/index.ts index 3f8ab8cb8..13acf130b 100644 --- a/packages/db-ivm/src/operators/index.ts +++ b/packages/db-ivm/src/operators/index.ts @@ -16,5 +16,6 @@ export * from './topK.js' export * from './topKWithFractionalIndex.js' export * from './groupedTopKWithFractionalIndex.js' export * from './orderBy.js' +export * from './groupedOrderBy.js' export * from './filterBy.js' export { groupBy, groupByOperators } from './groupBy.js' diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts new file mode 100644 index 000000000..dd5fe52d1 --- /dev/null +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -0,0 +1,685 @@ +import { describe, expect, it } from 'vitest' +import { D2 } from '../../src/d2.js' +import { MultiSet } from '../../src/multiset.js' +import { groupedOrderByWithFractionalIndex } from '../../src/operators/groupedOrderBy.js' +import { output } from '../../src/operators/index.js' +import { MessageTracker, compareFractionalIndex } from '../test-utils.js' +import type { KeyValue } from '../../src/types.js' + +describe(`Operators`, () => { + describe(`GroupedOrderByWithFractionalIndex operator`, () => { + it(`should maintain separate ordering per group with array key`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + [string, string], + { + id: string + value: number + } + > + >() + const tracker = new MessageTracker< + [[string, string], [{ id: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (key, _value) => key[0], + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[[`group1`, `a`], { id: `g1-a`, value: 5 }], 1], + [[[`group1`, `b`], { id: `g1-b`, value: 1 }], 1], + [[[`group1`, `c`], { id: `g1-c`, value: 3 }], 1], + [[[`group2`, `a`], { id: `g2-a`, value: 4 }], 1], + [[[`group2`, `b`], { id: `g2-b`, value: 2 }], 1], + [[[`group2`, `c`], { id: `g2-c`, value: 6 }], 1], + ]), + ) + + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Each group should have limit 2, so 4 total results + expect(result.sortedResults.length).toBe(4) + + // Group by group key and verify each group's results + const groupedValues = new Map>() + for (const [key, [value, _index]] of result.sortedResults) { + // key is [string, string], extract the first element as the group + const group = (key)[0] + const list = groupedValues.get(group) ?? [] + list.push(value.value) + groupedValues.set(group, list) + } + + // Sort values within each group for consistent comparison + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + // group1 should have values 1, 3 (top 2 by ascending value) + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + // group2 should have values 2, 4 (top 2 by ascending value) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + it(`should group by value property using groupKeyFn`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data - 3 items per group + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 4 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 2 }], 1], + [[`g2-c`, { id: `g2-c`, group: `group2`, value: 6 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Each group should have limit 2, so 4 total results + expect(result.sortedResults.length).toBe(4) + + // Group by group key and verify each group's results + const groupedValues = new Map>() + for (const [_key, [value, _index]] of result.sortedResults) { + const group = value.group + const list = groupedValues.get(group) ?? [] + list.push(value.value) + groupedValues.set(group, list) + } + + // Sort values within each group for consistent comparison + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + // group1 should have values 1, 3 (top 2 by ascending value) + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + // group2 should have values 2, 4 (top 2 by ascending value) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + it(`should handle incremental updates within a group`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + // Initial should have 2 items (limit 2): values 1 and 3 + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + const initialValues = initialResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(initialValues).toEqual([1, 3]) + + const initialMessageCount = initialResult.messageCount + + // Insert a better value (0) which should evict value 3 + input.sendData( + new MultiSet([ + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 0 }], 1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult(compareFractionalIndex) + // Should have 2 new messages: add 0, remove 3 + expect(updateResult.messageCount - initialMessageCount).toBe(2) + + // Check final state (cumulative) + const finalValues = updateResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(finalValues).toEqual([0, 1]) + }) + + it(`should handle removal of elements from topK`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + const initialMessageCount = tracker.getResult().messageCount + + // Remove the element with value 1 (which is in topK) + input.sendData( + new MultiSet([ + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], -1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult(compareFractionalIndex) + // Should have 2 new messages: remove 1, add 5 + expect(updateResult.messageCount - initialMessageCount).toBe(2) + + // Final state should have values 3 and 5 + const finalValues = updateResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(finalValues).toEqual([3, 5]) + }) + + it(`should handle multiple groups independently`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data for two groups + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 10 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 20 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 5 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 15 }], 1], + ]), + ) + graph.run() + + tracker.reset() + + // Update only group1 - add a better value + input.sendData( + new MultiSet([ + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 5 }], 1], + ]), + ) + graph.run() + + const updateResult = tracker.getResult() + + // Should have exactly 2 messages: one removal and one addition + expect(updateResult.messages.length).toBe(2) + + // Find the removal message (multiplicity -1) and addition message (multiplicity 1) + const removalMessage = updateResult.messages.find( + ([_item, mult]) => mult === -1, + ) + const additionMessage = updateResult.messages.find( + ([_item, mult]) => mult === 1, + ) + + expect(removalMessage).toBeDefined() + expect(additionMessage).toBeDefined() + + // Check that removal is for value 20 (g1-b) + const [_removalKey, [removalValue, _removalIdx]] = removalMessage![0] + expect(removalValue.value).toBe(20) + expect(removalValue.id).toBe(`g1-b`) + + // Check that addition is for value 5 (g1-c) + const [_additionKey, [additionValue, _additionIdx]] = additionMessage![0] + expect(additionValue.value).toBe(5) + expect(additionValue.id).toBe(`g1-c`) + }) + + it(`should support offset within groups`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 1, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Initial data - 4 items per group + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 4 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // With offset 1 and limit 2, should get values 2 and 3 + const values = result.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(values).toEqual([2, 3]) + }) + + it(`should use custom comparator`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + // Descending order + comparator: (a, b) => b - a, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // With descending order and limit 2, should get values 3 and 2 + const values = result.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => b - a) // Sort descending for comparison + expect(values).toEqual([3, 2]) + }) + + it(`should use groupKeyFn to extract group from key with delimiter`, () => { + const graph = new D2() + // Use keys with format "group:itemId" + const input = + graph.newInput>() + const tracker = new MessageTracker< + [string, [{ id: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + // Extract group from key "group:itemId" + groupKeyFn: (key, _value) => key.split(`:`)[0], + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`group1:a`, { id: `g1-a`, value: 5 }], 1], + [[`group1:b`, { id: `g1-b`, value: 1 }], 1], + [[`group1:c`, { id: `g1-c`, value: 3 }], 1], + [[`group2:a`, { id: `g2-a`, value: 4 }], 1], + [[`group2:b`, { id: `g2-b`, value: 2 }], 1], + [[`group2:c`, { id: `g2-c`, value: 6 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // Group results by group extracted from key + const groupedValues = new Map>() + for (const [key, [value, _index]] of result.sortedResults) { + const group = key.split(`:`)[0]! + const list = groupedValues.get(group) ?? [] + list.push(value.value) + groupedValues.set(group, list) + } + + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + it(`should support infinite limit (no limit)`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + // No limit specified - defaults to Infinity + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + + // All 3 items should be in the result + expect(result.sortedResults.length).toBe(3) + const values = result.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(values).toEqual([1, 3, 5]) + }) + + it(`should handle setSizeCallback correctly`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + let getSize: (() => number) | undefined + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + setSizeCallback: (fn) => { + getSize = fn + }, + }), + output(() => {}), + ) + + graph.finalize() + + expect(getSize).toBeDefined() + expect(getSize!()).toBe(0) // Initially empty + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 4 }], 1], + [[`g2-b`, { id: `g2-b`, group: `group2`, value: 2 }], 1], + ]), + ) + graph.run() + + // group1 has 2 items in topK, group2 has 2 items + expect(getSize!()).toBe(4) + }) + + it(`should handle moving window with setWindowFn`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 0, + groupKeyFn: (_key, value) => value.group, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], + [[`g1-d`, { id: `g1-d`, group: `group1`, value: 4 }], 1], + ]), + ) + graph.run() + + // Initial: values 1, 2 + const initialResult = tracker.getResult(compareFractionalIndex) + const initialValues = initialResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(initialValues).toEqual([1, 2]) + + // Move window to offset 1 + windowFn!({ offset: 1 }) + graph.run() + + // Now should have values 2, 3 + const movedResult = tracker.getResult(compareFractionalIndex) + const movedValues = movedResult.sortedResults + .map(([_key, [value, _index]]) => value.value) + .sort((a, b) => a - b) + expect(movedValues).toEqual([2, 3]) + }) + + it(`should cleanup empty groups`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; value: number }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.value, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + // Add items to two groups + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], 1], + [[`g2-a`, { id: `g2-a`, group: `group2`, value: 2 }], 1], + ]), + ) + graph.run() + + expect(tracker.getResult().sortedResults.length).toBe(2) + + // Remove all items from group1 + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, value: 1 }], -1], + ]), + ) + graph.run() + + // Should have only group2 left in materialized results + const updateResult = tracker.getResult(compareFractionalIndex) + expect(updateResult.sortedResults.length).toBe(1) + expect(updateResult.sortedResults[0]![1][0].group).toBe(`group2`) + }) + + it(`should order by string property`, () => { + const graph = new D2() + const input = + graph.newInput< + KeyValue + >() + const tracker = new MessageTracker< + [string, [{ id: string; group: string; name: string }, string]] + >() + + input.pipe( + groupedOrderByWithFractionalIndex((item) => item.name, { + limit: 2, + groupKeyFn: (_key, value) => value.group, + }), + output((message) => { + tracker.addMessage(message) + }), + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[`g1-a`, { id: `g1-a`, group: `group1`, name: `charlie` }], 1], + [[`g1-b`, { id: `g1-b`, group: `group1`, name: `alice` }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, name: `bob` }], 1], + ]), + ) + graph.run() + + const result = tracker.getResult(compareFractionalIndex) + const names = result.sortedResults + .map(([_key, [value, _index]]) => value.name) + .sort() + expect(names).toEqual([`alice`, `bob`]) + }) + }) +}) diff --git a/packages/rxdb-db-collection/src/rxdb.ts b/packages/rxdb-db-collection/src/rxdb.ts index 6ec59cfb6..28d83aa03 100644 --- a/packages/rxdb-db-collection/src/rxdb.ts +++ b/packages/rxdb-db-collection/src/rxdb.ts @@ -280,7 +280,7 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onUpdate: async (params) => { debug(`update`, params) const mutations = params.transaction.mutations.filter( - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + (m) => m.type === `update`, ) @@ -297,7 +297,7 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onDelete: async (params) => { debug(`delete`, params) const mutations = params.transaction.mutations.filter( - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + (m) => m.type === `delete`, ) const ids = mutations.map((mutation) => getKey(mutation.original)) From 276332ee1c270348b07928639c80d790b3b3b3fa Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:53:59 +0000 Subject: [PATCH 10/15] ci: apply automated fixes --- .changeset/grouped-orderby-operator.md | 2 +- .../tests/operators/groupedOrderByWithFractionalIndex.test.ts | 2 +- packages/rxdb-db-collection/src/rxdb.ts | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.changeset/grouped-orderby-operator.md b/.changeset/grouped-orderby-operator.md index 73f0f9415..f09e5bd10 100644 --- a/.changeset/grouped-orderby-operator.md +++ b/.changeset/grouped-orderby-operator.md @@ -1,5 +1,5 @@ --- -"@tanstack/db-ivm": patch +'@tanstack/db-ivm': patch --- Add `groupedOrderByWithFractionalIndex` operator. This operator groups elements by a provided `groupKeyFn` and applies ordering and limits independently to each group. Each group maintains its own sorted collection with independent limit/offset, which is useful for hierarchical data projections where child collections need to enforce limits within each parent's slice of the stream rather than across the entire dataset. diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts index dd5fe52d1..eb05951c9 100644 --- a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -57,7 +57,7 @@ describe(`Operators`, () => { const groupedValues = new Map>() for (const [key, [value, _index]] of result.sortedResults) { // key is [string, string], extract the first element as the group - const group = (key)[0] + const group = key[0] const list = groupedValues.get(group) ?? [] list.push(value.value) groupedValues.set(group, list) diff --git a/packages/rxdb-db-collection/src/rxdb.ts b/packages/rxdb-db-collection/src/rxdb.ts index 28d83aa03..23880c534 100644 --- a/packages/rxdb-db-collection/src/rxdb.ts +++ b/packages/rxdb-db-collection/src/rxdb.ts @@ -280,7 +280,6 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onUpdate: async (params) => { debug(`update`, params) const mutations = params.transaction.mutations.filter( - (m) => m.type === `update`, ) @@ -297,7 +296,6 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onDelete: async (params) => { debug(`delete`, params) const mutations = params.transaction.mutations.filter( - (m) => m.type === `delete`, ) const ids = mutations.map((mutation) => getKey(mutation.original)) From e12c558c12960f52355d03ebdf4a3da604cc6d81 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 10 Dec 2025 13:35:31 +0100 Subject: [PATCH 11/15] Improve unit tests --- .../groupedOrderByWithFractionalIndex.test.ts | 219 +++++++++++------- 1 file changed, 137 insertions(+), 82 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts index eb05951c9..850d0c99a 100644 --- a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -21,12 +21,14 @@ describe(`Operators`, () => { >() const tracker = new MessageTracker< [[string, string], [{ id: string; value: number }, string]] - >() + >() + + const groupKeyFn = (key: [string, string]) => key[0] input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { limit: 2, - groupKeyFn: (key, _value) => key[0], + groupKeyFn, }), output((message) => { tracker.addMessage(message) @@ -53,25 +55,13 @@ describe(`Operators`, () => { // Each group should have limit 2, so 4 total results expect(result.sortedResults.length).toBe(4) - // Group by group key and verify each group's results - const groupedValues = new Map>() - for (const [key, [value, _index]] of result.sortedResults) { - // key is [string, string], extract the first element as the group - const group = key[0] - const list = groupedValues.get(group) ?? [] - list.push(value.value) - groupedValues.set(group, list) - } - - // Sort values within each group for consistent comparison - for (const [group, values] of groupedValues) { - values.sort((a, b) => a - b) - groupedValues.set(group, values) - } - - // group1 should have values 1, 3 (top 2 by ascending value) + // Sort all results by fractional index first, then group by group key + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const groupedValues = groupResultsByKey(sortedResults, groupKeyFn) + + // group1 should have values 1, 3 (top 2 by ascending value, ordered by fractional index) expect(groupedValues.get(`group1`)).toEqual([1, 3]) - // group2 should have values 2, 4 (top 2 by ascending value) + // group2 should have values 2, 4 (top 2 by ascending value, ordered by fractional index) expect(groupedValues.get(`group2`)).toEqual([2, 4]) }) @@ -85,10 +75,12 @@ describe(`Operators`, () => { [string, [{ id: string; group: string; value: number }, string]] >() + const groupKeyFn = (_key: string, value: { id: string; group: string; value: number }) => value.group + input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { limit: 2, - groupKeyFn: (_key, value) => value.group, + groupKeyFn, }), output((message) => { tracker.addMessage(message) @@ -102,9 +94,9 @@ describe(`Operators`, () => { new MultiSet([ [[`g1-a`, { id: `g1-a`, group: `group1`, value: 5 }], 1], [[`g1-b`, { id: `g1-b`, group: `group1`, value: 1 }], 1], - [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], [[`g2-a`, { id: `g2-a`, group: `group2`, value: 4 }], 1], [[`g2-b`, { id: `g2-b`, group: `group2`, value: 2 }], 1], + [[`g1-c`, { id: `g1-c`, group: `group1`, value: 3 }], 1], [[`g2-c`, { id: `g2-c`, group: `group2`, value: 6 }], 1], ]), ) @@ -115,24 +107,13 @@ describe(`Operators`, () => { // Each group should have limit 2, so 4 total results expect(result.sortedResults.length).toBe(4) - // Group by group key and verify each group's results - const groupedValues = new Map>() - for (const [_key, [value, _index]] of result.sortedResults) { - const group = value.group - const list = groupedValues.get(group) ?? [] - list.push(value.value) - groupedValues.set(group, list) - } - - // Sort values within each group for consistent comparison - for (const [group, values] of groupedValues) { - values.sort((a, b) => a - b) - groupedValues.set(group, values) - } - - // group1 should have values 1, 3 (top 2 by ascending value) + // Sort all results by fractional index first, then group by group key + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const groupedValues = groupResultsByKey(sortedResults, groupKeyFn) + + // group1 should have values 1, 3 (top 2 by ascending value, ordered by fractional index) expect(groupedValues.get(`group1`)).toEqual([1, 3]) - // group2 should have values 2, 4 (top 2 by ascending value) + // group2 should have values 2, 4 (top 2 by ascending value, ordered by fractional index) expect(groupedValues.get(`group2`)).toEqual([2, 4]) }) @@ -171,9 +152,10 @@ describe(`Operators`, () => { // Initial should have 2 items (limit 2): values 1 and 3 const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(2) - const initialValues = initialResult.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) + const initialValues = sortedInitialResults.map( + ([_key, [value, _index]]) => value.value, + ) expect(initialValues).toEqual([1, 3]) const initialMessageCount = initialResult.messageCount @@ -191,9 +173,10 @@ describe(`Operators`, () => { expect(updateResult.messageCount - initialMessageCount).toBe(2) // Check final state (cumulative) - const finalValues = updateResult.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedFinalResults = sortByKeyAndIndex(updateResult.sortedResults) + const finalValues = sortedFinalResults.map( + ([_key, [value, _index]]) => value.value, + ) expect(finalValues).toEqual([0, 1]) }) @@ -244,9 +227,10 @@ describe(`Operators`, () => { expect(updateResult.messageCount - initialMessageCount).toBe(2) // Final state should have values 3 and 5 - const finalValues = updateResult.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedFinalResults = sortByKeyAndIndex(updateResult.sortedResults) + const finalValues = sortedFinalResults.map( + ([_key, [value, _index]]) => value.value, + ) expect(finalValues).toEqual([3, 5]) }) @@ -260,10 +244,12 @@ describe(`Operators`, () => { [string, [{ id: string; group: string; value: number }, string]] >() + const groupKeyFn = (_key: string, value: { id: string; group: string; value: number }) => value.group + input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { limit: 2, - groupKeyFn: (_key, value) => value.group, + groupKeyFn, }), output((message) => { tracker.addMessage(message) @@ -283,6 +269,19 @@ describe(`Operators`, () => { ) graph.run() + // Check initial output: each group should have limit 2 + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(4) + + // Sort all results by fractional index first, then group by group key + const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) + const initialGroupedValues = groupResultsByKey(sortedInitialResults, groupKeyFn) + + // group1 should have values 10, 20 (top 2 by ascending value, ordered by fractional index) + expect(initialGroupedValues.get(`group1`)).toEqual([10, 20]) + // group2 should have values 5, 15 (top 2 by ascending value, ordered by fractional index) + expect(initialGroupedValues.get(`group2`)).toEqual([5, 15]) + tracker.reset() // Update only group1 - add a better value @@ -315,9 +314,18 @@ describe(`Operators`, () => { expect(removalValue.id).toBe(`g1-b`) // Check that addition is for value 5 (g1-c) - const [_additionKey, [additionValue, _additionIdx]] = additionMessage![0] + const [_additionKey, [additionValue, additionIdx]] = additionMessage![0] expect(additionValue.value).toBe(5) expect(additionValue.id).toBe(`g1-c`) + + // Check that the fractional index of the added value (5) is smaller than the index of value 10 + const finalResult = tracker.getResult(compareFractionalIndex) + const value10Entry = finalResult.sortedResults.find( + ([_key, [value, _index]]) => value.value === 10 && value.group === `group1`, + ) + expect(value10Entry).toBeDefined() + const [_value10Key, [_value10Value, value10Idx]] = value10Entry! + expect(additionIdx < value10Idx).toBe(true) }) it(`should support offset within groups`, () => { @@ -357,9 +365,8 @@ describe(`Operators`, () => { const result = tracker.getResult(compareFractionalIndex) // With offset 1 and limit 2, should get values 2 and 3 - const values = result.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const values = sortedResults.map(([_key, [value, _index]]) => value.value) expect(values).toEqual([2, 3]) }) @@ -399,9 +406,8 @@ describe(`Operators`, () => { const result = tracker.getResult(compareFractionalIndex) // With descending order and limit 2, should get values 3 and 2 - const values = result.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => b - a) // Sort descending for comparison + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const values = sortedResults.map(([_key, [value, _index]]) => value.value) expect(values).toEqual([3, 2]) }) @@ -414,11 +420,13 @@ describe(`Operators`, () => { [string, [{ id: string; value: number }, string]] >() + const groupKeyFn = (key: string, _value: { id: string; value: number }) => key.split(`:`)[0]! + input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { limit: 2, // Extract group from key "group:itemId" - groupKeyFn: (key, _value) => key.split(`:`)[0], + groupKeyFn, }), output((message) => { tracker.addMessage(message) @@ -441,19 +449,9 @@ describe(`Operators`, () => { const result = tracker.getResult(compareFractionalIndex) - // Group results by group extracted from key - const groupedValues = new Map>() - for (const [key, [value, _index]] of result.sortedResults) { - const group = key.split(`:`)[0]! - const list = groupedValues.get(group) ?? [] - list.push(value.value) - groupedValues.set(group, list) - } - - for (const [group, values] of groupedValues) { - values.sort((a, b) => a - b) - groupedValues.set(group, values) - } + // Sort all results by fractional index first, then group by group key + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const groupedValues = groupResultsByKey(sortedResults, groupKeyFn) expect(groupedValues.get(`group1`)).toEqual([1, 3]) expect(groupedValues.get(`group2`)).toEqual([2, 4]) @@ -494,9 +492,8 @@ describe(`Operators`, () => { // All 3 items should be in the result expect(result.sortedResults.length).toBe(3) - const values = result.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const values = sortedResults.map(([_key, [value, _index]]) => value.value) expect(values).toEqual([1, 3, 5]) }) @@ -580,9 +577,10 @@ describe(`Operators`, () => { // Initial: values 1, 2 const initialResult = tracker.getResult(compareFractionalIndex) - const initialValues = initialResult.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) + const initialValues = sortedInitialResults.map( + ([_key, [value, _index]]) => value.value, + ) expect(initialValues).toEqual([1, 2]) // Move window to offset 1 @@ -591,9 +589,10 @@ describe(`Operators`, () => { // Now should have values 2, 3 const movedResult = tracker.getResult(compareFractionalIndex) - const movedValues = movedResult.sortedResults - .map(([_key, [value, _index]]) => value.value) - .sort((a, b) => a - b) + const sortedMovedResults = sortByKeyAndIndex(movedResult.sortedResults) + const movedValues = sortedMovedResults.map( + ([_key, [value, _index]]) => value.value, + ) expect(movedValues).toEqual([2, 3]) }) @@ -676,10 +675,66 @@ describe(`Operators`, () => { graph.run() const result = tracker.getResult(compareFractionalIndex) - const names = result.sortedResults - .map(([_key, [value, _index]]) => value.name) - .sort() + const sortedResults = sortByKeyAndIndex(result.sortedResults) + const names = sortedResults.map(([_key, [value, _index]]) => value.name) expect(names).toEqual([`alice`, `bob`]) }) }) }) + +/** + * Helper function to sort results by key and then index + */ +function sortByKeyAndIndex(results: Array) { + return [...results] + .sort( + ( + [[_aKey, [_aValue, _aIndex]], aMultiplicity], + [[_bKey, [_bValue, _bIndex]], bMultiplicity], + ) => aMultiplicity - bMultiplicity, + ) + .sort( + ( + [[aKey, [_aValue, _aIndex]], _aMultiplicity], + [[bKey, [_bValue, _bIndex]], _bMultiplicity], + ) => { + // Compare keys - handle string, array, and numeric keys + if (typeof aKey === 'number' && typeof bKey === 'number') { + return aKey - bKey + } + // For string or array keys, convert to string for comparison + const aKeyStr = Array.isArray(aKey) ? aKey.join(',') : String(aKey) + const bKeyStr = Array.isArray(bKey) ? bKey.join(',') : String(bKey) + return aKeyStr < bKeyStr ? -1 : aKeyStr > bKeyStr ? 1 : 0 + }, + ) + .sort( + ( + [[_aKey, [_aValue, aIndex]], _aMultiplicity], + [[_bKey, [_bValue, bIndex]], _bMultiplicity], + ) => { + // lexically compare the index + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }, + ) +} + +/** + * Helper function to group sorted results by group key and extract values. + * Results should already be sorted by fractional index. + * Returns a Map of group key -> array of values (ordered by fractional index). + */ +function groupResultsByKey( + sortedResults: Array, + groupKeyFn: (key: any, value: any) => TGroupKey, +): Map> { + const groupedValues = new Map>() + for (const [key, [value, _index]] of sortedResults) { + const group = groupKeyFn(key, value) + const list = groupedValues.get(group) ?? [] + // Extract the numeric value from the value object + list.push((value as { value: number }).value) + groupedValues.set(group, list) + } + return groupedValues +} From 391067404c05ad3b77ae3ea4d77724922a003804 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 10 Dec 2025 13:35:56 +0100 Subject: [PATCH 12/15] Remove unnecessary changes --- packages/rxdb-db-collection/src/rxdb.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/rxdb-db-collection/src/rxdb.ts b/packages/rxdb-db-collection/src/rxdb.ts index 23880c534..6ec59cfb6 100644 --- a/packages/rxdb-db-collection/src/rxdb.ts +++ b/packages/rxdb-db-collection/src/rxdb.ts @@ -280,6 +280,7 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onUpdate: async (params) => { debug(`update`, params) const mutations = params.transaction.mutations.filter( + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition (m) => m.type === `update`, ) @@ -296,6 +297,7 @@ export function rxdbCollectionOptions(config: RxDBCollectionConfig) { onDelete: async (params) => { debug(`delete`, params) const mutations = params.transaction.mutations.filter( + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition (m) => m.type === `delete`, ) const ids = mutations.map((mutation) => getKey(mutation.original)) From 606581819f0692eb3de72297e095561e9d692f52 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:38:20 +0000 Subject: [PATCH 13/15] ci: apply automated fixes --- .../groupedOrderByWithFractionalIndex.test.ts | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts index 850d0c99a..a4734d4a5 100644 --- a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -21,8 +21,8 @@ describe(`Operators`, () => { >() const tracker = new MessageTracker< [[string, string], [{ id: string; value: number }, string]] - >() - + >() + const groupKeyFn = (key: [string, string]) => key[0] input.pipe( @@ -75,7 +75,10 @@ describe(`Operators`, () => { [string, [{ id: string; group: string; value: number }, string]] >() - const groupKeyFn = (_key: string, value: { id: string; group: string; value: number }) => value.group + const groupKeyFn = ( + _key: string, + value: { id: string; group: string; value: number }, + ) => value.group input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { @@ -152,7 +155,9 @@ describe(`Operators`, () => { // Initial should have 2 items (limit 2): values 1 and 3 const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(2) - const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) + const sortedInitialResults = sortByKeyAndIndex( + initialResult.sortedResults, + ) const initialValues = sortedInitialResults.map( ([_key, [value, _index]]) => value.value, ) @@ -244,7 +249,10 @@ describe(`Operators`, () => { [string, [{ id: string; group: string; value: number }, string]] >() - const groupKeyFn = (_key: string, value: { id: string; group: string; value: number }) => value.group + const groupKeyFn = ( + _key: string, + value: { id: string; group: string; value: number }, + ) => value.group input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { @@ -274,8 +282,13 @@ describe(`Operators`, () => { expect(initialResult.sortedResults.length).toBe(4) // Sort all results by fractional index first, then group by group key - const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) - const initialGroupedValues = groupResultsByKey(sortedInitialResults, groupKeyFn) + const sortedInitialResults = sortByKeyAndIndex( + initialResult.sortedResults, + ) + const initialGroupedValues = groupResultsByKey( + sortedInitialResults, + groupKeyFn, + ) // group1 should have values 10, 20 (top 2 by ascending value, ordered by fractional index) expect(initialGroupedValues.get(`group1`)).toEqual([10, 20]) @@ -321,7 +334,8 @@ describe(`Operators`, () => { // Check that the fractional index of the added value (5) is smaller than the index of value 10 const finalResult = tracker.getResult(compareFractionalIndex) const value10Entry = finalResult.sortedResults.find( - ([_key, [value, _index]]) => value.value === 10 && value.group === `group1`, + ([_key, [value, _index]]) => + value.value === 10 && value.group === `group1`, ) expect(value10Entry).toBeDefined() const [_value10Key, [_value10Value, value10Idx]] = value10Entry! @@ -420,7 +434,8 @@ describe(`Operators`, () => { [string, [{ id: string; value: number }, string]] >() - const groupKeyFn = (key: string, _value: { id: string; value: number }) => key.split(`:`)[0]! + const groupKeyFn = (key: string, _value: { id: string; value: number }) => + key.split(`:`)[0]! input.pipe( groupedOrderByWithFractionalIndex((item) => item.value, { @@ -577,7 +592,9 @@ describe(`Operators`, () => { // Initial: values 1, 2 const initialResult = tracker.getResult(compareFractionalIndex) - const sortedInitialResults = sortByKeyAndIndex(initialResult.sortedResults) + const sortedInitialResults = sortByKeyAndIndex( + initialResult.sortedResults, + ) const initialValues = sortedInitialResults.map( ([_key, [value, _index]]) => value.value, ) From d17bf7b9f77a03b4932123ed305af9ace44c1b45 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 10 Dec 2025 13:48:33 +0100 Subject: [PATCH 14/15] Fix failing test --- .../groupedOrderByWithFractionalIndex.test.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts index a4734d4a5..f7df002b0 100644 --- a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -295,6 +295,13 @@ describe(`Operators`, () => { // group2 should have values 5, 15 (top 2 by ascending value, ordered by fractional index) expect(initialGroupedValues.get(`group2`)).toEqual([5, 15]) + // Capture the fractional index of value 10 before reset + const value10Entry = sortedInitialResults.find( + ([_key, [value, _index]]) => value.value === 10 && value.group === `group1`, + ) + expect(value10Entry).toBeDefined() + const [_value10Key, [_value10Value, value10Idx]] = value10Entry! + tracker.reset() // Update only group1 - add a better value @@ -332,13 +339,6 @@ describe(`Operators`, () => { expect(additionValue.id).toBe(`g1-c`) // Check that the fractional index of the added value (5) is smaller than the index of value 10 - const finalResult = tracker.getResult(compareFractionalIndex) - const value10Entry = finalResult.sortedResults.find( - ([_key, [value, _index]]) => - value.value === 10 && value.group === `group1`, - ) - expect(value10Entry).toBeDefined() - const [_value10Key, [_value10Value, value10Idx]] = value10Entry! expect(additionIdx < value10Idx).toBe(true) }) From e9305b8abb76fab919ec3cd52be4792a11d53e14 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:54:28 +0000 Subject: [PATCH 15/15] ci: apply automated fixes --- .../tests/operators/groupedOrderByWithFractionalIndex.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts index f7df002b0..d88617b88 100644 --- a/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/groupedOrderByWithFractionalIndex.test.ts @@ -297,7 +297,8 @@ describe(`Operators`, () => { // Capture the fractional index of value 10 before reset const value10Entry = sortedInitialResults.find( - ([_key, [value, _index]]) => value.value === 10 && value.group === `group1`, + ([_key, [value, _index]]) => + value.value === 10 && value.group === `group1`, ) expect(value10Entry).toBeDefined() const [_value10Key, [_value10Value, value10Idx]] = value10Entry!