Skip to content
Merged
25 changes: 24 additions & 1 deletion packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,14 +634,20 @@ export class DaemonClient {
if (this._daemonStatus == DaemonStatus.DISCONNECTED) {
this._daemonStatus = DaemonStatus.CONNECTING;

let daemonPid: number | null = null;
if (!(await this.isServerAvailable())) {
await this.startInBackground();
daemonPid = await this.startInBackground();
}
this.setUpConnection();
this._daemonStatus = DaemonStatus.CONNECTED;
this._daemonReady();

daemonPid ??= getDaemonProcessIdSync();
await this.registerDaemonProcessWithMetricsService(daemonPid);
} else if (this._daemonStatus == DaemonStatus.CONNECTING) {
await this._waitForDaemonReady;
const daemonPid = getDaemonProcessIdSync();
await this.registerDaemonProcessWithMetricsService(daemonPid);
}
// An open promise isn't enough to keep the event loop
// alive, so we set a timeout here and clear it when we hear
Expand All @@ -660,6 +666,23 @@ export class DaemonClient {
});
}

private async registerDaemonProcessWithMetricsService(
daemonPid: number | null
) {
if (!daemonPid) {
return;
}

try {
const { getProcessMetricsService } = await import(
'../../tasks-runner/process-metrics-service'
);
getProcessMetricsService().registerDaemonProcess(daemonPid);
} catch {
// don't error, this is a secondary concern that should not break task execution
}
}

