From b21d811ad34672bf7a7f7d9d908cd317517b38b2 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 25 Aug 2020 17:59:36 -0500 Subject: [PATCH] feat(groupBy): Support named arguments, support ObservableInputs for duration selector - Adds support for named arguments. - Adds support for returning promises, et al, from the duration selector. NOTES: * There's a problem with type inferrence from the options `duration` selector that is noted in a `TODO` in the code here. * The tests for `groupBy` appear to be EXTREMELY old and outdated, and I was unable to updated them easily to use run mode. We may have to rewrite them all at some point to use better techniques. The issue seems to be a rudementary means of testing the inner observables that is incompatible with run mode. * Docs still need updated * Older paths still need to be deprecated * dtslint tests need to be added --- spec/operators/groupBy-spec.ts | 75 ++++++++++----------- src/internal/operators/groupBy.ts | 106 ++++++++++++++++++++---------- 2 files changed, 109 insertions(+), 72 deletions(-) diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index 3c98dd4721a..5e797c1e1fc 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -42,7 +42,7 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2) + groupBy((x) => x % 2) ).subscribe((g: any) => { const expectedGroup = expectedGroups.shift()!; expect(g.key).to.equal(expectedGroup.key); @@ -60,7 +60,7 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2, (x: number) => x + '!') + groupBy((x) => x % 2, (x) => x + '!') ).subscribe((g: any) => { const expectedGroup = expectedGroups.shift()!; expect(g.key).to.equal(expectedGroup.key); @@ -82,21 +82,21 @@ describe('groupBy operator', () => { const resultingGroups: { key: number, values: number [] }[] = []; of(1, 2, 3, 4, 5, 6).pipe( - groupBy( - (x: number) => x % 2, - (x: number) => x, - (g: any) => g.pipe(skip(1))) - ).subscribe((g: any) => { - let group = { key: g.key, values: [] as number[] }; - - g.subscribe((x: any) => { - group.values.push(x); - }); + groupBy({ + key: x => x % 2, + duration: g => g.pipe(skip(1)) + }) + ).subscribe((g: any) => { + let group = { key: g.key, values: [] as number[] }; - resultingGroups.push(group); + g.subscribe((x: any) => { + group.values.push(x); }); - expect(resultingGroups).to.deep.equal(expectedGroups); + resultingGroups.push(group); + }); + + expect(resultingGroups).to.deep.equal(expectedGroups); }); it('should group values with a subject selector', (done: MochaDone) => { @@ -106,7 +106,10 @@ describe('groupBy operator', () => { ]; of(1, 2, 3).pipe( - groupBy((x: number) => x % 2, null as any, null as any, () => new ReplaySubject(1)), + groupBy({ + key: x => x % 2, + subject: () => new ReplaySubject(1) + }), // Ensure each inner group reaches the destination after the first event // has been next'd to the group delay(5) @@ -802,11 +805,12 @@ describe('groupBy operator', () => { const expectedValues = { v: v, w: w, x: x, y: y, z: z }; const source = e1 - .pipe(groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - )); + .pipe( + groupBy({ + key: val => val.toLowerCase().trim(), + duration: group => group.pipe(skip(2)), + }) + ); expectObservable(source).toBe(expected, expectedValues); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -836,11 +840,10 @@ describe('groupBy operator', () => { const expectedValues = { v: v, w: w, x: x }; const source = e1 - .pipe(groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - )); + .pipe(groupBy({ + key: val => val.toLowerCase().trim(), + duration: group => group.pipe(skip(2)) + })); expectObservable(source, unsub).toBe(expected, expectedValues); }); @@ -879,17 +882,16 @@ describe('groupBy operator', () => { .unsubscribedFrame; const source = e1.pipe( - groupBy( - (val: string) => val.toLowerCase().trim(), - (val: string) => val, - (group: any) => group.pipe(skip(2)) - ), - map((group: any) => { + groupBy({ + key: val => val.toLowerCase().trim(), + duration: group => group.pipe(skip(2)) + }), + map((group) => { const arr: any[] = []; const subscription = group.pipe( phonyMarbelize() - ).subscribe((value: any) => { + ).subscribe((value) => { arr.push(value); }); @@ -923,11 +925,10 @@ describe('groupBy operator', () => { .parseMarblesAsSubscriptions(sub) .unsubscribedFrame; - obs.pipe(groupBy( - (val: string) => val, - (val: string) => val, - (group: any) => durations[group.key] - )).subscribe(); + obs.pipe(groupBy({ + key: (val) => val, + duration: (group) => durations[group.key] + })).subscribe(); rxTestScheduler.schedule(() => { durations.forEach((d, i) => { diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 12668a7306f..0626a5edad9 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -1,18 +1,45 @@ +/** @prettier */ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subject } from '../Subject'; -import { OperatorFunction } from '../types'; +import { OperatorFunction, ObservableInput } from '../types'; import { lift } from '../util/lift'; +import { from } from '../observable/from'; -/* tslint:disable:max-line-length */ -export function groupBy(keySelector: (value: T) => value is K): OperatorFunction | GroupedObservable>>; +// TODO: The types here aren't really inferring properly. There's an issue +// where the type of `grouped` in `duration` is `GroupedObservable`, +// in all cases, and will not pick up the `K` value from `key`, where the +// return value will work fine. +export interface GroupByOptions { + key: (value: T) => K; + duration?: (grouped: GroupedObservable) => ObservableInput; + subject?: () => Subject; +} + +export function groupBy(options: GroupByOptions): OperatorFunction>; + +export function groupBy( + keySelector: (value: T) => value is K +): OperatorFunction | GroupedObservable>>; export function groupBy(keySelector: (value: T) => K): OperatorFunction>; -export function groupBy(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable) => Observable): OperatorFunction>; -export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable): OperatorFunction>; -export function groupBy(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): OperatorFunction>; -/* tslint:enable:max-line-length */ +export function groupBy( + keySelector: (value: T) => K, + elementSelector: void, + durationSelector: (grouped: GroupedObservable) => Observable +): OperatorFunction>; +export function groupBy( + keySelector: (value: T) => K, + elementSelector?: (value: T) => R, + durationSelector?: (grouped: GroupedObservable) => Observable +): OperatorFunction>; +export function groupBy( + keySelector: (value: T) => K, + elementSelector?: (value: T) => R, + durationSelector?: (grouped: GroupedObservable) => Observable, + subjectSelector?: () => Subject +): OperatorFunction>; /** * Groups the items emitted by an Observable according to a specified criterion, @@ -105,12 +132,22 @@ export function groupBy(keySelector: (value: T) => K, elementSelector?: * value. * @name groupBy */ -export function groupBy(keySelector: (value: T) => K, - elementSelector?: ((value: T) => R) | void, - durationSelector?: (grouped: GroupedObservable) => Observable, - subjectSelector?: () => Subject): OperatorFunction> { - return (source: Observable) => - lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); +export function groupBy( + optionsOrKeySelector: GroupByOptions | ((value: T) => K), + elementSelector?: ((value: T) => R) | void, + durationSelector?: (grouped: GroupedObservable) => ObservableInput, + subjectSelector?: () => Subject +): OperatorFunction> { + let keySelector: (value: T) => K; + if (optionsOrKeySelector && typeof optionsOrKeySelector === 'object') { + keySelector = optionsOrKeySelector.key; + durationSelector = optionsOrKeySelector.duration; + subjectSelector = optionsOrKeySelector.subject; + } else { + keySelector = optionsOrKeySelector; + } + + return (source: Observable) => lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); } export interface RefCountSubscription { @@ -121,16 +158,17 @@ export interface RefCountSubscription { } class GroupByOperator implements Operator> { - constructor(private keySelector: (value: T) => K, - private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable, - private subjectSelector?: () => Subject) { - } + constructor( + private keySelector: (value: T) => K, + private elementSelector?: ((value: T) => R) | void, + private durationSelector?: (grouped: GroupedObservable) => ObservableInput, + private subjectSelector?: () => Subject + ) {} call(subscriber: Subscriber>, source: any): any { - return source.subscribe(new GroupBySubscriber( - subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector - )); + return source.subscribe( + new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector) + ); } } @@ -144,11 +182,13 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr public attemptedToUnsubscribe: boolean = false; public count: number = 0; - constructor(destination: Subscriber>, - private keySelector: (value: T) => K, - private elementSelector?: ((value: T) => R) | void, - private durationSelector?: (grouped: GroupedObservable) => Observable, - private subjectSelector?: () => Subject) { + constructor( + destination: Subscriber>, + private keySelector: (value: T) => K, + private elementSelector?: ((value: T) => R) | void, + private durationSelector?: (grouped: GroupedObservable) => ObservableInput, + private subjectSelector?: () => Subject + ) { super(destination); } @@ -190,9 +230,9 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr const groupedObservable = new GroupedObservable(key, group, this); this.destination.next(groupedObservable); if (this.durationSelector) { - let duration: any; + let duration: Observable; try { - duration = this.durationSelector(new GroupedObservable(key, >group)); + duration = from(this.durationSelector(new GroupedObservable(key, >group))); } catch (err) { this.error(err); return; @@ -250,9 +290,7 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr * @extends {Ignored} */ class GroupDurationSubscriber extends Subscriber { - constructor(private key: K, - group: Subject, - private parent: GroupBySubscriber) { + constructor(private key: K, group: Subject, private parent: GroupBySubscriber) { super(group); this.add(this._teardown); } @@ -267,7 +305,7 @@ class GroupDurationSubscriber extends Subscriber { if (parent) { parent.removeGroup(key); } - } + }; } /** @@ -280,9 +318,7 @@ class GroupDurationSubscriber extends Subscriber { */ export class GroupedObservable extends Observable { /** @deprecated Do not construct this type. Internal use only */ - constructor(public key: K, - private groupSubject: Subject, - private refCountSubscription?: RefCountSubscription) { + constructor(public readonly key: K, private groupSubject: Subject, private refCountSubscription?: RefCountSubscription) { super(); }