Skip to content

Commit

Permalink
Replace deprecated RxJS code (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
paescuj authored Jun 24, 2023
1 parent 744ece9 commit eadbc6a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
5 changes: 3 additions & 2 deletions bin/concurrently.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ const run = (args: string, ctrlcWrapper?: boolean) => {
});

const exit = Rx.firstValueFrom(
Rx.fromEvent<[number | null, NodeJS.Signals | null]>(child, 'exit').pipe(
map((exit) => {
Rx.fromEvent(child, 'exit').pipe(
map((event) => {
const exit = event as [number | null, NodeJS.Signals | null];
return {
/** The exit code if the child exited on its own. */
code: exit[0],
Expand Down
22 changes: 15 additions & 7 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,15 @@ export class Command implements CommandInfo {
const highResStartTime = process.hrtime();
this.timer.next({ startDate });

Rx.fromEvent<unknown>(child, 'error').subscribe((event) => {
Rx.fromEvent(child, 'error').subscribe((event) => {
this.process = undefined;
const endDate = new Date(Date.now());
this.timer.next({ startDate, endDate });
this.error.next(event);
});
Rx.fromEvent<[number | null, NodeJS.Signals | null]>(child, 'close').subscribe(
([exitCode, signal]) => {
Rx.fromEvent(child, 'close')
.pipe(Rx.map((event) => event as [number | null, NodeJS.Signals | null]))
.subscribe(([exitCode, signal]) => {
this.process = undefined;
this.exited = true;

Expand All @@ -175,10 +176,17 @@ export class Command implements CommandInfo {
durationSeconds: durationSeconds + durationNanoSeconds / 1e9,
},
});
}
);
child.stdout && pipeTo(Rx.fromEvent<Buffer>(child.stdout, 'data'), this.stdout);
child.stderr && pipeTo(Rx.fromEvent<Buffer>(child.stderr, 'data'), this.stderr);
});
child.stdout &&
pipeTo(
Rx.fromEvent(child.stdout, 'data').pipe(Rx.map((event) => event as Buffer)),
this.stdout
);
child.stderr &&
pipeTo(
Rx.fromEvent(child.stderr, 'data').pipe(Rx.map((event) => event as Buffer)),
this.stderr
);
this.stdin = child.stdin || undefined;
}

Expand Down
2 changes: 1 addition & 1 deletion src/completion-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class CompletionListener {
switchMap((exitInfos) =>
this.isSuccess(exitInfos)
? this.emitWithScheduler(Rx.of(exitInfos))
: this.emitWithScheduler(Rx.throwError(exitInfos))
: this.emitWithScheduler(Rx.throwError(() => exitInfos))
),
take(1)
)
Expand Down
13 changes: 10 additions & 3 deletions src/flow-control/restart-process.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Rx from 'rxjs';
import { defaultIfEmpty, delay, filter, mapTo, skip, take, takeWhile } from 'rxjs/operators';
import { defaultIfEmpty, delay, filter, map, skip, take, takeWhile } from 'rxjs/operators';

import { Command } from '../command';
import * as defaults from '../defaults';
Expand Down Expand Up @@ -49,11 +49,18 @@ export class RestartProcess implements FlowController {
Rx.merge(
// Delay the emission (so that the restarts happen on time),
// explicitly telling the subscriber that a restart is needed
failure.pipe(delay(this.delay, this.scheduler), mapTo(true)),
failure.pipe(
delay(this.delay, this.scheduler),
map(() => true)
),
// Skip the first N emissions (as these would be duplicates of the above),
// meaning it will be empty because of success, or failed all N times,
// and no more restarts should be attempted.
failure.pipe(skip(this.tries), mapTo(false), defaultIfEmpty(false))
failure.pipe(
skip(this.tries),
map(() => false),
defaultIfEmpty(false)
)
).subscribe((restart) => {
const command = commands[index];
if (restart) {
Expand Down

0 comments on commit eadbc6a

Please sign in to comment.