Skip to content

Commit

Permalink
Packaging cleanup, add tee, pipeTo, pipeThrough DOM Streams methods (#…
Browse files Browse the repository at this point in the history
…267)

* build(packages): create sourcemap files, add package.json entries for esmodules, ship esnext in the

* feat(asynciterable-pipe): add tee, pipeTo, and pipeThrough to AsyncIterable prototype, move node pip
  • Loading branch information
trxcllnt authored Mar 4, 2019
1 parent 2e00c95 commit 873b8fe
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 90 deletions.
30 changes: 16 additions & 14 deletions gulp/copy-main-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,25 @@ const { Observable, ReplaySubject } = require('rxjs');

const copyMainTask = ((cache) => memoizeTask(cache, function copyMain(target) {
const out = targetDir(target);
const dtsGlob = `${targetDir(`es2015`, `cjs`)}/**/*.ts`;
const cjsGlob = `${targetDir(`es2015`, `cjs`)}/**/*.js`;
const dtsGlob = `${targetDir(`esnext`, `cjs`)}/**/*.ts`;
const cjsGlob = `${targetDir(`esnext`, `cjs`)}/**/*.js`;
const esmGlob = `${targetDir(`esnext`, `esm`)}/**/*.js`;
const es5UmdGlob = `${targetDir(`es5`, `umd`)}/*.js`;
const es5UmdMaps = `${targetDir(`es5`, `umd`)}/*.map`;
const es2015UmdGlob = `${targetDir(`es2015`, `umd`)}/*.js`;
const es2015UmdMaps = `${targetDir(`es2015`, `umd`)}/*.map`;
const ch_ext = (ext) => gulpRename((p) => { p.extname = ext; });
const append = (ap) => gulpRename((p) => { p.basename += ap; });
const esnextUmdGlob = `${targetDir(`esnext`, `umd`)}/*.js`;
const cjsSourceMapsGlob = `${targetDir(`esnext`, `cjs`)}/**/*.map`;
const esmSourceMapsGlob = `${targetDir(`esnext`, `esm`)}/**/*.map`;
const es5UmdSourceMapsGlob = `${targetDir(`es5`, `umd`)}/*.map`;
const esnextUmdSourceMapsGlob = `${targetDir(`esnext`, `umd`)}/*.map`;
return Observable.forkJoin(
observableFromStreams(gulp.src(dtsGlob), gulp.dest(out)), // copy d.ts files
observableFromStreams(gulp.src(cjsGlob), gulp.dest(out)), // copy es2015 cjs files
observableFromStreams(gulp.src(esmGlob), ch_ext(`.mjs`), gulp.dest(out)), // copy es2015 esm files and rename to `.mjs`
observableFromStreams(gulp.src(es5UmdGlob), append(`.es5.min`), gulp.dest(out)), // copy es5 umd files and add `.min`
observableFromStreams(gulp.src(es5UmdMaps), gulp.dest(out)), // copy es5 umd sourcemap files, but don't rename
observableFromStreams(gulp.src(es2015UmdGlob), append(`.es2015.min`), gulp.dest(out)), // copy es2015 umd files and add `.es2015.min`
observableFromStreams(gulp.src(es2015UmdMaps), gulp.dest(out)), // copy es2015 umd sourcemap files, but don't rename
observableFromStreams(gulp.src(dtsGlob), gulp.dest(out)), // copy d.ts files
observableFromStreams(gulp.src(cjsGlob), gulp.dest(out)), // copy es2015 cjs files
observableFromStreams(gulp.src(cjsSourceMapsGlob), gulp.dest(out)), // copy es2015 cjs sourcemaps
observableFromStreams(gulp.src(esmSourceMapsGlob), gulp.dest(out)), // copy es2015 esm sourcemaps
observableFromStreams(gulp.src(es5UmdSourceMapsGlob), gulp.dest(out)), // copy es5 umd sourcemap files, but don't rename
observableFromStreams(gulp.src(esnextUmdSourceMapsGlob), gulp.dest(out)), // copy es2015 umd sourcemap files, but don't rename
observableFromStreams(gulp.src(esmGlob), gulpRename((p) => { p.extname = '.mjs'; }), gulp.dest(out)), // copy es2015 esm files and rename to `.mjs`
observableFromStreams(gulp.src(es5UmdGlob), gulpRename((p) => { p.basename += `.es5.min`; }), gulp.dest(out)), // copy es5 umd files and add `.min`
observableFromStreams(gulp.src(esnextUmdGlob), gulpRename((p) => { p.basename += `.es2015.min`; }), gulp.dest(out)), // copy es2015 umd files and add `.es2015.min`
).publish(new ReplaySubject()).refCount();
}))({});

