Skip to content

Commit

Permalink
feat(Abort): Fix most operators
Browse files Browse the repository at this point in the history
  • Loading branch information
mpodwysocki authored and trxcllnt committed Sep 1, 2020
1 parent a66a7c8 commit 08a4c08
Show file tree
Hide file tree
Showing 50 changed files with 298 additions and 200 deletions.
9 changes: 6 additions & 3 deletions src/aborterror.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
export class AbortError extends Error {
constructor() {
super();
constructor(message: string = 'The operation has been aborted') {
super(message);
Object.setPrototypeOf(this, AbortError.prototype);
this.message = 'The operation has been aborted';
Error.captureStackTrace(this, this.constructor);
this.name = 'AbortError';
}

get [Symbol.toStringTag]() { return 'AbortError'; }
}

export function throwIfAborted(signal?: AbortSignal) {
Expand Down
17 changes: 17 additions & 0 deletions src/abortsignal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export interface AbortSignal {
aborted: boolean;

addEventListener: (type: 'abort', listener: ((this: AbortSignal, event: any) => any), options?: boolean | {
capture?: boolean;
once?: boolean;
passive?: boolean;
}) => void;

removeEventListener: (type: 'abort', listener: ((this: AbortSignal, event: any) => any), options?: boolean | {
capture?: boolean;
}) => void;

dispatchEvent: (event: any) => boolean;

onabort?: null | ((this: AbortSignal, event: any) => void);
}
25 changes: 23 additions & 2 deletions src/asynciterable/_sleep.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
export function sleep(dueTime: number) {
return new Promise<void>(res => setTimeout(res, dueTime));
import { AbortError } from '../aborterror';

export function sleep(dueTime: number, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
if (signal?.aborted) {
reject(new AbortError());
}

const id = setTimeout(() => {
if (signal?.aborted) {
reject(new AbortError());
}

resolve();
}, dueTime);

if (signal) {
signal.onabort = () => {
clearTimeout(id);
reject(new AbortError());
};
}
});
}
13 changes: 7 additions & 6 deletions src/asynciterable/operators/_extremaby.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AsyncIterableX } from '../asynciterablex';
import { wrapWithAbort } from './withabort';

/**
* @ignore
Expand All @@ -13,12 +14,12 @@ export async function defaultCompareAsync<T>(key: T, minValue: T): Promise<numbe
*/
class ExtremaByAsyncIterator<TSource, TKey> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
private _keyFn: (x: TSource) => TKey | Promise<TKey>;
private _keyFn: (x: TSource, signal?: AbortSignal) => TKey | Promise<TKey>;
private _cmp: (x: TKey, y: TKey) => number | Promise<number>;

