Skip to content

Commit

Permalink
feat: command executor
Browse files Browse the repository at this point in the history
  • Loading branch information
lishaduck committed Nov 18, 2024
1 parent 54dc4bf commit 0c6f141
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 2 deletions.
1 change: 1 addition & 0 deletions biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"useNamingConvention": "off"
},
"complexity": {
"noVoid": "off",
"useLiteralKeys": "off"
},
"correctness": {
Expand Down
261 changes: 261 additions & 0 deletions packages/platform-deno/src/DenoCommandExecutor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import { Command, CommandExecutor, FileSystem } from "@effect/platform";
import type { PlatformError } from "@effect/platform/Error";
import {
Deferred,
Effect,
Inspectable,
Layer,
Option,
type Scope,
Sink,
Stream,
} from "effect";
import { constUndefined, identity, pipe } from "effect/Function";
import { handleErrnoException } from "./internal/error.ts";
import { fromWritable } from "./internal/sink.ts";

const inputToStdioOption = (
stdin: Command.Command.Input,
): "pipe" | "inherit" => (typeof stdin === "string" ? stdin : "pipe");

const outputToStdioOption = (
output: Command.Command.Output,
): "pipe" | "inherit" => (typeof output === "string" ? output : "pipe");

const toError = (err: unknown): Error =>
err instanceof globalThis.Error ? err : new globalThis.Error(String(err));

const toPlatformError = (
method: string,
error: Error,
command: Command.Command,
): PlatformError => {
const flattened = Command.flatten(command).reduce((acc, curr) => {
const command = `${curr.command} ${curr.args.join(" ")}`;
return acc.length === 0 ? command : `${acc} | ${command}`;
}, "");
return handleErrnoException("Command", method)(error, [flattened]);
};

type ExitCode = readonly [code: number | null, signal: Deno.Signal | null];
type ExitCodeDeferred = Deferred.Deferred<ExitCode>;

const ProcessProto = {
[CommandExecutor.ProcessTypeId]: CommandExecutor.ProcessTypeId,
...Inspectable.BaseProto,
toJSON(this: CommandExecutor.Process): object {
return {
_id: Symbol.keyFor(CommandExecutor.ProcessTypeId),
pid: this.pid,
};
},
};

const runCommand =
(fileSystem: FileSystem.FileSystem) =>
(
command: Command.Command,
): Effect.Effect<CommandExecutor.Process, PlatformError, Scope.Scope> => {
switch (command._tag) {
case "StandardCommand": {
const spawn = Effect.flatMap(Deferred.make<ExitCode>(), (exitCode) =>
Effect.try<
readonly [Deno.ChildProcess, ExitCodeDeferred],
PlatformError
>({
try: (): readonly [Deno.ChildProcess, ExitCodeDeferred] => {
const comm = new Deno.Command(command.command, {
// TODO: PR Deno to make args as immutable.
// @ts-expect-error: args is mutable, command.args is immutable.
args: command.args,
stdio: [
inputToStdioOption(command.stdin),
outputToStdioOption(command.stdout),
outputToStdioOption(command.stderr),
],
cwd: Option.getOrElse(command.cwd, constUndefined),
env: {
...Deno.env.toObject(),
...Object.fromEntries(command.env),
},
});
const handle = comm.spawn();

void handle.status.then((status) => {
Deferred.unsafeDone(
exitCode,
Effect.succeed<ExitCode>([status.code, status.signal]),
);
});

return [handle, exitCode];
},
catch: (err): PlatformError =>
toPlatformError("spawn", err as Error, command),
}),
);
return pipe(
// Validate that the directory is accessible
Option.match(command.cwd, {
onNone: (): Effect.Effect<void, never> => Effect.void,
onSome: (dir): Effect.Effect<void, PlatformError> =>
fileSystem.access(dir),
}),
Effect.zipRight(
Effect.acquireRelease(spawn, ([handle, exitCode]) =>
Effect.flatMap(Deferred.isDone(exitCode), (done) =>
done
? Effect.void
: Effect.suspend(() => {
handle.kill("SIGTERM");
return Deferred.await(exitCode);
}),
),
),
),
Effect.map(([handle, exitCodeDeferred]): CommandExecutor.Process => {
let stdin: Sink.Sink<void, unknown, never, PlatformError> =
Sink.drain;

if (handle.stdin !== null) {
stdin = fromWritable(
// biome-ignore lint/style/noNonNullAssertion: If we've gotten this far, the process was spawned successfully.
() => handle.stdin!,
(err: unknown) =>
toPlatformError("toWritable", toError(err), command),
);
}

const exitCode: CommandExecutor.Process["exitCode"] =
Effect.flatMap(
Deferred.await(exitCodeDeferred),
([code, signal]) => {
if (code !== null) {
return Effect.succeed(CommandExecutor.ExitCode(code));
}
// If code is `null`, then `signal` must be defined. See the NodeJS
// documentation for the `"exit"` event on a `child_process`.
// https://nodejs.org/api/child_process.html#child_process_event_exit
return Effect.fail(
toPlatformError(
"exitCode",
new globalThis.Error(
`Process interrupted due to receipt of signal: ${signal}`,
),
command,
),
);
},
);

const isRunning = Effect.negate(Deferred.isDone(exitCodeDeferred));

const kill: CommandExecutor.Process["kill"] = (
signal = "SIGTERM",
) =>
Effect.suspend(() => {
handle.kill(
// Deno's Signal type is slightly different.
// They support `SIGEMT`, but don't support `SIGIOT` or `SIGLOST`.
// Presumably, there's no runtime validation, so it should be fine.
signal as Deno.Signal,
);
return Effect.asVoid(Deferred.await(exitCodeDeferred));
});

// biome-ignore lint/style/noNonNullAssertion: "If the child process fails to spawn due to errors, then the value is undefined and error is emitted." We do handle this case above.
const pid = CommandExecutor.ProcessId(handle.pid!);
const stderr = Stream.fromReadableStream<Uint8Array, PlatformError>(
// biome-ignore lint/style/noNonNullAssertion: "The subprocess.stdout property can be null or undefined if the child process could not be successfully spawned." We do handle this case above.
() => handle.stderr!,
(err: unknown) =>
toPlatformError(
"fromReadableStream(stderr)",
toError(err),
command,
),
);
let stdout: Stream.Stream<Uint8Array, PlatformError> =
Stream.fromReadableStream<Uint8Array, PlatformError>(
// biome-ignore lint/style/noNonNullAssertion: "The subprocess.stdout property can be null or undefined if the child process could not be successfully spawned." We do handle this case above.
() => handle.stdout!,
(err: unknown) =>
toPlatformError(
"fromReadableStream(stdout)",
toError(err),
command,
),
);
// TODO: add Sink.isSink
if (typeof command.stdout !== "string") {
stdout = Stream.transduce(stdout, command.stdout);
}
return Object.assign(Object.create(ProcessProto), {
pid,
exitCode,
isRunning,
kill,
stdin,
stderr,
stdout,
});
}),
typeof command.stdin === "string"
? identity
: Effect.tap((process) =>
Effect.forkDaemon(
Stream.run(
command.stdin as Stream.Stream<Uint8Array>,
process.stdin,
),
),
),
);
}
case "PipedCommand": {
const flattened = Command.flatten(command);
if (flattened.length === 1) {
return pipe(flattened[0], runCommand(fileSystem));
}
const head = flattened[0];
const tail = flattened.slice(1);
const initial = tail.slice(0, tail.length - 1);
// TODO: PR Effect to fix this type.
// biome-ignore lint/style/noNonNullAssertion: A pipe always has a `right` element, but types don't use a non-empty tuple.
const last = tail.at(-1)!;
const stream = initial.reduce(
(stdin, command) =>
pipe(
Command.stdin(command, stdin),
runCommand(fileSystem),
Effect.map((process) => process.stdout),
Stream.unwrapScoped,
),
pipe(
runCommand(fileSystem)(head),
Effect.map((process) => process.stdout),
Stream.unwrapScoped,
),
);
return pipe(Command.stdin(last, stream), runCommand(fileSystem));
}
}
};

/**
* @since 1.0.0
* @category layer
*/
export const layer: Layer.Layer<
CommandExecutor.CommandExecutor,
never,
FileSystem.FileSystem
> = Layer.effect(
CommandExecutor.CommandExecutor,
pipe(
FileSystem.FileSystem,
Effect.map((fileSystem) =>
CommandExecutor.makeExecutor(runCommand(fileSystem)),
),
),
);
4 changes: 2 additions & 2 deletions packages/platform-deno/src/DenoContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import type {
Terminal,
Worker,
} from "@effect/platform";
import * as NodeCommandExecutor from "@effect/platform-node-shared/NodeCommandExecutor";
import * as NodeTerminal from "@effect/platform-node-shared/NodeTerminal";
import { Layer } from "effect";
import * as DenoCommandExecutor from "./DenoCommandExecutor.ts";
import * as DenoFileSystem from "./DenoFileSystem.ts";
import * as DenoPath from "./DenoPath.ts";
import * as DenoWorker from "./DenoWorker.ts";
Expand All @@ -29,7 +29,7 @@ export type DenoContext =
*/
export const layer: Layer.Layer<DenoContext> = Layer.mergeAll(
DenoPath.layer,
NodeCommandExecutor.layer,
DenoCommandExecutor.layer,
NodeTerminal.layer,
DenoWorker.layerManager,
).pipe(Layer.provideMerge(DenoFileSystem.layer));
56 changes: 56 additions & 0 deletions packages/platform-deno/src/internal/sink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import * as Channel from "effect/Channel";
import type * as Chunk from "effect/Chunk";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
import type { LazyArg } from "effect/Function";
import * as Sink from "effect/Sink";
import { writeInput } from "./stream.ts";

/** @internal */
export const fromWritable = <E, A = Uint8Array | string>(
evaluate: LazyArg<WritableStream>,
onError: (error: unknown) => E,
): Sink.Sink<void, A, never, E> =>
Sink.fromChannel(fromWritableChannel(evaluate, onError));

/** @internal */
export const fromWritableChannel = <IE, OE, A>(
writable: LazyArg<WritableStream>,
onError: (error: unknown) => OE,
): Channel.Channel<
Chunk.Chunk<never>,
Chunk.Chunk<A>,
IE | OE,
IE,
void,
unknown
> =>
Channel.flatMap(
Effect.zip(
Effect.sync(() => writable()),
Deferred.make<void, IE | OE>(),
),
([writable, deferred]) =>
Channel.embedInput(
writableOutput(writable, deferred, onError),
writeInput<IE, A>(
writable,
(cause) => Deferred.failCause(deferred, cause),
Deferred.complete(deferred, Effect.void),
),
),
);

const writableOutput = <IE, E>(
writable: WritableStream,
deferred: Deferred.Deferred<void, IE | E>,
onError: (error: unknown) => E,
): Effect.Effect<void, IE | E> =>
Effect.suspend(() => {
function handleError(err: unknown): void {
Deferred.unsafeDone(deferred, Effect.fail(onError(err)));
}

void writable.getWriter().closed.catch(handleError);
return Deferred.await(deferred);
});
41 changes: 41 additions & 0 deletions packages/platform-deno/src/internal/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { type Cause, type Chunk, Effect } from "effect";
import type * as AsyncInput from "effect/SingleProducerAsyncInput";

/**
* @category model
* @since 1.0.0
*/
export interface FromWritableOptions {
readonly endOnDone?: boolean;
}

/** @internal */
export const writeEffect =
<A>(
writable: WritableStream,
): ((chunk: Chunk.Chunk<A>) => Effect.Effect<void>) =>
(chunk: Chunk.Chunk<A>): Effect.Effect<void> =>
chunk.length === 0
? Effect.void
: Effect.promise(async () => {
for (const item of chunk) {
await writable.getWriter().write(item);
}
});

/** @internal */
export const writeInput = <IE, A>(
writable: WritableStream,
onFailure: (cause: Cause.Cause<IE>) => Effect.Effect<void>,
onDone = Effect.void,
): AsyncInput.AsyncInputProducer<IE, Chunk.Chunk<A>, unknown> => {
const write = writeEffect(writable);
return {
awaitRead: (): Effect.Effect<void> => Effect.void,
emit: write,
error: (cause): Effect.Effect<void> =>
Effect.zipRight(Effect.promise(writable.close), onFailure(cause)),
done: (_): Effect.Effect<void> =>
Effect.zipRight(Effect.promise(writable.close), onDone),
};
};

0 comments on commit 0c6f141

Please sign in to comment.