From 754ddf086a5cdd01d06c1b3bb8b5dbc7dbf23b78 Mon Sep 17 00:00:00 2001 From: Emily Xiong Date: Wed, 6 Mar 2024 18:11:07 -0500 Subject: [PATCH] feat(core): run commands directly --- .../run-commands/run-commands.impl.ts | 170 ++++++++++++------ packages/nx/src/hasher/hash-task.ts | 4 +- .../src/tasks-runner/default-tasks-runner.ts | 1 - .../forked-process-task-runner.ts | 4 +- .../nx/src/tasks-runner/task-orchestrator.ts | 84 +++++++-- .../nx/src/tasks-runner/tasks-schedule.ts | 9 +- packages/nx/src/tasks-runner/utils.ts | 29 +-- 7 files changed, 208 insertions(+), 93 deletions(-) diff --git a/packages/nx/src/executors/run-commands/run-commands.impl.ts b/packages/nx/src/executors/run-commands/run-commands.impl.ts index 630f9a302e9c4..d2c9a39ed136c 100644 --- a/packages/nx/src/executors/run-commands/run-commands.impl.ts +++ b/packages/nx/src/executors/run-commands/run-commands.impl.ts @@ -51,6 +51,8 @@ export interface RunCommandsOptions extends Json { args?: string | string[]; envFile?: string; __unparsed__: string[]; + usePty?: boolean; + streamOutput?: boolean; } const propKeys = [ @@ -64,7 +66,8 @@ const propKeys = [ 'envFile', '__unparsed__', 'env', - 'mode', + 'usePty', + 'streamOutput', 'verbose', ]; @@ -87,6 +90,7 @@ export default async function ( context: ExecutorContext ): Promise<{ success: boolean; + terminalOutput: string; }> { await loadEnvVars(options.envFile); const normalized = normalizeOptions(options); @@ -107,10 +111,10 @@ export default async function ( } try { - const success = options.parallel + const result = options.parallel ? await runInParallel(normalized, context) : await runSerially(normalized, context); - return { success }; + return result; } catch (e) { if (process.env.NX_VERBOSE_LOGGING === 'true') { console.error(e); @@ -124,7 +128,7 @@ export default async function ( async function runInParallel( options: NormalizedRunCommandsOptions, context: ExecutorContext -) { +): Promise<{ success: boolean; terminalOutput: string }> { const procs = options.commands.map((c) => createProcess( c, @@ -132,35 +136,59 @@ async function runInParallel( options.color, calculateCwd(options.cwd, context), options.env ?? {}, - true - ).then((result) => ({ + true, + options.usePty, + options.streamOutput + ).then((result: { success: boolean; terminalOutput: string }) => ({ result, command: c.command, })) ); + let terminalOutput = ''; if (options.readyWhen) { - const r = await Promise.race(procs); - if (!r.result) { - process.stderr.write( - `Warning: command "${r.command}" exited with non-zero status code` - ); - return false; + const r: { + result: { success: boolean; terminalOutput: string }; + command: string; + } = await Promise.race(procs); + terminalOutput += r.result.terminalOutput; + if (!r.result.success) { + const output = `Warning: command "${r.command}" exited with non-zero status code`; + terminalOutput += output; + if (options.streamOutput) { + process.stderr.write(output); + } + return { success: false, terminalOutput }; } else { - return true; + return { success: true, terminalOutput }; } } else { - const r = await Promise.all(procs); - const failed = r.filter((v) => !v.result); + const r: { + result: { success: boolean; terminalOutput: string }; + command: string; + }[] = await Promise.all(procs); + terminalOutput += r.map((f) => f.result.terminalOutput).join(''); + const failed = r.filter((v) => !v.result.success); if (failed.length > 0) { - failed.forEach((f) => { - process.stderr.write( - `Warning: command "${f.command}" exited with non-zero status code` - ); - }); - return false; + const output = failed + .map( + (f) => + `Warning: command "${f.command}" exited with non-zero status code` + ) + .join('\r\n'); + terminalOutput += output; + if (options.streamOutput) { + process.stderr.write(output); + } + return { + success: false, + terminalOutput, + }; } else { - return true; + return { + success: true, + terminalOutput, + }; } } } @@ -213,25 +241,31 @@ function normalizeOptions( async function runSerially( options: NormalizedRunCommandsOptions, context: ExecutorContext -) { +): Promise<{ success: boolean; terminalOutput: string }> { + let terminalOutput = ''; for (const c of options.commands) { - const success = await createProcess( - c, - undefined, - options.color, - calculateCwd(options.cwd, context), - options.env ?? {}, - false - ); - if (!success) { - process.stderr.write( - `Warning: command "${c.command}" exited with non-zero status code` + const result: { success: boolean; terminalOutput: string } = + await createProcess( + c, + undefined, + options.color, + calculateCwd(options.cwd, context), + options.env ?? {}, + false, + options.usePty, + options.streamOutput ); - return false; + terminalOutput += result.terminalOutput; + if (!result.success) { + const output = `Warning: command "${c.command}" exited with non-zero status code`; + result.terminalOutput += output; + if (options.streamOutput) { + process.stderr.write(output); + } + return { success: false, terminalOutput }; } } - - return true; + return { success: true, terminalOutput }; } async function createProcess( @@ -245,8 +279,10 @@ async function createProcess( color: boolean, cwd: string, env: Record, - isParallel: boolean -): Promise { + isParallel: boolean, + usePty: boolean = true, + streamOutput: boolean = true +): Promise<{ success: boolean; terminalOutput: string }> { env = processEnv(color, cwd, env); // The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes // currently does not work properly in windows @@ -254,32 +290,33 @@ async function createProcess( process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' && process.stdout.isTTY && !commandConfig.prefix && - !isParallel + !isParallel && + usePty ) { const cp = new PseudoTtyProcess( - runCommand(commandConfig.command, cwd, env) + runCommand(commandConfig.command, cwd, env, !streamOutput) ); + let terminalOutput = ''; return new Promise((res) => { cp.onOutput((output) => { + terminalOutput += output; if (readyWhen && output.indexOf(readyWhen) > -1) { - res(true); + res({ success: true, terminalOutput }); } }); cp.onExit((code) => { - if (code === 0) { - res(true); - } else if (code >= 128) { + if (code >= 128) { process.exit(code); } else { - res(false); + res({ success: code === 0, terminalOutput }); } }); }); } - return nodeProcess(commandConfig, color, cwd, env, readyWhen); + return nodeProcess(commandConfig, cwd, env, readyWhen, streamOutput); } function nodeProcess( @@ -289,11 +326,12 @@ function nodeProcess( bgColor?: string; prefix?: string; }, - color: boolean, cwd: string, env: Record, - readyWhen: string -): Promise { + readyWhen: string, + streamOutput = true +): Promise<{ success: boolean; terminalOutput: string }> { + let terminalOutput = ''; return new Promise((res) => { const childProcess = exec(commandConfig.command, { maxBuffer: LARGE_BUFFER, @@ -312,24 +350,36 @@ function nodeProcess( process.on('SIGQUIT', processExitListener); childProcess.stdout.on('data', (data) => { - process.stdout.write(addColorAndPrefix(data, commandConfig)); + const output = addColorAndPrefix(data, commandConfig); + terminalOutput += output; + if (streamOutput) { + process.stdout.write(output); + } if (readyWhen && data.toString().indexOf(readyWhen) > -1) { - res(true); + res({ success: true, terminalOutput }); } }); childProcess.stderr.on('data', (err) => { - process.stderr.write(addColorAndPrefix(err, commandConfig)); + const output = addColorAndPrefix(err, commandConfig); + terminalOutput += output; + if (streamOutput) { + process.stderr.write(output); + } if (readyWhen && err.toString().indexOf(readyWhen) > -1) { - res(true); + res({ success: true, terminalOutput }); } }); childProcess.on('error', (err) => { - process.stderr.write(addColorAndPrefix(err.toString(), commandConfig)); - res(false); + const ouptput = addColorAndPrefix(err.toString(), commandConfig); + terminalOutput += ouptput; + if (streamOutput) { + process.stderr.write(ouptput); + } + res({ success: false, terminalOutput }); }); childProcess.on('exit', (code) => { if (!readyWhen) { - res(code === 0); + res({ success: code === 0, terminalOutput }); } }); }); @@ -370,11 +420,13 @@ function calculateCwd( } function processEnv(color: boolean, cwd: string, env: Record) { + const localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() }); const res = { ...process.env, - ...appendLocalEnv({ cwd: cwd ?? process.cwd() }), + ...localEnv, ...env, }; + res.PATH = localEnv.PATH; // need to override PATH to make sure we are using the local node_modules if (color) { res.FORCE_COLOR = `${color}`; @@ -389,7 +441,7 @@ export function interpolateArgsIntoCommand( 'args' | 'parsedArgs' | '__unparsed__' | 'unknownOptions' >, forwardAllArgs: boolean -) { +): string { if (command.indexOf('{args.') > -1) { const regex = /{args\.([^}]+)}/g; return command.replace(regex, (_, group: string) => diff --git a/packages/nx/src/hasher/hash-task.ts b/packages/nx/src/hasher/hash-task.ts index 08ce18b05b220..fedc9e284d53c 100644 --- a/packages/nx/src/hasher/hash-task.ts +++ b/packages/nx/src/hasher/hash-task.ts @@ -16,7 +16,7 @@ export async function hashTasksThatDoNotDependOnOutputsOfOtherTasks( const tasks = Object.values(taskGraph.tasks); const tasksWithHashers = await Promise.all( tasks.map(async (task) => { - const customHasher = await getCustomHasher(task, projectGraph); + const customHasher = getCustomHasher(task, projectGraph); return { task, customHasher }; }) ); @@ -56,7 +56,7 @@ export async function hashTask( env: NodeJS.ProcessEnv ) { performance.mark('hashSingleTask:start'); - const customHasher = await getCustomHasher(task, projectGraph); + const customHasher = getCustomHasher(task, projectGraph); const projectsConfigurations = readProjectsConfigurationFromProjectGraph(projectGraph); const { value, details } = await (customHasher diff --git a/packages/nx/src/tasks-runner/default-tasks-runner.ts b/packages/nx/src/tasks-runner/default-tasks-runner.ts index 75e80cdc9386b..a5cb33bfd7473 100644 --- a/packages/nx/src/tasks-runner/default-tasks-runner.ts +++ b/packages/nx/src/tasks-runner/default-tasks-runner.ts @@ -1,6 +1,5 @@ import { TasksRunner, TaskStatus } from './tasks-runner'; import { TaskOrchestrator } from './task-orchestrator'; -import { performance } from 'perf_hooks'; import { TaskHasher } from '../hasher/task-hasher'; import { LifeCycle } from './life-cycle'; import { ProjectGraph } from '../config/project-graph'; diff --git a/packages/nx/src/tasks-runner/forked-process-task-runner.ts b/packages/nx/src/tasks-runner/forked-process-task-runner.ts index fe661918e6a43..15730c733b079 100644 --- a/packages/nx/src/tasks-runner/forked-process-task-runner.ts +++ b/packages/nx/src/tasks-runner/forked-process-task-runner.ts @@ -14,8 +14,8 @@ import { } from './batch/batch-messages'; import { stripIndents } from '../utils/strip-indents'; import { Task, TaskGraph } from '../config/task-graph'; -import { Readable, Transform } from 'stream'; -import { ChildProcess as NativeChildProcess, nxFork } from '../native'; +import { Transform } from 'stream'; +import { nxFork } from '../native'; import { PsuedoIPCServer } from './psuedo-ipc'; import { FORKED_PROCESS_OS_SOCKET_PATH } from '../daemon/socket-utils'; import { PseudoTtyProcess } from '../utils/child-process'; diff --git a/packages/nx/src/tasks-runner/task-orchestrator.ts b/packages/nx/src/tasks-runner/task-orchestrator.ts index da7daa0450ebd..ad94aaf2816ef 100644 --- a/packages/nx/src/tasks-runner/task-orchestrator.ts +++ b/packages/nx/src/tasks-runner/task-orchestrator.ts @@ -1,6 +1,8 @@ import { defaultMaxListeners } from 'events'; import { performance } from 'perf_hooks'; +import { relative } from 'path'; import { TaskHasher } from '../hasher/task-hasher'; +import runCommandsImpl from '../executors/run-commands/run-commands.impl'; import { ForkedProcessTaskRunner } from './forked-process-task-runner'; import { Cache } from './cache'; import { DefaultTasksRunnerOptions } from './default-tasks-runner'; @@ -8,6 +10,8 @@ import { TaskStatus } from './tasks-runner'; import { calculateReverseDeps, getExecutorForTask, + getPrintableCommandArgsForTask, + getTargetConfigurationForTask, isCacheableTask, removeTasksFromTaskGraph, shouldStreamOutput, @@ -24,6 +28,9 @@ import { getTaskSpecificEnv, } from './task-env'; import * as os from 'os'; +import { workspaceRoot } from '../utils/workspace-root'; +import { output } from '../utils/output'; +import { combineOptionsForExecutor } from '../utils/params'; export class TaskOrchestrator { private cache = new Cache(this.options); @@ -376,20 +383,73 @@ export class TaskOrchestrator { // the task wasn't cached if (results.length === 0) { - // cache prep - const { code, terminalOutput } = await this.runTaskInForkedProcess( + const shouldPrefix = + streamOutput && process.env.NX_PREFIX_OUTPUT === 'true'; + const targetConfiguration = getTargetConfigurationForTask( task, - env, - pipeOutput, - temporaryOutputPath, - streamOutput + this.projectGraph ); + if ( + process.env.NX_RUN_COMMANDS_DIRECTLY !== 'false' && + targetConfiguration.executor === 'nx:run-commands' && + !shouldPrefix + ) { + const { schema } = getExecutorForTask(task, this.projectGraph); + const isRunOne = this.initiatingProject != null; + const combinedOptions = combineOptionsForExecutor( + task.overrides, + task.target.configuration ?? targetConfiguration.defaultConfiguration, + targetConfiguration, + schema, + task.target.project, + relative(task.projectRoot ?? workspaceRoot, process.cwd()), + process.env.NX_VERBOSE_LOGGING === 'true' + ); + if (streamOutput) { + const args = getPrintableCommandArgsForTask(task); + output.logCommand(args.join(' ')); + } + const { success, terminalOutput } = await runCommandsImpl( + { + ...combinedOptions, + env, + usePty: isRunOne && !this.tasksSchedule.hasTasks(), + streamOutput, + }, + { + root: workspaceRoot, // only root is needed in runCommandsImpl + } as any + ); - results.push({ - task, - status: code === 0 ? 'success' : 'failure', - terminalOutput, - }); + const status = success ? 'success' : 'failure'; + if (!streamOutput) { + this.options.lifeCycle.printTaskTerminalOutput( + task, + status, + terminalOutput + ); + } + + results.push({ + task, + status, + terminalOutput, + }); + } else { + // cache prep + const { code, terminalOutput } = await this.runTaskInForkedProcess( + task, + env, + pipeOutput, + temporaryOutputPath, + streamOutput + ); + results.push({ + task, + status: code === 0 ? 'success' : 'failure', + terminalOutput, + }); + } } await this.postRunSteps([task], results, doNotSkipCache, { groupId }); } @@ -573,7 +633,7 @@ export class TaskOrchestrator { return true; } - const { schema } = await getExecutorForTask(task, this.projectGraph); + const { schema } = getExecutorForTask(task, this.projectGraph); return ( schema.outputCapture === 'pipe' || diff --git a/packages/nx/src/tasks-runner/tasks-schedule.ts b/packages/nx/src/tasks-runner/tasks-schedule.ts index 010219f8c56aa..5d63f3a317302 100644 --- a/packages/nx/src/tasks-runner/tasks-schedule.ts +++ b/packages/nx/src/tasks-runner/tasks-schedule.ts @@ -123,10 +123,7 @@ export class TasksSchedule { const batchMap: Record = {}; for (const root of this.notScheduledTaskGraph.roots) { const rootTask = this.notScheduledTaskGraph.tasks[root]; - const executorName = await getExecutorNameForTask( - rootTask, - this.projectGraph - ); + const executorName = getExecutorNameForTask(rootTask, this.projectGraph); await this.processTaskForBatches(batchMap, rootTask, executorName, true); } for (const [executorName, taskGraph] of Object.entries(batchMap)) { @@ -154,11 +151,11 @@ export class TasksSchedule { return; } - const { batchImplementationFactory } = await getExecutorForTask( + const { batchImplementationFactory } = getExecutorForTask( task, this.projectGraph ); - const executorName = await getExecutorNameForTask(task, this.projectGraph); + const executorName = getExecutorNameForTask(task, this.projectGraph); if (rootExecutorName !== executorName) { return; } diff --git a/packages/nx/src/tasks-runner/utils.ts b/packages/nx/src/tasks-runner/utils.ts index 8b53f765982e9..9038fd30379dc 100644 --- a/packages/nx/src/tasks-runner/utils.ts +++ b/packages/nx/src/tasks-runner/utils.ts @@ -2,14 +2,17 @@ import { output } from '../utils/output'; import { relative } from 'path'; import { Task, TaskGraph } from '../config/task-graph'; import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph'; -import { TargetDependencyConfig } from '../config/workspace-json-project-json'; +import { + TargetConfiguration, + TargetDependencyConfig, +} from '../config/workspace-json-project-json'; import { workspaceRoot } from '../utils/workspace-root'; import { joinPathFragments } from '../utils/path'; import { isRelativePath } from '../utils/fileutils'; import { serializeOverridesIntoCommandLine } from '../utils/serialize-overrides-into-command-line'; import { splitByColons } from '../utils/split-target'; import { getExecutorInformation } from '../command-line/run/executor-utils'; -import { CustomHasher } from '../config/misc-interfaces'; +import { CustomHasher, ExecutorConfig } from '../config/misc-interfaces'; import { readProjectsConfigurationFromProjectGraph } from '../project-graph/project-graph'; export function getCommandAsString(execCommand: string, task: Task) { @@ -248,19 +251,23 @@ export function interpolate(template: string, data: any): string { }); } -export async function getExecutorNameForTask( +export function getTargetConfigurationForTask( task: Task, projectGraph: ProjectGraph -) { +): TargetConfiguration | undefined { const project = projectGraph.nodes[task.target.project].data; - return project.targets[task.target.target].executor; + return project.targets[task.target.target]; +} + +export function getExecutorNameForTask(task: Task, projectGraph: ProjectGraph) { + return getTargetConfigurationForTask(task, projectGraph)?.executor; } -export async function getExecutorForTask( +export function getExecutorForTask( task: Task, projectGraph: ProjectGraph -) { - const executor = await getExecutorNameForTask(task, projectGraph); +): ExecutorConfig & { isNgCompat: boolean; isNxExecutor: boolean } { + const executor = getExecutorNameForTask(task, projectGraph); const [nodeModule, executorName] = executor.split(':'); return getExecutorInformation( @@ -271,11 +278,11 @@ export async function getExecutorForTask( ); } -export async function getCustomHasher( +export function getCustomHasher( task: Task, projectGraph: ProjectGraph -): Promise | null { - const factory = (await getExecutorForTask(task, projectGraph)).hasherFactory; +): CustomHasher | null { + const factory = getExecutorForTask(task, projectGraph).hasherFactory; return factory ? factory() : null; }