Expand Down
61 changes: 44 additions & 17 deletions gulp/package-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ const createMainPackageJson = (target, format) => (orig) => ({
name: npmPkgName,
main: `${mainExport}.node`,
browser: `${mainExport}.dom`,
module: `${mainExport}.dom.mjs`,
types: `${mainExport}.node.d.ts`,
unpkg: `${mainExport}.dom.es5.min.js`,
[`esm`]: { mode: `all`, sourceMap: true }
esm: { mode: `all`, sourceMap: true }
});

const createTypeScriptPackageJson = (target, format) => (orig) => ({
...createScopedPackageJSON(target, format)(orig),
bin: undefined,
module: undefined,
main: `${mainExport}.node.ts`,
types: `${mainExport}.node.ts`,
browser: `${mainExport}.dom.ts`,
Expand All @@ -66,22 +68,47 @@ const createTypeScriptPackageJson = (target, format) => (orig) => ({
});

const createScopedPackageJSON = (target, format) => (({ name, ...orig }) =>
conditionallyAddStandardESMEntry(target, format)(
packageJSONFields.reduce(
(xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
{
name: `${npmOrgName}/${npmPkgName}-${packageName(target, format)}`,
browser: format === 'umd' ? undefined : `${mainExport}.dom`,
main: format === 'umd' ? `${mainExport}.dom` : `${mainExport}.node`,
types: format === 'umd' ? `${mainExport}.d.ts` : `${mainExport}.node.d.ts`,
version: undefined, unpkg: undefined, module: undefined, [`esm`]: undefined,
}
)
packageJSONFields.reduce(
(xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
{
// un-set version, since it's automatically applied during the release process
version: undefined,
// set the scoped package name (e.g. "@apache-arrow/esnext-esm")
name: `${npmOrgName}/${npmPkgName}-${packageName(target, format)}`,
// set "unpkg"/"jsdeliver" if building scoped UMD target
unpkg: format === 'umd' ? `${mainExport}.dom.js` : undefined,
jsdelivr: format === 'umd' ? `${mainExport}.dom.js` : undefined,
// set "browser" if building scoped UMD target, otherwise "Arrow.dom"
browser: format === 'umd' ? `${mainExport}.dom.js` : `${mainExport}.dom.js`,
// set "main" to "Arrow" if building scoped UMD target, otherwise "Arrow.node"
main: format === 'umd' ? `${mainExport}.dom` : `${mainExport}.node`,
// set "module" (for https://www.npmjs.com/package/@pika/pack) if building scoped ESM target
module: format === 'esm' ? `${mainExport}.dom.js` : undefined,
// include "esm" settings for https://www.npmjs.com/package/esm if building scoped ESM target
esm: format === `esm` ? { mode: `auto`, sourceMap: true } : undefined,
// set "types" (for TypeScript/VSCode)
types: format === 'umd' ? undefined : `${mainExport}.node.d.ts`,
}
)
);

// const createScopedPackageJSON = (target, format) => (({ name, ...orig }) =>
// conditionallyAddStandardESMEntry(target, format)(
// packageJSONFields.reduce(
// (xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
// {
// name: `${npmOrgName}/${npmPkgName}-${packageName(target, format)}`,
// browser: format === 'umd' ? undefined : `${mainExport}.dom`,
// main: format === 'umd' ? `${mainExport}.dom` : `${mainExport}.node`,
// types: format === 'umd' ? `${mainExport}.d.ts` : `${mainExport}.node.d.ts`,
// version: undefined, unpkg: undefined, module: undefined, [`esm`]: undefined,
// }
// )
// )
// );

const conditionallyAddStandardESMEntry = (target, format) => (packageJSON) => (
format !== `esm` && format !== `cls`
? packageJSON
: { ...packageJSON, [`esm`]: { mode: `auto`, sourceMap: true } }
);
// const conditionallyAddStandardESMEntry = (target, format) => (packageJSON) => (
// format !== `esm` && format !== `cls`
// ? packageJSON
// : { ...packageJSON, [`esm`]: { mode: `auto`, sourceMap: true } }
// );
6 changes: 5 additions & 1 deletion gulp/typescript-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ function compileTypescript(out, tsconfigPath, tsconfigOverrides) {
tsProject(ts.reporter.defaultReporter())
);
const writeDTypes = observableFromStreams(dts, gulp.dest(out));
const writeJS = observableFromStreams(js, sourcemaps.write(), gulp.dest(out));
const mapFile = tsProject.options.module === 5 ? esmMapFile : cjsMapFile;
const writeJS = observableFromStreams(js, sourcemaps.write('./', { mapFile }), gulp.dest(out));
return Observable.forkJoin(writeDTypes, writeJS);
}

function cjsMapFile(mapFilePath) { return mapFilePath; }
function esmMapFile(mapFilePath) { return mapFilePath.replace('.js.map', '.mjs.map'); }

module.exports = typescriptTask;
module.exports.typescriptTask = typescriptTask;
9 changes: 5 additions & 4 deletions gulp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,19 @@ function shouldRunInChildProcess(target, format) {

const gulp = path.join(path.parse(require.resolve(`gulp`)).dir, `bin/gulp.js`);
function spawnGulpCommandInChildProcess(command, target, format) {
const args = [gulp, command, '-t', target, '-m', format];
const args = [gulp, command, '-t', target, '-m', format, `--silent`];
const opts = {
stdio: [`ignore`, `inherit`, `inherit`],
env: { ...process.env, NODE_NO_WARNINGS: `1` }
};
return asyncDone(() => child_process.spawn(`node`, args, opts));
return asyncDone(() => child_process.spawn(`node`, args, opts))
.catch((e) => { throw `Error in "${command}:${taskName(target, format)}" task`; });
}


const logAndDie = (e) => { if (e) { process.exit(1) } };
function observableFromStreams(...streams) {
if (streams.length <= 0) { return Observable.empty(); }
const pumped = streams.length <= 1 ? streams[0] : pump(...streams, (e) => { if (e) { console.error(e); process.exit(1) }});
const pumped = streams.length <= 1 ? streams[0] : pump(...streams, logAndDie);
const fromEvent = Observable.fromEvent.bind(null, pumped);
const streamObs = fromEvent(`data`)
.merge(fromEvent(`error`).flatMap((e) => Observable.throw(e)))
Expand Down
8 changes: 4 additions & 4 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ gulp.task(`build:${npmPkgName}`,
gulp.series(
gulp.parallel(
`build:${taskName(`es5`, `umd`)}`,
`build:${taskName(`es2015`, `cjs`)}`,
`build:${taskName(`es2015`, `esm`)}`,
`build:${taskName(`es2015`, `umd`)}`
`build:${taskName(`esnext`, `cjs`)}`,
`build:${taskName(`esnext`, `esm`)}`,
`build:${taskName(`esnext`, `umd`)}`
),
`clean:${npmPkgName}`,
`compile:${npmPkgName}`,
Expand All @@ -81,7 +81,7 @@ gulp.task(`compile`, gulpConcurrent(getTasks(`compile`)));
gulp.task(`package`, gulpConcurrent(getTasks(`package`)));
gulp.task(`default`, gulp.series(`clean`, `build`, `test`));

function gulpConcurrent(tasks, numCPUs = require('os').cpus().length) {
function gulpConcurrent(tasks, numCPUs = Math.max(1, require('os').cpus().length * 0.5) | 0) {
return () => Observable.from(tasks.map((task) => gulp.series(task)))
.flatMap((task) => Observable.bindNodeCallback(task)(), numCPUs || 1);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Ix.dom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export { AsyncSink, Iterable, AsyncIterable } from './Ix';

import './add/asynciterable/fromdomstream';
import './add/asynciterable-operators/todomstream';
export { toDOMStream } from './asynciterable/todomstream';
export { ReadableBYOBStreamOptions, ReadableByteStreamOptions } from './asynciterable/todomstream';
export {
fromDOMStream,
AsyncIterableReadableStream,
Expand Down
33 changes: 0 additions & 33 deletions src/Ix.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,3 @@ import './add/asynciterable-operators/tonodestream';
export { IterableReadable } from './iterable/tonodestream';
export { AsyncIterableReadable } from './asynciterable/tonodestream';
export { fromNodeStream, ReadableStreamAsyncIterable } from './asynciterable/fromnodestream';

import { OperatorAsyncFunction } from './interfaces';
import { AsyncIterableX } from './asynciterable/asynciterablex';

import { toNodeStream } from './asynciterable/tonodestream';
import { isReadableNodeStream, isWritableNodeStream } from './internal/isiterable';

AsyncIterableX.prototype.pipe = nodePipe;

const as = AsyncIterableX.as;
const readableOpts = (x: any, opts = x._writableState || { objectMode: true }) => opts;
type WritableOrOperatorAsyncFunction<T, R> =
| NodeJS.WritableStream
| NodeJS.ReadWriteStream
| OperatorAsyncFunction<T, R>;

function nodePipe<T>(this: AsyncIterableX<T>, ...args: any[]) {
let i = -1;
let n = args.length;
let prev: any = this;
let next: WritableOrOperatorAsyncFunction<T, any>;
while (++i < n) {
next = args[i];
if (typeof next === 'function') {
prev = as(next(prev));
} else if (isWritableNodeStream(next)) {
// prettier-ignore
return isReadableNodeStream(prev) ? prev.pipe(next) :
toNodeStream(prev, readableOpts(next)).pipe(next);
}
}
return prev;
}
69 changes: 67 additions & 2 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ import { OperatorAsyncFunction } from '../interfaces';
import { bindCallback } from '../internal/bindcallback';
import { identityAsync } from '../internal/identity';
import { toLength } from '../internal/tolength';
import { isArrayLike, isIterable, isAsyncIterable } from '../internal/isiterable';
import { Observable } from '../observer';
import {
isArrayLike,
isIterable,
isAsyncIterable,
isReadableNodeStream,
isWritableNodeStream
} from '../internal/isiterable';

/**
* This class serves as the base for all operations which support [Symbol.asyncIterator].
Expand Down Expand Up @@ -34,6 +40,26 @@ export abstract class AsyncIterableX<T> implements AsyncIterable<T> {
return acc as AsyncIterableX<R>;
}

tee(): [ReadableStream<T>, ReadableStream<T>] {
return this._getDOMStream().tee();
}

pipeTo(writable: WritableStream<T>, options?: PipeOptions) {
return this._getDOMStream().pipeTo(writable, options);
}

pipeThrough<R extends ReadableStream<any>>(
duplex: { writable: WritableStream<T>; readable: R },
options?: PipeOptions
) {
return this._getDOMStream().pipeThrough(duplex, options);
}

private _DOMStream?: ReadableStream<T>;
private _getDOMStream(): ReadableStream<T> {
return this._DOMStream || (this._DOMStream = this.publish().toDOMStream());
}

static as(source: string): AsyncIterableX<string>;
static as<T extends AsyncIterableX<any>>(source: T): T;
static as<T>(source: AsyncIterableInput<T>): AsyncIterableX<T>;
Expand Down Expand Up @@ -224,11 +250,16 @@ class OfAsyncIterable<TSource> extends AsyncIterableX<TSource> {
}
}

type WritableOrOperatorAsyncFunction<T, R> =
| NodeJS.WritableStream
| NodeJS.ReadWriteStream
| OperatorAsyncFunction<T, R>;

declare module '../asynciterable/asynciterablex' {
interface AsyncIterableX<T> {
pipe(): AsyncIterableX<T>;
pipe<A>(op1: OperatorAsyncFunction<T, A>): AsyncIterableX<A>;
pipe<A extends NodeJS.WritableStream>(op1: A): A;
pipe<A extends NodeJS.WritableStream>(op1: A, options?: any /* { end?: boolean; } */): A;
pipe<A, B>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>
Expand Down Expand Up @@ -292,3 +323,37 @@ declare module '../asynciterable/asynciterablex' {
pipe(...operations: OperatorAsyncFunction<any, any>[]): AsyncIterableX<{}>;
}
}

try {
(isBrowser => {
if (isBrowser) {
return;
}

const as = AsyncIterableX.as;
AsyncIterableX.prototype.pipe = nodePipe;
const readableOpts = (x: any, opts = x._writableState || { objectMode: true }) => opts;

function nodePipe<T>(this: AsyncIterableX<T>, ...args: any[]) {
let i = -1;
let end: boolean;
let n = args.length;
let prev: any = this;
let next: WritableOrOperatorAsyncFunction<T, any>;
while (++i < n) {
next = args[i];
if (typeof next === 'function') {
prev = as(next(prev));
} else if (isWritableNodeStream(next)) {
({ end = true } = args[i + 1] || {});
// prettier-ignore
return isReadableNodeStream(prev) ? prev.pipe(next, {end}) :
prev.toNodeStream(readableOpts(next)).pipe(next, {end});
}
}
return prev;
}
})(typeof window === 'object' && typeof document === 'object' && document.nodeType === 9);
} catch (e) {
/* */
}
30 changes: 17 additions & 13 deletions src/asynciterable/fromdomstream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { AsyncIterableX } from './asynciterablex';

// To work around circular-dependency hell, these need to be on
// the AsyncIterable prototype for tee, pipeTo, and pipeThrough
import '../add/asynciterable-operators/publish';
import '../add/asynciterable-operators/todomstream';

/** @ignore */
const SharedArrayBuf = typeof SharedArrayBuffer !== 'undefined' ? SharedArrayBuffer : ArrayBuffer;
/** @ignore */
const pump = <T extends Iterator<any> | AsyncIterator<any>>(iterator: T) => {
iterator.next();
return iterator;
};

export class AsyncIterableReadableStream<T> extends AsyncIterableX<T> {
constructor(protected _stream: ReadableStream<T>) {
Expand All @@ -21,13 +21,17 @@ export class AsyncIterableReadableStream<T> extends AsyncIterableX<T> {

export class AsyncIterableReadableByteStream extends AsyncIterableReadableStream<Uint8Array> {
[Symbol.asyncIterator]() {
let stream = this._stream;
let reader: ReadableStreamBYOBReader;
try {
const stream = this._stream;
const reader = stream['getReader']({ mode: 'byob' });
return pump(_consumeReader(stream, reader, byobReaderToAsyncIterator(reader)));
reader = stream['getReader']({ mode: 'byob' });
} catch (e) {
return super[Symbol.asyncIterator]() as AsyncIterableIterator<Uint8Array>;
}
const iterator = _consumeReader(stream, reader, byobReaderToAsyncIterator(reader));
// "pump" the iterator once so it initializes and is ready to accept a buffer or bytesToRead
iterator.next();
return iterator;
}
}

Expand Down Expand Up @@ -73,11 +77,11 @@ async function* defaultReaderToAsyncIterator<T = any>(reader: ReadableStreamDefa

/** @ignore */
async function* byobReaderToAsyncIterator(reader: ReadableStreamBYOBReader) {
let bufferOrByteLength: number | ArrayBufferLike;
let r: null | IteratorResult<Uint8Array> = null;
do {
bufferOrByteLength = yield r ? r.value : null!;
} while (!(r = await readNext(reader, bufferOrByteLength, 0)).done);
let r: IteratorResult<Uint8Array>;
let value: number | ArrayBufferLike = yield null!;
while (!(r = await readNext(reader, value, 0)).done) {
value = yield r.value;
}
}

/** @ignore */
Expand Down
2 changes: 1 addition & 1 deletion src/asynciterable/todomstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class UnderlyingAsyncIterableByteSource<TSource extends ArrayBufferView = Uint8A
// Did the source write into the BYOB view itself,
// then yield us the `bytesWritten` value? If so,
// pass that along
if (typeof value === 'number' && value > 0) {
if (typeof value === 'number') {
return controller.byobRequest.respond(value);
}
// otherwise if the source is only producing buffers
Expand Down

0 comments on commit 873b8fe

Please sign in to comment.