diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index cfff96d807..202ee14df3 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -118,11 +118,13 @@ export declare function first(predicate: (value: T, index: number, sou export declare const flatMap: typeof mergeMap; -export declare function groupBy(keySelector: (value: T) => value is K): OperatorFunction | GroupedObservable>>; -export declare function groupBy(keySelector: (value: T) => K): OperatorFunction>; -export declare function groupBy(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable) => Observable): OperatorFunction>; -export declare function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable): OperatorFunction>; -export declare function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, options: BasicGroupByOptions): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, options: GroupByOptionsWithElement): OperatorFunction>; +export declare function groupBy(key: (value: T) => value is K): OperatorFunction | GroupedObservable>>; +export declare function groupBy(key: (value: T) => K): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element: void, duration: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable): OperatorFunction>; +export declare function groupBy(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable) => Observable, connector?: () => Subject): OperatorFunction>; export declare function ignoreElements(): OperatorFunction; diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index e22e42911a..fcc6d4caad 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -42,7 +42,7 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2) + groupBy((x) => x % 2) ).subscribe((g: any) => { const expectedGroup = expectedGroups.shift()!; expect(g.key).to.equal(expectedGroup.key); @@ -60,7 +60,7 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2, (x: number) => x + '!') + groupBy((x) => x % 2, (x) => x + '!') ).subscribe((g: any) => { const expectedGroup = expectedGroups.shift()!; expect(g.key).to.equal(expectedGroup.key); @@ -82,21 +82,20 @@ describe('groupBy operator', () => { const resultingGroups: { key: number, values: number [] }[] = []; of(1, 2, 3, 4, 5, 6).pipe( - groupBy( - (x: number) => x % 2, - (x: number) => x, - (g: any) => g.pipe(skip(1))) - ).subscribe((g: any) => { - let group = { key: g.key, values: [] as number[] }; - - g.subscribe((x: any) => { - group.values.push(x); - }); + groupBy(x => x % 2, { + duration: g => g.pipe(skip(1)) + }) + ).subscribe((g: any) => { + let group = { key: g.key, values: [] as number[] }; - resultingGroups.push(group); + g.subscribe((x: any) => { + group.values.push(x); }); - expect(resultingGroups).to.deep.equal(expectedGroups); + resultingGroups.push(group); + }); + + expect(resultingGroups).to.deep.equal(expectedGroups); }); it('should group values with a subject selector', (done) => { @@ -106,7 +105,9 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2, null as any, null as any, () => new ReplaySubject(1)), + groupBy(x => x % 2, { + connector: () => new ReplaySubject(1), + }), // Ensure each inner group reaches the destination after the first event // has been next'd to the group delay(5) @@ -802,11 +803,11 @@ describe('groupBy operator', () => { const expectedValues = { v: v, w: w, x: x, y: y, z: z }; const source = e1 - .pipe(groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - )); + .pipe( + groupBy(val => val.toLowerCase().trim(), { + duration: group => group.pipe(skip(2)), + }) + ); expectObservable(source).toBe(expected, expectedValues); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -836,11 +837,9 @@ describe('groupBy operator', () => { const expectedValues = { v: v, w: w, x: x }; const source = e1 - .pipe(groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - )); + .pipe(groupBy(val => val.toLowerCase().trim(), { + duration: group => group.pipe(skip(2)) + })); expectObservable(source, unsub).toBe(expected, expectedValues); }); @@ -879,17 +878,15 @@ describe('groupBy operator', () => { .unsubscribedFrame; const source = e1.pipe( - groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - ), - map((group: any) => { + groupBy(val => val.toLowerCase().trim(), { + duration: group => group.pipe(skip(2)) + }), + map((group) => { const arr: any[] = []; const subscription = group.pipe( phonyMarbelize() - ).subscribe((value: any) => { + ).subscribe((value) => { arr.push(value); }); @@ -923,11 +920,9 @@ describe('groupBy operator', () => { .parseMarblesAsSubscriptions(sub) .unsubscribedFrame; - obs.pipe(groupBy( - (val: string) => val, - (val: string) => val, - (group: any) => durations[group.key] - )).subscribe(); + obs.pipe(groupBy((val) => val, { + duration: (group) => durations[Number(group.key)] + })).subscribe(); rxTestScheduler.schedule(() => { durations.forEach((d, i) => { diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index cc77f3d03c..30c7c46e4a 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -1,28 +1,51 @@ import { Observable } from '../Observable'; +import { innerFrom } from '../observable/from'; import { Subject } from '../Subject'; -import { Observer, OperatorFunction } from '../types'; +import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; +interface BasicGroupByOptions { + element?: undefined; + duration?: (grouped: GroupedObservable) => ObservableInput; + connector?: () => SubjectLike; +} + +interface GroupByOptionsWithElement { + element: (value: T) => E; + duration?: (grouped: GroupedObservable) => ObservableInput; + connector?: () => SubjectLike; +} + +export function groupBy(key: (value: T) => K, options: BasicGroupByOptions): OperatorFunction>; + +export function groupBy( + key: (value: T) => K, + options: GroupByOptionsWithElement +): OperatorFunction>; + export function groupBy( - keySelector: (value: T) => value is K + key: (value: T) => value is K ): OperatorFunction | GroupedObservable>>; -export function groupBy(keySelector: (value: T) => K): OperatorFunction>; + +export function groupBy(key: (value: T) => K): OperatorFunction>; + +/** + * @deprecated use the options parameter instead. + */ export function groupBy( - keySelector: (value: T) => K, - elementSelector: void, - durationSelector: (grouped: GroupedObservable) => Observable + key: (value: T) => K, + element: void, + duration: (grouped: GroupedObservable) => Observable ): OperatorFunction>; + +/** + * @deprecated use the options parameter instead. + */ export function groupBy( - keySelector: (value: T) => K, - elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable -): OperatorFunction>; -export function groupBy( - keySelector: (value: T) => K, - elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable, - subjectSelector?: () => Subject + key: (value: T) => K, + element?: (value: T) => R, + duration?: (grouped: GroupedObservable) => Observable ): OperatorFunction>; /** @@ -32,7 +55,7 @@ export function groupBy( * * ![](groupBy.png) * - * When the Observable emits an item, a key is computed for this item with the keySelector function. + * When the Observable emits an item, a key is computed for this item with the key function. * * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new * {@link GroupedObservable} for this key is created and emits. @@ -41,7 +64,7 @@ export function groupBy( * key is available as the `key` field of a {@link GroupedObservable} instance. * * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements - * returned by the elementSelector function. + * returned by the element function. * * ## Examples * @@ -101,28 +124,45 @@ export function groupBy( * // { id: 3, values: [ 'TSLint' ] } * ``` * - * @param {function(value: T): K} keySelector A function that extracts the key + * @param key A function that extracts the key * for each item. - * @param {function(value: T): R} [elementSelector] A function that extracts the + * @param element A function that extracts the * return element for each item. - * @param {function(grouped: GroupedObservable): Observable} [durationSelector] + * @param duration * A function that returns an Observable to determine how long each group should * exist. - * @param {function(): Subject} [subjectSelector] Factory function to create an + * @param connector Factory function to create an * intermediate Subject through which grouped elements are emitted. * @return A function that returns an Observable that emits GroupedObservables, * each of which corresponds to a unique key value and each of which emits * those items from the source Observable that share that key value. + * + * @deprecated Use the options parameter instead. */ +export function groupBy( + key: (value: T) => K, + element?: (value: T) => R, + duration?: (grouped: GroupedObservable) => Observable, + connector?: () => Subject +): OperatorFunction>; + +// Impl export function groupBy( keySelector: (value: T) => K, - elementSelector?: ((value: T) => R) | void, - durationSelector?: (grouped: GroupedObservable) => Observable, - subjectSelector?: () => Subject + elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions | GroupByOptionsWithElement, + duration?: (grouped: GroupedObservable) => ObservableInput, + connector?: () => SubjectLike ): OperatorFunction> { return operate((source, subscriber) => { + let element: ((value: any) => any) | void; + if (!elementOrOptions || typeof elementOrOptions === 'function') { + element = elementOrOptions; + } else { + ({ duration, element, connector } = elementOrOptions); + } + // A lookup for the groups that we have so far. - const groups = new Map>(); + const groups = new Map>(); // Used for notifying all groups and the subscriber in the same way. const notify = (cb: (group: Observer) => void) => { @@ -153,7 +193,7 @@ export function groupBy( let group = groups.get(key); if (!group) { // Create our group subject - groups.set(key, (group = subjectSelector ? subjectSelector() : new Subject())); + groups.set(key, (group = connector ? connector() : new Subject())); // Emit the grouped observable. Note that we can't do a simple `asObservable()` here, // because the grouped observable has special semantics around reference counting @@ -161,7 +201,7 @@ export function groupBy( const grouped = createGroupedObservable(key, group); subscriber.next(grouped); - if (durationSelector) { + if (duration) { const durationSubscriber = new OperatorSubscriber( // Providing the group here ensures that it is disposed of -- via `unsubscribe` -- // wnen the duration subscription is torn down. That is important, because then @@ -185,12 +225,12 @@ export function groupBy( ); // Start our duration notifier. - groupBySourceSubscriber.add(durationSelector(grouped).subscribe(durationSubscriber)); + groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber)); } } // Send the value to our group. - group.next(elementSelector ? elementSelector(value) : value); + group.next(element ? element(value) : value); } catch (err) { handleError(err); } @@ -214,7 +254,7 @@ export function groupBy( * @param key The key of the group * @param groupSubject The subject that fuels the group */ - function createGroupedObservable(key: K, groupSubject: Subject) { + function createGroupedObservable(key: K, groupSubject: SubjectLike) { const result: any = new Observable((groupSubscriber) => { groupBySourceSubscriber.activeGroups++; const innerSub = groupSubject.subscribe(groupSubscriber);