@@ -2,14 +2,22 @@ import { generateKeyBetween } from "fractional-indexing"
22import { DifferenceStreamWriter , UnaryOperator } from "../graph.js"
33import { StreamBuilder } from "../d2.js"
44import { MultiSet } from "../multiset.js"
5- import { binarySearch , globalObjectIdGenerator } from "../utils.js"
5+ import {
6+ binarySearch ,
7+ diffHalfOpen ,
8+ globalObjectIdGenerator ,
9+ } from "../utils.js"
10+ import type { HRange } from "../utils.js"
611import type { DifferenceStreamReader } from "../graph.js"
712import type { IStreamBuilder , PipedOperator } from "../types.js"
813
914export interface TopKWithFractionalIndexOptions {
1015 limit ?: number
1116 offset ?: number
1217 setSizeCallback ?: ( getSize : ( ) => number ) => void
18+ setWindowFn ?: (
19+ windowFn : ( options : { offset ?: number ; limit ?: number } ) => void
20+ ) => void
1321}
1422
1523export type TopKChanges < V > = {
@@ -19,6 +27,15 @@ export type TopKChanges<V> = {
1927 moveOut : IndexedValue < V > | null
2028}
2129
30+ export type TopKMoveChanges < V > = {
31+ /** Flag that marks whether there were any changes to the topK */
32+ changes : boolean
33+ /** Indicates which elements move into the topK (if any) */
34+ moveIns : Array < IndexedValue < V > >
35+ /** Indicates which elements move out of the topK (if any) */
36+ moveOuts : Array < IndexedValue < V > >
37+ }
38+
2239/**
2340 * A topK data structure that supports insertions and deletions
2441 * and returns changes to the topK.
@@ -58,6 +75,49 @@ class TopKArray<V> implements TopK<V> {
5875 return Math . max ( 0 , Math . min ( limit , available ) )
5976 }
6077
78+ /**
79+ * Moves the topK window
80+ */
81+ move ( {
82+ offset,
83+ limit,
84+ } : {
85+ offset ?: number
86+ limit ?: number
87+ } ) : TopKMoveChanges < V > {
88+ const oldOffset = this . #topKStart
89+ const oldLimit = this . #topKEnd - this . #topKStart
90+ const oldRange : HRange = [ this . #topKStart, this . #topKEnd]
91+
92+ this . #topKStart = offset ?? oldOffset
93+ this . #topKEnd = this . #topKStart + ( limit ?? oldLimit )
94+
95+ const newRange : HRange = [ this . #topKStart, this . #topKEnd]
96+ const { onlyInA, onlyInB } = diffHalfOpen ( oldRange , newRange )
97+
98+ const moveIns : Array < IndexedValue < V > > = [ ]
99+ onlyInB . forEach ( ( index ) => {
100+ const value = this . #sortedValues[ index ]
101+ if ( value ) {
102+ moveIns . push ( value )
103+ }
104+ } )
105+
106+ const moveOuts : Array < IndexedValue < V > > = [ ]
107+ onlyInA . forEach ( ( index ) => {
108+ const value = this . #sortedValues[ index ]
109+ if ( value ) {
110+ moveOuts . push ( value )
111+ }
112+ } )
113+
114+ // It could be that there are changes (i.e. moveIns or moveOuts)
115+ // but that the collection is lazy so we don't have the data yet that needs to move in/out
116+ // so `moveIns` and `moveOuts` will be empty but `changes` will be true
117+ // this will tell the caller that it needs to run the graph to load more data
118+ return { moveIns, moveOuts, changes : onlyInA . length + onlyInB . length > 0 }
119+ }
120+
61121 insert ( value : V ) : TopKChanges < V > {
62122 const result : TopKChanges < V > = { moveIn : null , moveOut : null }
63123
@@ -178,8 +238,6 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
178238 */
179239 #topK: TopK < TaggedValue < K , T > >
180240
181- #limit: number
182-
183241 constructor (
184242 id : number ,
185243 inputA : DifferenceStreamReader < [ K , T ] > ,
@@ -188,7 +246,7 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
188246 options : TopKWithFractionalIndexOptions
189247 ) {
190248 super ( id , inputA , output )
191- this . # limit = options . limit ?? Infinity
249+ const limit = options . limit ?? Infinity
192250 const offset = options . offset ?? 0
193251 const compareTaggedValues = (
194252 a : TaggedValue < K , T > ,
@@ -204,8 +262,9 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
204262 const tieBreakerB = getTag ( b )
205263 return tieBreakerA - tieBreakerB
206264 }
207- this . #topK = this . createTopK ( offset , this . # limit, compareTaggedValues )
265+ this . #topK = this . createTopK ( offset , limit , compareTaggedValues )
208266 options . setSizeCallback ?.( ( ) => this . #topK. size )
267+ options . setWindowFn ?.( this . moveTopK . bind ( this ) )
209268 }
210269
211270 protected createTopK (
@@ -216,6 +275,32 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
216275 return new TopKArray ( offset , limit , comparator )
217276 }
218277
278+ /**
279+ * Moves the topK window based on the provided offset and limit.
280+ * Any changes to the topK are sent to the output.
281+ */
282+ moveTopK ( { offset, limit } : { offset ?: number ; limit ?: number } ) {
283+ if ( ! ( this . #topK instanceof TopKArray ) ) {
284+ throw new Error (
285+ `Cannot move B+-tree implementation of TopK with fractional index`
286+ )
287+ }
288+
289+ const result : Array < [ [ K , IndexedValue < T > ] , number ] > = [ ]
290+
291+ const diff = this . #topK. move ( { offset, limit } )
292+
293+ diff . moveIns . forEach ( ( moveIn ) => this . handleMoveIn ( moveIn , result ) )
294+ diff . moveOuts . forEach ( ( moveOut ) => this . handleMoveOut ( moveOut , result ) )
295+
296+ if ( diff . changes ) {
297+ // There are changes to the topK
298+ // it could be that moveIns and moveOuts are empty
299+ // because the collection is lazy, so we will run the graph again to load the data
300+ this . output . sendData ( new MultiSet ( result ) )
301+ }
302+ }
303+
219304 run ( ) : void {
220305 const result : Array < [ [ K , IndexedValue < T > ] , number ] > = [ ]
221306 for ( const message of this . inputMessages ( ) ) {
@@ -258,23 +343,36 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
258343 // so it doesn't affect the topK
259344 }
260345
261- if ( res . moveIn ) {
262- const index = getIndex ( res . moveIn )
263- const taggedValue = getValue ( res . moveIn )
346+ this . handleMoveIn ( res . moveIn , result )
347+ this . handleMoveOut ( res . moveOut , result )
348+
349+ return
350+ }
351+
352+ private handleMoveIn (
353+ moveIn : IndexedValue < TaggedValue < K , T > > | null ,
354+ result : Array < [ [ K , IndexedValue < T > ] , number ] >
355+ ) {
356+ if ( moveIn ) {
357+ const index = getIndex ( moveIn )
358+ const taggedValue = getValue ( moveIn )
264359 const k = getKey ( taggedValue )
265360 const val = getVal ( taggedValue )
266361 result . push ( [ [ k , [ val , index ] ] , 1 ] )
267362 }
363+ }
268364
269- if ( res . moveOut ) {
270- const index = getIndex ( res . moveOut )
271- const taggedValue = getValue ( res . moveOut )
365+ private handleMoveOut (
366+ moveOut : IndexedValue < TaggedValue < K , T > > | null ,
367+ result : Array < [ [ K , IndexedValue < T > ] , number ] >
368+ ) {
369+ if ( moveOut ) {
370+ const index = getIndex ( moveOut )
371+ const taggedValue = getValue ( moveOut )
272372 const k = getKey ( taggedValue )
273373 const val = getVal ( taggedValue )
274374 result . push ( [ [ k , [ val , index ] ] , - 1 ] )
275375 }
276-
277- return
278376 }
279377
280378 private getMultiplicity ( key : K ) : number {
0 commit comments