constructor(
source: AsyncIterable<TSource>,
keyFn: (x: TSource) => TKey | Promise<TKey>,
keyFn: (x: TSource, signal?: AbortSignal) => TKey | Promise<TKey>,
cmp: (x: TKey, y: TKey) => number | Promise<number>
) {
super();
Expand All @@ -27,16 +28,16 @@ class ExtremaByAsyncIterator<TSource, TKey> extends AsyncIterableX<TSource> {
this._cmp = cmp;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let result: TSource[] = [];
let next;
const it = this._source[Symbol.asyncIterator]();
const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator]();
if ((next = await it.next()).done) {
throw new Error('Sequence contains no elements');
}

const current = next.value;
let resKey = await this._keyFn(current);
let resKey = await this._keyFn(current, signal);
result.push(current);

while (!(next = await it.next()).done) {
Expand All @@ -60,7 +61,7 @@ class ExtremaByAsyncIterator<TSource, TKey> extends AsyncIterableX<TSource> {
*/
export function extremaBy<TSource, TKey>(
source: AsyncIterable<TSource>,
keyFn: (x: TSource) => TKey | Promise<TKey>,
keyFn: (x: TSource, signal?: AbortSignal) => TKey | Promise<TKey>,
cmp: (x: TKey, y: TKey) => number | Promise<number>
): AsyncIterableX<TSource> {
return new ExtremaByAsyncIterator<TSource, TKey>(source, keyFn, cmp);
Expand Down
13 changes: 8 additions & 5 deletions src/asynciterable/operators/_grouping.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { wrapWithAbort } from './withabort';

/**
* @ignore
*/
export async function createGrouping<TSource, TKey, TValue>(
source: AsyncIterable<TSource>,
keySelector: (value: TSource) => TKey | Promise<TKey>,
elementSelector: (value: TSource) => TValue | Promise<TValue>
keySelector: (value: TSource, signal?: AbortSignal) => TKey | Promise<TKey>,
elementSelector: (value: TSource, signal?: AbortSignal) => TValue | Promise<TValue>,
signal?: AbortSignal
): Promise<Map<TKey, TValue[]>> {
const map = new Map<TKey, TValue[]>();
for await (const item of source) {
const key = await keySelector(item);
for await (const item of wrapWithAbort(source, signal)) {
const key = await keySelector(item, signal);
let grouping = map.get(key);
if (!map.has(key)) {
grouping = [];
map.set(key, grouping);
}
const element = await elementSelector(item);
const element = await elementSelector(item, signal);
grouping!.push(element);
}

Expand Down
5 changes: 3 additions & 2 deletions src/asynciterable/operators/batch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

interface AsyncResolver<T> {
resolve: (value?: T | PromiseLike<T> | undefined) => void;
Expand Down Expand Up @@ -32,8 +33,8 @@ class BatchAsyncIterable<TSource> extends AsyncIterableX<TSource[]> {
this._source = source;
}

[Symbol.asyncIterator]() {
const it = this._source[Symbol.asyncIterator]();
[Symbol.asyncIterator](signal?: AbortSignal) {
const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator]();

let state: State<TSource> = { type: BATCHING_TYPE, values: [] };
let ended: null | Promise<IteratorResult<TSource[]>> = null;
Expand Down
5 changes: 3 additions & 2 deletions src/asynciterable/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

export class BufferAsyncIterable<TSource> extends AsyncIterableX<TSource[]> {
private _source: AsyncIterable<TSource>;
Expand All @@ -13,10 +14,10 @@ export class BufferAsyncIterable<TSource> extends AsyncIterableX<TSource[]> {
this._skip = skip;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffers: TSource[][] = [];
let i = 0;
for await (const item of this._source) {
for await (const item of wrapWithAbort(this._source, signal)) {
if (i % this._skip === 0) {
buffers.push([]);
}
Expand Down
16 changes: 9 additions & 7 deletions src/asynciterable/operators/catcherror.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { returnAsyncIterator } from '../../util/returniterator';
import { wrapWithAbort } from './withabort';

export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSource | TResult> {
private _source: AsyncIterable<TSource>;
private _handler: (error: any) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>;
private _handler: (error: any, signal?: AbortSignal) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>;

constructor(
source: AsyncIterable<TSource>,
handler: (error: any) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>
handler: (error: any, signal?: AbortSignal) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>
) {
super();
this._source = source;
this._handler = handler;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let err: AsyncIterable<TResult> | undefined;
let hasError = false;
const it = this._source[Symbol.asyncIterator]();
const source = wrapWithAbort(this._source, signal);
const it = source[Symbol.asyncIterator]();
while (1) {
let c = <IteratorResult<TSource>>{};

Expand All @@ -29,7 +31,7 @@ export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSo
break;
}
} catch (e) {
err = await this._handler(e);
err = await this._handler(e, signal);
hasError = true;
await returnAsyncIterator(it);
break;
Expand All @@ -39,15 +41,15 @@ export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSo
}

if (hasError) {
for await (const item of err!) {
for await (const item of wrapWithAbort(err!, signal)) {
yield item;
}
}
}
}

export function catchError<TSource, TResult>(
handler: (error: any) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>
handler: (error: any, signal?: AbortSignal) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>
): OperatorAsyncFunction<TSource, TSource | TResult> {
return function catchWithOperatorFunction(
source: AsyncIterable<TSource>
Expand Down
7 changes: 4 additions & 3 deletions src/asynciterable/operators/concatall.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<AsyncIterable<TSource>>;
Expand All @@ -9,9 +10,9 @@ export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
this._source = source;
}

async *[Symbol.asyncIterator]() {
for await (const outer of this._source) {
for await (const item of outer) {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
for await (const outer of wrapWithAbort(this._source, signal)) {
for await (const item of wrapWithAbort(outer, signal)) {
yield item;
}
}
Expand Down
22 changes: 13 additions & 9 deletions src/asynciterable/operators/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

async function forEach<T>(
source: AsyncIterable<T>,
fn: (item: T) => void | Promise<void>
fn: (item: T, signal?: AbortSignal) => void | Promise<void>,
signal?: AbortSignal
): Promise<void> {
for await (const item of source) {
for await (const item of wrapWithAbort(source, signal)) {
await fn(item);
}
}
Expand All @@ -20,7 +22,7 @@ export class DebounceAsyncIterable<TSource> extends AsyncIterableX<TSource> {
this._time = time;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let noValue: boolean;
let lastItem: TSource | undefined;
let deferred: Promise<TSource>;
Expand Down Expand Up @@ -49,12 +51,14 @@ export class DebounceAsyncIterable<TSource> extends AsyncIterableX<TSource> {
};

reset(true);
forEach(this._source, item => {
lastItem = item;
if (noValue) {
run();
}
})
forEach(
this._source,
item => {
lastItem = item;
if (noValue) {
run();
}
}, signal)
.then(() => (done = true))
.catch(err => {
hasError = true;
Expand Down
5 changes: 3 additions & 2 deletions src/asynciterable/operators/defaultifempty.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

export class DefaultIfEmptyAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
Expand All @@ -11,9 +12,9 @@ export class DefaultIfEmptyAsyncIterable<TSource> extends AsyncIterableX<TSource
this._defaultValue = defaultValue;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let state = 1;
for await (const item of this._source) {
for await (const item of wrapWithAbort(this._source, signal)) {
state = 2;
yield item;
}
Expand Down
7 changes: 4 additions & 3 deletions src/asynciterable/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { sleep } from '../_sleep';
import { wrapWithAbort } from './withabort';

export class DelayAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
Expand All @@ -12,9 +13,9 @@ export class DelayAsyncIterable<TSource> extends AsyncIterableX<TSource> {
this._dueTime = dueTime;
}

async *[Symbol.asyncIterator]() {
await sleep(this._dueTime);
for await (const item of this._source) {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
await sleep(this._dueTime, signal);
for await (const item of wrapWithAbort(this._source, signal)) {
yield item;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/asynciterable/operators/delayeach.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { sleep } from '../_sleep';
import { wrapWithAbort } from './withabort';

export class DelayEachAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
Expand All @@ -12,9 +13,9 @@ export class DelayEachAsyncIterable<TSource> extends AsyncIterableX<TSource> {
this._dueTime = dueTime;
}

async *[Symbol.asyncIterator]() {
for await (const item of this._source) {
await sleep(this._dueTime);
async *[Symbol.asyncIterator](signal?: AbortSignal) {
for await (const item of wrapWithAbort(this._source, signal)) {
await sleep(this._dueTime, signal);
yield item;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/asynciterable/operators/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import { wrapWithAbort } from './withabort';

export class DistinctAsyncIterable<TSource, TKey> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
private _keySelector: (value: TSource) => TKey | Promise<TKey>;
private _keySelector: (value: TSource, signal?: AbortSignal) => TKey | Promise<TKey>;
private _comparer: (x: TKey, y: TKey) => boolean | Promise<boolean>;

constructor(
source: AsyncIterable<TSource>,
keySelector: (value: TSource) => TKey | Promise<TKey>,
keySelector: (value: TSource, signal?: AbortSignal) => TKey | Promise<TKey>,
comparer: (x: TKey, y: TKey) => boolean | Promise<boolean>
) {
super();
Expand All @@ -25,7 +25,7 @@ export class DistinctAsyncIterable<TSource, TKey> extends AsyncIterableX<TSource
const set = [] as TKey[];

for await (const item of wrapWithAbort(this._source, signal)) {
const key = await this._keySelector(item);
const key = await this._keySelector(item, signal);
if ((await arrayIndexOfAsync(set, key, this._comparer)) === -1) {
set.push(key);
yield item;
Expand All @@ -35,7 +35,7 @@ export class DistinctAsyncIterable<TSource, TKey> extends AsyncIterableX<TSource
}

export function distinct<TSource, TKey>(
keySelector: (value: TSource) => TKey | Promise<TKey> = identityAsync,
keySelector: (value: TSource, signal?: AbortSignal) => TKey | Promise<TKey> = identityAsync,
comparer: (x: TKey, y: TKey) => boolean | Promise<boolean> = comparerAsync
): MonoTypeOperatorAsyncFunction<TSource> {
return function distinctOperatorFunction(
Expand Down
Loading

0 comments on commit 08a4c08

Please sign in to comment.