Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions js/gulp/arrow-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ const gulp = require('gulp');
const mkdirp = require('mkdirp');
const gulpRename = require(`gulp-rename`);
const { memoizeTask } = require('./memoize-task');
const { Observable, ReplaySubject } = require('rxjs');
const {
ReplaySubject,
forkJoin: ObservableForkJoin,
} = require('rxjs');
const {
share
} = require('rxjs/operators');
const pipeline = require('util').promisify(require('stream').pipeline);

const arrowTask = ((cache) => memoizeTask(cache, function copyMain(target) {
Expand All @@ -38,7 +44,7 @@ const arrowTask = ((cache) => memoizeTask(cache, function copyMain(target) {
const esmSourceMapsGlob = `${targetDir(`esnext`, `esm`)}/**/*.map`;
const es2015UmdSourceMapsGlob = `${targetDir(`es2015`, `umd`)}/*.map`;
const esnextUmdSourceMapsGlob = `${targetDir(`esnext`, `umd`)}/*.map`;
return Observable.forkJoin(
return ObservableForkJoin(
observableFromStreams(gulp.src(dtsGlob), gulp.dest(out)), // copy d.ts files
observableFromStreams(gulp.src(cjsGlob), gulp.dest(out)), // copy esnext cjs files
observableFromStreams(gulp.src(cjsSourceMapsGlob), gulp.dest(out)), // copy esnext cjs sourcemaps
Expand All @@ -48,7 +54,7 @@ const arrowTask = ((cache) => memoizeTask(cache, function copyMain(target) {
observableFromStreams(gulp.src(esmGlob), gulpRename((p) => { p.extname = '.mjs'; }), gulp.dest(out)), // copy esnext esm files and rename to `.mjs`
observableFromStreams(gulp.src(es2015UmdGlob), gulpRename((p) => { p.basename += `.es2015.min`; }), gulp.dest(out)), // copy es2015 umd files and add `.min`
observableFromStreams(gulp.src(esnextUmdGlob), gulpRename((p) => { p.basename += `.esnext.min`; }), gulp.dest(out)), // copy esnext umd files and add `.esnext.min`
).publish(new ReplaySubject()).refCount();
).pipe(share({ connector: () => new ReplaySubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }));
}))({});

const arrowTSTask = ((cache) => memoizeTask(cache, async function copyTS(target, format) {
Expand Down
12 changes: 8 additions & 4 deletions js/gulp/clean-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
// under the License.

const del = require('del');
const { Observable } = require('rxjs');
const { targetDir } = require('./util');
const memoizeTask = require('./memoize-task');
const { catchError } = require('rxjs/operators');
const {
from: ObservableFrom,
EMPTY: ObservableEmpty,
} = require('rxjs');

/**
 * Gulp task that deletes the output directory computed by `targetDir` for a
 * target/format pair. The task function is wrapped by `memoizeTask`, which is
 * handed a fresh cache object.
 *
 * @param {string} target build target passed through to `targetDir`
 * @param {string} format module format passed through to `targetDir`
 * @returns an Observable wrapping the result of `del(dir)`; if the delete
 *          fails, the error is swallowed and an empty Observable is returned.
 */
const cleanTask = ((cache) => memoizeTask(cache, function clean(target, format) {
    const dir = targetDir(target, format);
    return ObservableFrom(del(dir))
        // `ObservableEmpty` aliases the RxJS `EMPTY` constant, which is already an
        // Observable instance; it must be returned as a value, not invoked.
        .pipe(catchError((e) => ObservableEmpty));
}))({});

// Export the task both as the module itself and as a named `cleanTask` property.
module.exports = cleanTask;
module.exports.cleanTask = cleanTask;
15 changes: 11 additions & 4 deletions js/gulp/package-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@ const {

const gulp = require('gulp');
const { memoizeTask } = require('./memoize-task');
const { Observable, ReplaySubject } = require('rxjs');
const {
ReplaySubject,
EMPTY: ObservableEmpty,
forkJoin: ObservableForkJoin,
} = require('rxjs');
const {
share
} = require('rxjs/operators');
const gulpJsonTransform = require('gulp-json-transform');

/**
 * Gulp task that fills a target's output directory with the metadata files and
 * a `package.json` rewritten for that target. The task function is wrapped by
 * `memoizeTask`, which is handed a fresh cache object.
 *
 * @param {string} target build target; `src` is skipped entirely
 * @param {string} format module format, used to locate the output directory
 * @returns an Observable that completes once both copy pipelines have finished
 */
const packageTask = ((cache) => memoizeTask(cache, function bundle(target, format) {
    // `ObservableEmpty` aliases the RxJS `EMPTY` constant, which is already an
    // Observable instance; calling it would throw a TypeError.
    if (target === `src`) return ObservableEmpty;
    const out = targetDir(target, format);
    // Pick the package.json generator that matches the kind of package being built.
    const jsonTransform = gulpJsonTransform(target === npmPkgName ? createMainPackageJson(target, format) :
                                            target === `ts`       ? createTypeScriptPackageJson(target, format)
                                                                  : createScopedPackageJSON(target, format),
                                            2);
    return ObservableForkJoin([
        observableFromStreams(gulp.src(metadataFiles), gulp.dest(out)), // copy metadata files
        observableFromStreams(gulp.src(`package.json`), jsonTransform, gulp.dest(out)) // write packageJSONs
    // Multicast through a ReplaySubject that is never reset, so later
    // subscribers receive the buffered result instead of re-running the copy.
    ]).pipe(share({ connector: () => new ReplaySubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }));
}))({});

module.exports = packageTask;
Expand Down
17 changes: 13 additions & 4 deletions js/gulp/typescript-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ const path = require('path');
const ts = require(`gulp-typescript`);
const sourcemaps = require('gulp-sourcemaps');
const { memoizeTask } = require('./memoize-task');
const { Observable, ReplaySubject } = require('rxjs');
const {
ReplaySubject,
forkJoin: ObservableForkJoin,
} = require('rxjs');
const {
mergeWith,
takeLast,
share
} = require('rxjs/operators');

const typescriptTask = ((cache) => memoizeTask(cache, function typescript(target, format) {
if (shouldRunInChildProcess(target, format)) {
Expand All @@ -38,8 +46,9 @@ const typescriptTask = ((cache) => memoizeTask(cache, function typescript(target
const out = targetDir(target, format);
const tsconfigPath = path.join(`tsconfig`, `tsconfig.${tsconfigName(target, format)}.json`);
return compileTypescript(out, tsconfigPath)
.merge(compileBinFiles(target, format)).takeLast(1)
.publish(new ReplaySubject()).refCount();
.pipe(mergeWith(compileBinFiles(target, format)))
.pipe(takeLast(1))
.pipe(share({ connector: () => new ReplaySubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }))
}))({});

function compileBinFiles(target, format) {
Expand All @@ -58,7 +67,7 @@ function compileTypescript(out, tsconfigPath, tsconfigOverrides) {
const writeDTypes = observableFromStreams(dts, sourcemaps.write('./', { includeContent: false }), gulp.dest(out));
const mapFile = tsProject.options.module === 5 ? esmMapFile : cjsMapFile;
const writeJS = observableFromStreams(js, sourcemaps.write('./', { mapFile, includeContent: false }), gulp.dest(out));
return Observable.forkJoin(writeSources, writeDTypes, writeJS);
return ObservableForkJoin(writeSources, writeDTypes, writeJS);
}

function cjsMapFile(mapFilePath) { return mapFilePath; }
Expand Down
31 changes: 22 additions & 9 deletions js/gulp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@ const path = require(`path`);
const pump = require(`stream`).pipeline;
const child_process = require(`child_process`);
const { targets, modules } = require('./argv');
const { Observable, ReplaySubject } = require('rxjs');
const {
ReplaySubject,
empty: ObservableEmpty,
throwError: ObservableThrow,
fromEvent: ObservableFromEvent
} = require('rxjs');
const {
share,
flatMap,
takeUntil,
defaultIfEmpty,
mergeWith,
} = require('rxjs/operators');
const asyncDone = require('util').promisify(require('async-done'));

const mainExport = `Arrow`;
Expand Down Expand Up @@ -102,16 +114,17 @@ function spawnGulpCommandInChildProcess(command, target, format) {
.catch((e) => { throw `Error in "${command}:${taskName(target, format)}" task`; });
}

const logAndDie = (e) => { if (e) { process.exit(1); } };
const logAndDie = (e) => { if (e) { process.exit(1) } };
function observableFromStreams(...streams) {
if (streams.length <= 0) { return Observable.empty(); }
if (streams.length <= 0) { return ObservableEmpty(); }
const pumped = streams.length <= 1 ? streams[0] : pump(...streams, logAndDie);
const fromEvent = Observable.fromEvent.bind(null, pumped);
const streamObs = fromEvent(`data`)
.merge(fromEvent(`error`).flatMap((e) => Observable.throw(e)))
.takeUntil(fromEvent(`end`).merge(fromEvent(`close`)))
.defaultIfEmpty(`empty stream`)
.multicast(new ReplaySubject()).refCount();
const fromEvent = ObservableFromEvent.bind(null, pumped);
const streamObs = fromEvent(`data`).pipe(
mergeWith(fromEvent(`error`).pipe(flatMap((e) => ObservableThrow(e)))),
takeUntil(fromEvent(`end`).pipe(mergeWith(fromEvent(`close`)))),
defaultIfEmpty(`empty stream`),
share({ connector: () => new ReplaySubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })
);
streamObs.stream = pumped;
streamObs.observable = streamObs;
return streamObs;
Expand Down
13 changes: 8 additions & 5 deletions js/gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

const del = require('del');
const gulp = require('gulp');
const { Observable } = require('rxjs');
const { targets } = require('./gulp/argv');
const {
from: ObservableFrom,
bindNodeCallback: ObservableBindNodeCallback
} = require('rxjs');
const { flatMap } = require('rxjs/operators');
const cleanTask = require('./gulp/clean-task');
const compileTask = require('./gulp/compile-task');
const packageTask = require('./gulp/package-task');
Expand Down Expand Up @@ -83,10 +87,9 @@ gulp.task(`compile`, gulpConcurrent(getTasks(`compile`)));
gulp.task(`package`, gulpConcurrent(getTasks(`package`)));
gulp.task(`default`, gulp.series(`clean`, `build`, `test`));

/**
 * Build a gulp task function that runs `tasks` concurrently with a bounded
 * degree of parallelism.
 *
 * @param {Array} tasks task names or functions; each is wrapped with `gulp.series`
 * @param {number} [numCPUs] maximum number of tasks in flight; defaults to half
 *        the CPU count truncated to an integer, and never less than 1
 * @returns {Function} a function returning an Observable that emits as each
 *          wrapped task's node-style callback fires
 */
function gulpConcurrent(tasks, numCPUs = Math.max(1, require('os').cpus().length * 0.5) | 0) {
    return () => ObservableFrom(tasks.map((task) => gulp.series(task)))
        // `numCPUs || 1` guards against a caller passing 0 as the concurrency limit.
        .pipe(flatMap((task) => ObservableBindNodeCallback(task)(), numCPUs || 1));
}

function getTasks(name) {
Expand Down
8 changes: 4 additions & 4 deletions js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"benny": "3.6.15",
"cpy": "8.1.2",
"cross-env": "7.0.3",
"del-cli": "3.0.1",
"del-cli": "4.0.0",
"eslint": "7.27.0",
"eslint-plugin-jest": "24.3.6",
"esm": "https://github.com/jsg2021/esm/releases/download/v3.x.x-pr883/esm-3.x.x-pr883.tgz",
Expand All @@ -82,16 +82,16 @@
"gulp-rename": "2.0.0",
"gulp-sourcemaps": "3.0.0",
"gulp-typescript": "5.0.1",
"ix": "2.5.3",
"jest": "27.0.1",
"ix": "4.4.1",
"jest": "27.0.6",
"jest-silent-reporter": "0.5.0",
"lerna": "4.0.0",
"memfs": "3.2.2",
"mkdirp": "1.0.4",
"multistream": "4.1.0",
"npm-run-all": "4.1.5",
"randomatic": "3.1.1",
"rxjs": "5.5.11",
"rxjs": "7.2.0",
"ts-jest": "27.0.3",
"ts-node": "10.0.0",
"typedoc": "0.20.36",
Expand Down
13 changes: 6 additions & 7 deletions js/test/unit/builders/builder-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

import '../../jest-extensions';
import { AsyncIterable } from 'ix';
import { from, fromDOMStream, toArray } from 'ix/asynciterable';
import { fromNodeStream } from 'ix/asynciterable/fromnodestream';
import { validateVector } from './utils';
import * as generate from '../../generate-test-data';
import { Type, DataType, Chunked, util, Builder, UnionVector } from 'apache-arrow';
Expand Down Expand Up @@ -243,11 +244,10 @@ async function encodeChunks<T extends DataType, TNull = any>(values: (T['TValue'

/**
 * Encode `values` by streaming them through `Builder.throughDOM` as a DOM
 * stream, then concatenate every emitted chunk into a single `Chunked`.
 *
 * @param values the values (or null sentinels) to encode
 * @param options builder transform options forwarded to `Builder.throughDOM`
 * @returns the concatenation of all chunks produced by the builder stream
 */
async function encodeChunksDOM<T extends DataType, TNull = any>(values: (T['TValue'] | TNull)[], options: BuilderTransformOptions<T, TNull>) {

    const stream = from(values).toDOMStream()
        .pipeThrough(Builder.throughDOM(options));

    // `toArray` is passed as a pipe function, so this awaits the collected chunks.
    const chunks = await fromDOMStream(stream).pipe(toArray);

    return Chunked.concat(...chunks);
}
Expand All @@ -258,12 +258,11 @@ async function encodeChunksNode<T extends DataType, TNull = any>(values: (T['TVa
options.nullValues = [...options.nullValues, undefined] as TNull[];
}

const stream = AsyncIterable
.from(fillNA(values, [undefined]))
const stream = from(fillNA(values, [undefined]))
.toNodeStream({ objectMode: true })
.pipe(Builder.throughNode(options));

const chunks: any[] = await AsyncIterable.fromNodeStream(stream, options.highWaterMark).toArray();
const chunks: any[] = await fromNodeStream(stream, options.highWaterMark).pipe(toArray);

return Chunked.concat(...chunks);
}
12 changes: 7 additions & 5 deletions js/test/unit/builders/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// under the License.

import '../../jest-extensions';
import { AsyncIterable } from 'ix';
import { from, fromDOMStream, toArray } from 'ix/asynciterable';
import { fromNodeStream } from 'ix/asynciterable/fromnodestream';
import 'ix/Ix.node';
import { util } from 'apache-arrow';
import { Builder } from 'apache-arrow';
import { DataType, Vector, Chunked } from 'apache-arrow';
Expand Down Expand Up @@ -147,9 +149,9 @@ export function encodeEachDOM<T extends DataType>(typeFactory: () => T, chunkLen
return async function encodeEachDOM<TNull = any>(vals: (T['TValue'] | TNull)[], nullValues?: TNull[]) {
const type = typeFactory();
const strategy = { highWaterMark: chunkLen };
const source = AsyncIterable.from(vals).toDOMStream();
const source = from(vals).toDOMStream();
const builder = Builder.throughDOM({ type, nullValues, readableStrategy: strategy, writableStrategy: strategy });
const chunks = await AsyncIterable.fromDOMStream(source.pipeThrough(builder)).toArray();
const chunks = await fromDOMStream(source.pipeThrough(builder)).pipe(toArray);
return Chunked.concat(...chunks) as Chunked<T>;
};
}
Expand All @@ -158,10 +160,10 @@ export function encodeEachNode<T extends DataType>(typeFactory: () => T, chunkLe
return async function encodeEachNode<TNull = any>(vals: (T['TValue'] | TNull)[], nullValues?: TNull[]) {
const type = typeFactory();
const vals_ = vals.map((x) => x === null ? undefined : x);
const source = AsyncIterable.from(vals_).toNodeStream({ objectMode: true });
const source = from(vals_).toNodeStream({ objectMode: true });
const nulls_ = nullValues ? nullValues.map((x) => x === null ? undefined : x) : nullValues;
const builder = Builder.throughNode({ type, nullValues: nulls_, highWaterMark: chunkLen });
const chunks: any[] = await AsyncIterable.fromNodeStream(source.pipe(builder), chunkLen).toArray();
const chunks: any[] = await fromNodeStream(source.pipe(builder), chunkLen).pipe(toArray);
return Chunked.concat(...chunks) as Chunked<T>;
};
}
Expand Down
13 changes: 7 additions & 6 deletions js/test/unit/ipc/writer/streams-dom-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
// generateDictionaryTables
} from '../../../data/tables';

import { AsyncIterable } from 'ix';
import { from, as } from 'ix/asynciterable';
import { tap, flatMap } from 'ix/asynciterable/operators';

import {
Table,
Expand Down Expand Up @@ -232,9 +233,9 @@ import {
it(`should write a stream of tables to the same output stream`, async () => {

const tables = [] as Table[];
const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
const stream: ReadableStream<any> = from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table: Table) { tables.push(table); await sleep(1); } })
.pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
.pipeThrough(RecordBatchStreamWriter.throughDOM(opts));

for await (const reader of RecordBatchReader.readAll(stream)) {
Expand All @@ -250,11 +251,11 @@ import {
it(`should write a stream of record batches to the same output stream`, async () => {

const tables = [] as Table[];
const stream = AsyncIterable.from(generateRandomTables([10, 20, 30]))
const stream = from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table: Table) { tables.push(table); await sleep(1); } })
.pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
// flatMap from Table -> RecordBatches[]
.flatMap((table) => AsyncIterable.as(table.chunks))
.pipe(flatMap((table) => as(table.chunks)))
.pipeThrough(RecordBatchStreamWriter.throughDOM(opts));

for await (const reader of RecordBatchReader.readAll(stream)) {
Expand Down
16 changes: 8 additions & 8 deletions js/test/unit/ipc/writer/streams-node-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import {
// generateDictionaryTables
} from '../../../data/tables';

import { AsyncIterable } from 'ix';
import { from, as } from 'ix/asynciterable';
import { tap, flatMap } from 'ix/asynciterable/operators';
import 'ix/Ix.node';

import {
Table,
Expand Down Expand Up @@ -231,10 +233,9 @@ import {

const tables = [] as Table[];
const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
const stream = AsyncIterable
.from(generateRandomTables([10, 20, 30]))
const stream = from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table: Table) { tables.push(table); await sleep(1); } })
.pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
.pipe(writer);

for await (const reader of RecordBatchReader.readAll(stream)) {
Expand All @@ -252,11 +253,10 @@ import {

const tables = [] as Table[];
const writer = RecordBatchStreamWriter.throughNode({ autoDestroy: false });
const stream = AsyncIterable
.from(generateRandomTables([10, 20, 30]))
const stream = from(generateRandomTables([10, 20, 30]))
// insert some asynchrony
.tap({ async next(table: Table) { tables.push(table); await sleep(1); } })
.flatMap((table) => AsyncIterable.as(table.chunks))
.pipe(tap({ async next(table: Table) { tables.push(table); await sleep(1); } }))
.pipe(flatMap((table) => as(table.chunks)))
.pipe(writer);

for await (const reader of RecordBatchReader.readAll(stream)) {
Expand Down
Loading