Skip to content

Commit

Permalink
fix: don't subscribe if AbortSignal is already aborted
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfbecker committed Aug 30, 2018
1 parent 2d67890 commit 1238576
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
32 changes: 32 additions & 0 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ describe('Observable consumers', () => {
assert.propertyVal(err, 'name', 'AbortError')
}
})
it('should never subscribe to the Observable if the AbortSignal is already aborted', async () => {
const subscribe = sinon.spy()
const obs = new Observable<number>(subscribe)
const abortController = new AbortController()
abortController.abort()
const promise = toPromise(obs, abortController.signal)
sinon.assert.notCalled(subscribe)
try {
await promise
throw new AssertionError({ message: 'Expected Promise to be rejected' })
} catch (err) {
assert.instanceOf(err, Error)
assert.propertyVal(err, 'name', 'AbortError')
}
})
it('should resolve with the last value emitted', async () => {
const obs = of(1, 2, 3)
const abortController = new AbortController()
Expand Down Expand Up @@ -118,6 +133,23 @@ describe('Observable consumers', () => {
assert.propertyVal(err, 'name', 'AbortError')
}
})
it('should never subscribe to the Observable when the AbortSignal is already aborted', async () => {
const subscribe = sinon.spy()
const obs = new Observable<number>(subscribe)
const abortController = new AbortController()
abortController.abort()
const onnext = sinon.spy()
const promise = forEach(obs, onnext, abortController.signal)
sinon.assert.notCalled(subscribe)
sinon.assert.notCalled(onnext)
try {
await promise
throw new AssertionError({ message: 'Expected Promise to be rejected' })
} catch (err) {
assert.instanceOf(err, Error)
assert.propertyVal(err, 'name', 'AbortError')
}
})
it('should resolve the Promise when the Observable completes', async () => {
const obs = of(1, 2, 3)
const abortController = new AbortController()
Expand Down
22 changes: 16 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { from, Observable, ObservableInput, OperatorFunction, Subscriber, Subscription, TeardownLogic } from 'rxjs'
import { concatMap as rxConcatMap, mergeMap as rxMergeMap, switchMap as rxSwitchMap } from 'rxjs/operators'

const createAbortError = () => {
const error = new Error('Aborted')
error.name = 'AbortError'
return error
}

/**
* Creates an Observable just like RxJS `create`, but exposes an AbortSignal in addition to the subscriber
*/
Expand Down Expand Up @@ -29,6 +35,10 @@ export const defer = <T>(factory: (signal: AbortSignal) => ObservableInput<T>):
*/
export const toPromise = <T>(observable: Observable<T>, signal?: AbortSignal): Promise<T> =>
new Promise((resolve, reject) => {
if (signal && signal.aborted) {
reject(createAbortError())
return
}
let value: T
const subscription = observable.subscribe(
val => {
Expand All @@ -42,9 +52,7 @@ export const toPromise = <T>(observable: Observable<T>, signal?: AbortSignal): P
if (signal) {
signal.addEventListener('abort', () => {
subscription.unsubscribe()
const error = new Error('Aborted')
error.name = 'AbortError'
reject(error)
reject(createAbortError())
})
}
})
Expand All @@ -55,6 +63,10 @@ export const toPromise = <T>(observable: Observable<T>, signal?: AbortSignal): P
*/
export const forEach = <T>(source: Observable<T>, next: (value: T) => void, signal?: AbortSignal): Promise<void> =>
new Promise<void>((resolve, reject) => {
if (signal && signal.aborted) {
reject(createAbortError())
return
}
// Must be declared in a separate statement to avoid a RefernceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription
Expand All @@ -75,9 +87,7 @@ export const forEach = <T>(source: Observable<T>, next: (value: T) => void, sign
if (signal) {
signal.addEventListener('abort', () => {
subscription.unsubscribe()
const error = new Error('Aborted')
error.name = 'AbortError'
reject(error)
reject(createAbortError())
})
}
})
Expand Down

0 comments on commit 1238576

Please sign in to comment.