private retryMessageAfterNewDaemonStarts() {
const [msg, res, rej] = [
this.currentMessage,
Expand Down
26 changes: 22 additions & 4 deletions packages/nx/src/executors/run-commands/run-commands.impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as yargsParser from 'yargs-parser';
import { ExecutorContext } from '../../config/misc-interfaces';
import { isTuiEnabled } from '../../tasks-runner/is-tui-enabled';
import { PseudoTerminal } from '../../tasks-runner/pseudo-terminal';
import { createTaskId } from '../../tasks-runner/utils';
import { NoopChildProcess } from '../../tasks-runner/running-tasks/noop-child-process';
import {
ParallelRunningTasks,
Expand Down Expand Up @@ -97,7 +98,8 @@ export default async function (

export async function runCommands(
options: RunCommandsOptions,
context: ExecutorContext
context: ExecutorContext,
taskId?: string
) {
const normalized = normalizeOptions(options);

Expand Down Expand Up @@ -138,11 +140,27 @@ export async function runCommands(
const tuiEnabled = isTuiEnabled();

try {
const resolvedTaskId =
taskId ??
createTaskId(
context.projectName,
context.targetName,
context.configurationName
);
const runningTask = isSingleCommandAndCanUsePseudoTerminal
? await runSingleCommandWithPseudoTerminal(normalized, context)
? await runSingleCommandWithPseudoTerminal(
normalized,
context,
resolvedTaskId
)
: options.parallel
? new ParallelRunningTasks(normalized, context)
: new SeriallyRunningTasks(normalized, context, tuiEnabled);
? new ParallelRunningTasks(normalized, context, resolvedTaskId)
: new SeriallyRunningTasks(
normalized,
context,
tuiEnabled,
resolvedTaskId
);
return runningTask;
} catch (e) {
if (process.env.NX_VERBOSE_LOGGING === 'true') {
Expand Down
52 changes: 45 additions & 7 deletions packages/nx/src/executors/run-commands/running-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
loadAndExpandDotEnvFile,
unloadDotEnvFile,
} from '../../tasks-runner/task-env';
import { getProcessMetricsService } from '../../tasks-runner/process-metrics-service';
import { signalToCode } from '../../utils/exit-codes';
import {
LARGE_BUFFER,
Expand All @@ -30,7 +31,11 @@ export class ParallelRunningTasks implements RunningTask {
[];
private outputCallbacks: Array<(terminalOutput: string) => void> = [];

constructor(options: NormalizedRunCommandsOptions, context: ExecutorContext) {
constructor(
options: NormalizedRunCommandsOptions,
context: ExecutorContext,
taskId: string
) {
this.childProcesses = options.commands.map(
(commandConfig) =>
new RunningNodeProcess(
Expand All @@ -40,7 +45,8 @@ export class ParallelRunningTasks implements RunningTask {
options.env ?? {},
options.readyWhenStatus,
options.streamOutput,
options.envFile
options.envFile,
taskId
)
);
this.readyWhenStatus = options.readyWhenStatus;
Expand Down Expand Up @@ -228,7 +234,8 @@ export class SeriallyRunningTasks implements RunningTask {
constructor(
options: NormalizedRunCommandsOptions,
context: ExecutorContext,
private readonly tuiEnabled: boolean
private readonly tuiEnabled: boolean,
private readonly taskId: string
) {
this.run(options, context)
.catch((e) => {
Expand Down Expand Up @@ -279,6 +286,7 @@ export class SeriallyRunningTasks implements RunningTask {
options.color,
calculateCwd(options.cwd, context),
options.processEnv ?? options.env ?? {},
this.taskId,
options.usePty,
options.streamOutput,
options.tty,
Expand Down Expand Up @@ -314,6 +322,7 @@ export class SeriallyRunningTasks implements RunningTask {
color: boolean,
cwd: string,
env: Record<string, string>,
taskId: string,
usePty: boolean = true,
streamOutput: boolean = true,
tty: boolean,
Expand All @@ -330,7 +339,7 @@ export class SeriallyRunningTasks implements RunningTask {
const pseudoTerminal = createPseudoTerminal();
registerProcessListener(this, pseudoTerminal);

return createProcessWithPseudoTty(
const pseudoTtyProcess = await createProcessWithPseudoTty(
pseudoTerminal,
commandConfig,
color,
Expand All @@ -340,6 +349,15 @@ export class SeriallyRunningTasks implements RunningTask {
tty,
envFile
);

// Register process for metrics collection (direct run-commands execution)
// Skip registration if we're in a forked executor - the fork wrapper already registered
const pid = pseudoTtyProcess.getPid();
if (pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
getProcessMetricsService().registerTaskProcess(taskId, pid);
}

return pseudoTtyProcess;
}

return new RunningNodeProcess(
Expand All @@ -349,7 +367,8 @@ export class SeriallyRunningTasks implements RunningTask {
env,
[],
streamOutput,
envFile
envFile,
taskId
);
}
}
Expand All @@ -369,7 +388,8 @@ class RunningNodeProcess implements RunningTask {
env: Record<string, string>,
private readyWhenStatus: { stringToMatch: string; found: boolean }[],
streamOutput = true,
envFile: string
envFile: string,
private taskId: string
) {
env = processEnv(color, cwd, env, envFile);
this.command = commandConfig.command;
Expand All @@ -384,6 +404,15 @@ class RunningNodeProcess implements RunningTask {
windowsHide: false,
});

// Register process for metrics collection
// Skip registration if we're in a forked executor - the fork wrapper already registered
if (this.childProcess.pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
getProcessMetricsService().registerTaskProcess(
this.taskId,
this.childProcess.pid
);
}

this.addListeners(commandConfig, streamOutput);
}

Expand Down Expand Up @@ -508,7 +537,8 @@ class RunningNodeProcess implements RunningTask {

export async function runSingleCommandWithPseudoTerminal(
normalized: NormalizedRunCommandsOptions,
context: ExecutorContext
context: ExecutorContext,
taskId: string
): Promise<PseudoTtyProcess> {
const pseudoTerminal = createPseudoTerminal();
const pseudoTtyProcess = await createProcessWithPseudoTty(
Expand All @@ -521,6 +551,14 @@ export async function runSingleCommandWithPseudoTerminal(
pseudoTerminal ? normalized.isTTY : false,
normalized.envFile
);

// Register process for metrics collection (direct run-commands execution)
// Skip registration if we're in a forked executor - the fork wrapper already registered
const pid = pseudoTtyProcess.getPid();
if (pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
getProcessMetricsService().registerTaskProcess(taskId, pid);
}

registerProcessListener(pseudoTtyProcess, pseudoTerminal);
return pseudoTtyProcess;
}
Expand Down
90 changes: 90 additions & 0 deletions packages/nx/src/native/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export declare class AppLifeCycle {

export declare class ChildProcess {
getParserAndWriter(): ExternalObject<[ParserArc, WriterArc]>
getPid(): number
kill(signal?: NodeJS.Signals): void
onExit(callback: (message: string) => void): void
onOutput(callback: (message: string) => void): void
Expand Down Expand Up @@ -92,6 +93,45 @@ export declare class NxTaskHistory {
getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number>
}

/**
* High-performance metrics collector for Nx tasks
* Thread-safe and designed for minimal overhead
*/
export declare class ProcessMetricsCollector {
/** Create a new ProcessMetricsCollector with default configuration */
constructor()
/**
* Start metrics collection
* Idempotent - safe to call multiple times
*/
startCollection(): void
/**
* Stop metrics collection
* Returns true if collection was stopped, false if not running
*/
stopCollection(): boolean
/**
* Get system information (CPU cores and total memory)
* This is separate from the collection interval and meant to be called imperatively
*/
getSystemInfo(): SystemInfo
/** Register the main CLI process for metrics collection */
registerMainCliProcess(pid: number): void
/** Register the daemon process for metrics collection */
registerDaemonProcess(pid: number): void
/**
* Register a process for a specific task
* Automatically creates the task if it doesn't exist
*/
registerTaskProcess(taskId: string, pid: number): void
/** Register a batch with multiple tasks sharing a worker */
registerBatch(batchId: string, taskIds: Array<string>, pid: number): void
/** Register a subprocess of the main CLI for metrics collection */
registerMainCliSubprocess(pid: number): void
/** Subscribe to push-based metrics notifications from TypeScript */
subscribe(callback: (err: Error | null, event: MetricsUpdate) => void): void
}

export declare class RunningTasksService {
constructor(db: ExternalObject<NxDbConnection>)
getRunningTasks(ids: Array<string>): Array<string>
Expand Down Expand Up @@ -153,6 +193,13 @@ export declare class WorkspaceContext {
getFilesInDirectory(directory: string): Array<string>
}

/** Batch metrics snapshot */
export interface BatchMetricsSnapshot {
batchId: string
taskIds: Array<string>
processes: Array<ProcessMetrics>
}

export interface CachedResult {
code: number
terminalOutput?: string
Expand Down Expand Up @@ -264,6 +311,12 @@ export declare export declare function isEditorInstalled(editor: SupportedEditor

export declare export declare function logDebug(message: string): void

/** Metrics update sent every collection cycle */
export interface MetricsUpdate {
metrics: ProcessMetricsSnapshot
metadata?: Record<string, ProcessMetadata>
}

/** Stripped version of the NxJson interface for use in rust */
export interface NxJson {
namedInputs?: Record<string, Array<JsInputs>>
Expand All @@ -283,6 +336,37 @@ export interface NxWorkspaceFilesExternals {

export declare export declare function parseTaskStatus(stringStatus: string): TaskStatus

/** Process metadata (static, doesn't change during process lifetime) */
export interface ProcessMetadata {
ppid: number
name: string
command: string
exePath: string
cwd: string
}

/** Process metrics (dynamic, changes every collection) */
export interface ProcessMetrics {
pid: number
cpu: number
memory: number
}

/** Organized collection of process metrics with timestamp */
export interface ProcessMetricsSnapshot {
timestamp: number
mainCli?: ProcessTreeMetrics
daemon?: ProcessTreeMetrics
tasks: Record<string, Array<ProcessMetrics>>
batches: Record<string, BatchMetricsSnapshot>
}

/** Metrics for a process and its subprocesses (used for both CLI and daemon) */
export interface ProcessTreeMetrics {
main: ProcessMetrics
subprocesses: Array<ProcessMetrics>
}

export interface Project {
root: string
namedInputs?: Record<string, Array<JsInputs>>
Expand Down Expand Up @@ -318,6 +402,12 @@ export declare const enum SupportedEditor {
Unknown = 5
}

/** System information (static system-level data) */
export interface SystemInfo {
cpuCores: number
totalMemory: number
}

export interface Target {
executor?: string
inputs?: Array<JsInputs>
Expand Down
Loading
Loading