Skip to content

Commit

Permalink
refactor(Subscription): Get rid of _unsubscribe weirdness (#5670)
Browse files Browse the repository at this point in the history
* refactor(Subscription): Get rid of _unsubscribe weirdness

This is just another step toward getting what we're doing with inheritance to be a little more sane, ultimately, this may be refactored further, as I am not happy with some of these subscribers adding their own teardowns, but this is a start. The goal was to get rid of the weirdness around `_unsubscribe` in our code base.

- Moves to calling `_unsubscribe` a new internal name in `Subscription`: `initialTeardown`, because it's slightly more descriptive
- Removes `_unsubscribe` usage from the entire codebase.
- Formatted a few files just for my sanity.

* chore: address comments
  • Loading branch information
benlesh authored Aug 25, 2020
1 parent dfdef5d commit fd1e039
Show file tree
Hide file tree
Showing 13 changed files with 35 additions and 51 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ export declare class Subscriber<T> extends Subscription implements Observer<T> {

export declare class Subscription implements SubscriptionLike {
closed: boolean;
constructor(unsubscribe?: () => void);
constructor(initialTeardown?: (() => void) | undefined);
add(teardown: TeardownLogic): void;
remove(teardown: Exclude<TeardownLogic, void>): void;
unsubscribe(): void;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
error?: ((e?: any) => void) | null,
complete?: (() => void) | null) {
super();
this.add(this._teardown);

let next: ((value: T) => void) | undefined;

Expand Down Expand Up @@ -281,8 +282,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
return false;
}

/** @internal This is an internal implementation detail, do not use. */
_unsubscribe(): void {
private _teardown = () => {
const { _parentSubscriber } = this;
this._parentSubscriber = null!;
_parentSubscriber.unsubscribe();
Expand Down
29 changes: 6 additions & 23 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,10 @@ export class Subscription implements SubscriptionLike {
private _teardowns: Exclude<TeardownLogic, void>[] | null = null;

/**
* @param {function(): void} [unsubscribe] A function describing how to
* perform the disposal of resources when the `unsubscribe` method is called.
* @param initialTeardown A function executed first as part of the teardown
* process that is kicked off when {@link unsubscribe} is called.
*/
constructor(unsubscribe?: () => void) {
if (unsubscribe) {
(this as any)._ctorUnsubscribe = true;
(this as any)._unsubscribe = unsubscribe;
}
}
constructor(private initialTeardown?: () => void) {}

/**
* Disposes the resources held by the subscription. May, for instance, cancel
Expand Down Expand Up @@ -76,22 +71,10 @@ export class Subscription implements SubscriptionLike {
}
}

const _unsubscribe = (this as any)._unsubscribe;
if (isFunction(_unsubscribe)) {
// It's only possible to null _unsubscribe - to release the reference to
// any teardown function passed in the constructor - if the property was
// actually assigned in the constructor, as there are some classes that
// are derived from Subscriber (which derives from Subscription) that
// implement an _unsubscribe method as a mechanism for obtaining
// unsubscription notifications and some of those subscribers are
// recycled. Also, in some of those subscribers, _unsubscribe switches
// from a prototype method to an instance property - see notifyNext in
// RetryWhenSubscriber.
if ((this as any)._ctorUnsubscribe) {
(this as any)._unsubscribe = undefined;
}
const { initialTeardown } = this;
if (isFunction(initialTeardown)) {
try {
_unsubscribe.call(this);
initialTeardown();
} catch (e) {
errors = e instanceof UnsubscriptionError ? e.errors : [e];
}
Expand Down
12 changes: 7 additions & 5 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,19 @@ class ConnectableSubscriber<T> extends Subscriber<T> {
constructor(protected destination: Subject<T>,
private connectable: ConnectableObservable<T>) {
super();
this.add(this._teardown);
}
protected _error(err: any): void {
this._unsubscribe();
this._teardown();
super._error(err);
}
protected _complete(): void {
this.connectable._isComplete = true;
this._unsubscribe();
this._teardown();
super._complete();
}
protected _unsubscribe() {
const connectable = <any>this.connectable;
private _teardown = () => {
const connectable = this.connectable as any;
if (connectable) {
this.connectable = null!;
const connection = connectable._connection;
Expand Down Expand Up @@ -125,9 +126,10 @@ class RefCountSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
this.add(this._teardown);
}

protected _unsubscribe() {
private _teardown = () => {

const { connectable } = this;
if (!connectable) {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
private maxBufferSize: number,
private scheduler: SchedulerLike) {
super(destination);
this.add(this._teardown);
const context = this.openContext();
this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
if (this.timespanOnly) {
Expand Down Expand Up @@ -184,8 +185,7 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
super._complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {
this.contexts = null!;
}

Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class BufferWhenSubscriber<T> extends SimpleOuterSubscriber<T, any> {

constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
super(destination);
this.add(this._teardown);
this.openBuffer();
}

Expand All @@ -90,8 +91,7 @@ class BufferWhenSubscriber<T> extends SimpleOuterSubscriber<T, any> {
super._complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {
this.buffer = null!;
this.subscribing = false;
}
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,17 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
*/
class GroupDurationSubscriber<K, T> extends Subscriber<T> {
constructor(private key: K,
private group: Subject<T>,
group: Subject<T>,
private parent: GroupBySubscriber<any, K, T | any>) {
super(group);
this.add(this._teardown);
}

protected _next(value: T): void {
protected _next(): void {
this.complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {
const { parent, key } = this;
this.key = this.parent = null!;
if (parent) {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/refCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ class RefCountSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>,
private connectable: ConnectableObservable<T>) {
super(destination);
this.add(this._teardown);
}

protected _unsubscribe() {
private _teardown = () => {

const { connectable } = this;
if (!connectable) {
Expand Down
5 changes: 1 addition & 4 deletions src/internal/operators/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,8 @@ class SwitchMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
if (!innerSubscription || innerSubscription.closed) {
super._complete();
}
this.unsubscribe();
}

protected _unsubscribe() {
this.innerSubscription = undefined;
this.unsubscribe();
}

notifyComplete(): void {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class WindowSubscriber<T> extends SimpleOuterSubscriber<T, any> {

constructor(destination: Subscriber<Observable<T>>) {
super(destination);
this.add(this._teardown);
destination.next(this.window);
}

Expand Down Expand Up @@ -109,8 +110,7 @@ class WindowSubscriber<T> extends SimpleOuterSubscriber<T, any> {
this.destination.complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {
this.window = null!;
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/windowCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
private windowSize: number,
private startWindowEvery: number) {
super(destination);
this.add(this._teardown);
destination.next(this.windows[0]);
}

Expand Down Expand Up @@ -142,7 +143,7 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
this.destination.complete();
}

protected _unsubscribe() {
private _teardown = () => {
this.count = 0;
this.windows = null!;
}
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/windowToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class WindowToggleSubscriber<T, O> extends ComplexOuterSubscriber<T, any> {
private openings: Observable<O>,
private closingSelector: (openValue: O) => Observable<any>) {
super(destination);
this.add(this._teardown);
this.add(this.openSubscription = innerSubscribe(openings, new ComplexInnerSubscriber(this, openings, 0)));
}

Expand Down Expand Up @@ -137,8 +138,7 @@ class WindowToggleSubscriber<T, O> extends ComplexOuterSubscriber<T, any> {
super._complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {
const { contexts } = this;
this.contexts = null!;
if (contexts) {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/scheduler/AsyncAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class AsyncAction<T> extends Action<T> {
constructor(protected scheduler: AsyncScheduler,
protected work: (this: SchedulerAction<T>, state?: T) => void) {
super(scheduler, work);
this.add(this._teardown);
}

public schedule(state?: T, delay: number = 0): Subscription {
Expand Down Expand Up @@ -132,8 +133,7 @@ export class AsyncAction<T> extends Action<T> {
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
private _teardown = () => {

const id = this.id;
const scheduler = this.scheduler;
Expand Down

0 comments on commit fd1e039

Please sign in to comment.