Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding global input hashes in lage server worker #861

Merged
merged 6 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions change/change-1ac49201-a1a8-4f31-a6a4-b5d05e1d76a0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"changes": [
{
"type": "minor",
"comment": "cheat on optimization by leveraging the fact that the 'info' command is ALWAYS called before anything else in BXL",
"packageName": "@lage-run/cli",
"email": "kchau@microsoft.com",
"dependentChangeType": "patch"
},
{
"type": "minor",
"comment": "cheat on optimization by leveraging the fact that the 'info' command is ALWAYS called before anything else in BXL",
"packageName": "@lage-run/hasher",
"email": "kchau@microsoft.com",
"dependentChangeType": "patch"
},
{
"type": "minor",
"comment": "cheat on optimization by leveraging the fact that the 'info' command is ALWAYS called before anything else in BXL",
"packageName": "@lage-run/rpc",
"email": "kchau@microsoft.com",
"dependentChangeType": "patch"
}
]
}
11 changes: 11 additions & 0 deletions change/change-be167829-f523-4d7e-8fe1-e50eac6ba277.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"type": "minor",
"comment": "cheat on optimization by leveraging the fact that the 'info' command is ALWAYS called before anything else in BXL",
"packageName": "lage",
"email": "kchau@microsoft.com",
"dependentChangeType": "patch"
}
]
}
3 changes: 2 additions & 1 deletion packages/cli/src/commands/exec/executeRemotely.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import path from "path";
import type { Logger } from "@lage-run/logger";
import createLogger from "@lage-run/logger";
import { initializeReporters } from "../initializeReporters.js";
Expand Down Expand Up @@ -140,7 +141,7 @@ export async function executeRemotely(options: ExecRemotelyOptions, command: Com
process.exitCode = response.exitCode;

// we will simulate file access even if exit code may be non-zero
await simulateFileAccess(logger, [...response.inputs, ...response.globalInputs], response.outputs);
await simulateFileAccess(logger, [...response.inputs, path.join(response.cwd, response.globalInputHashFile)], response.outputs);
} else {
process.exitCode = 1;
}
Expand Down
68 changes: 63 additions & 5 deletions packages/cli/src/commands/info/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@
import { getFilteredPackages } from "../../filter/getFilteredPackages.js";
import createLogger from "@lage-run/logger";
import path from "path";
import fs from "fs";
import { parse } from "shell-quote";

import type { ReporterInitOptions } from "../../types/ReporterInitOptions.js";
import type { Target } from "@lage-run/target-graph";
import { Target, getStartTargetId, getTargetId } from "@lage-run/target-graph";

Check failure on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (18.x, macos-latest)

Import "Target" is only used as types

Check warning on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (18.x, macos-latest)

'getTargetId' is defined but never used

Check failure on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (20.x, macos-latest)

Import "Target" is only used as types

Check warning on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (20.x, macos-latest)

'getTargetId' is defined but never used

Check failure on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (22.x, macos-latest)

Import "Target" is only used as types

Check warning on line 14 in packages/cli/src/commands/info/action.ts

View workflow job for this annotation

GitHub Actions / build (22.x, macos-latest)

'getTargetId' is defined but never used
import { initializeReporters } from "../initializeReporters.js";
import { TargetRunnerPicker } from "@lage-run/runners";
import { getBinPaths } from "../../getBinPaths.js";
import { runnerPickerOptions } from "../../runnerPickerOptions.js";
import { parseServerOption } from "../parseServerOption.js";
import { optimizeTargetGraph } from "../../optimizeTargetGraph.js";
import { glob } from "@lage-run/globby";
import { FileHasher } from "@lage-run/hasher/lib/FileHasher.js";
import { hashStrings } from "@lage-run/hasher";
import { getGlobalInputHashFilePath } from "../targetHashFilePath.js";

interface InfoActionOptions extends ReporterInitOptions {
dependencies: boolean;
Expand Down Expand Up @@ -147,6 +152,58 @@
generatePackageTask(target, taskArgs, config, options, binPaths, packageInfos, tasks)
);

// In worker server mode, we speed up BuildXL runs by pre-supplying the global input hashes, so that BuildXL does not try to read the global inputs over and over again.
// This is an important optimization for BuildXL when the env glob matches a large number of files in non-well-behaved monorepos
// (e.g. repos that list files in the env glob to avoid circular dependencies in the package graph)
if (shouldRunWorkersAsService(options)) {
// For each target in the target graph, we create a global input hash file at this location:
// ${target.cwd}/node_modules/.lage/global_inputs_hash_${target.task}
// We glob for the global input files and use the FileHasher to generate these hashes.
const fileHasher = new FileHasher({
root,
});

const globHashCache = new Map<string, string>();
/**
 * Expands `patterns` with `glob` and folds the per-file hashes reported by `fileHasher`
 * into a single hash string via `hashStrings`.
 *
 * Results are memoized in `globHashCache`, so targets that share the same patterns and cwd
 * only pay for the glob and the file hashing once.
 *
 * @param patterns glob patterns to expand
 * @param globOptions options forwarded to `glob`; `cwd` is the directory the patterns are resolved against
 * @returns the combined hash of every matched file
 */
const globHashWithCache = (patterns: string[], globOptions: { cwd: string }) => {
  // The key covers the cwd as well as the patterns: identical patterns resolved against a
  // different directory match different files and must not share a cache entry.
  // JSON encoding also avoids collisions that a plain separator-joined key could produce.
  const cacheKey = JSON.stringify([globOptions.cwd, patterns]);

  const cachedHash = globHashCache.get(cacheKey);
  if (cachedHash !== undefined) {
    return cachedHash;
  }

  const files = glob(patterns, globOptions);
  // Matched paths are joined onto `root` before being handed to the file hasher.
  const fileHashes = fileHasher.hash(files.map((file) => path.join(root, file)));
  const hash = hashStrings(Object.values(fileHashes));

  globHashCache.set(cacheKey, hash);

  return hash;
};

const globalInputs = config.cacheOptions?.environmentGlob
? glob(config.cacheOptions?.environmentGlob, { cwd: root })
: ["lage.config.js"];

for (const target of optimizedTargets) {
if (target.id === getStartTargetId()) {
continue;
}

const targetGlobalInputsHash = target.environmentGlob
? globHashWithCache(target.environmentGlob, { cwd: root })
: globHashWithCache(globalInputs, { cwd: root });

const targetGlobalInputsHashFile = path.join(target.cwd, getGlobalInputHashFilePath(target));
const targetGlobalInputsHashFileDir = path.dirname(targetGlobalInputsHashFile);

// Make sure the directory exists
if (!fs.existsSync(targetGlobalInputsHashFileDir)) {
fs.mkdirSync(targetGlobalInputsHashFileDir, { recursive: true });
}

// Write the hash to the file
fs.writeFileSync(targetGlobalInputsHashFile, targetGlobalInputsHash);
}
}

logger.info("info", {
command: command.args,
scope,
Expand Down Expand Up @@ -188,6 +245,10 @@
return packageTask;
}

/**
 * Tells whether worker targets should be run through the lage server.
 *
 * This is the case when the `LAGE_WORKER_SERVER` environment variable is set to anything
 * other than the literal string "false", or when the `server` option is truthy.
 */
function shouldRunWorkersAsService(options: InfoActionOptions) {
  const envSetting = process.env.LAGE_WORKER_SERVER;

  // Any string value except "false" (including the empty string) switches server mode on.
  if (typeof envSetting === "string" && envSetting !== "false") {
    return true;
  }

  return Boolean(options.server);
}

function generateCommand(
target: Target,
taskArgs: string[],
Expand All @@ -197,9 +258,6 @@
packageInfos: PackageInfos,
tasks: string[]
) {
const shouldRunWorkersAsService =
(typeof process.env.LAGE_WORKER_SERVER === "string" && process.env.LAGE_WORKER_SERVER !== "false") || !!options.server;

if (target.type === "npmScript") {
const script = target.packageName !== undefined ? packageInfos[target.packageName]?.scripts?.[target.task] : undefined;

Expand All @@ -215,7 +273,7 @@
const npmClient = config.npmClient ?? "npm";
const command = [npmClient, ...getNpmArgs(target.task, taskArgs)];
return command;
} else if (target.type === "worker" && shouldRunWorkersAsService) {
} else if (target.type === "worker" && shouldRunWorkersAsService(options)) {
const { host, port } = parseServerOption(options.server);
const command = [binPaths["lage"], "exec", "--tasks", ...tasks, "--server", `${host}:${port}`];
if (options.concurrency) {
Expand Down
26 changes: 13 additions & 13 deletions packages/cli/src/commands/server/lageService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type ConfigOptions, getConfig, getConcurrency, getMaxWorkersPerTask } from "@lage-run/config";
import type { Logger } from "@lage-run/logger";
import { ConnectError, Code, type ILageService } from "@lage-run/rpc";
import { getStartTargetId, getTargetId, type Target, type TargetGraph } from "@lage-run/target-graph";
import { getStartTargetId, getTargetId, type TargetGraph } from "@lage-run/target-graph";
import { type DependencyMap, getPackageInfos, getWorkspaceRoot } from "workspace-tools";
import { createTargetGraph } from "../run/createTargetGraph.js";
import { type Readable } from "stream";
Expand All @@ -16,6 +16,7 @@
import { formatDuration, hrToSeconds, hrtimeDiff } from "@lage-run/format-hrtime";
import path from "path";
import fs from "fs";
import { getGlobalInputHashFilePath, getHashFilePath } from "../targetHashFilePath.js";

interface LageServiceContext {
config: ConfigOptions;
Expand Down Expand Up @@ -162,10 +163,6 @@
tasks: string[];
}

/**
 * Builds the location of a target's hash file.
 * The result is a relative path; callers join it onto the target's cwd.
 */
function getHashFilePath(target: Target) {
  const hashFileName = `hash_${target.task}`;
  return path.join("node_modules", ".lage", hashFileName);
}

export async function createLageService({
cwd,
serverControls,
Expand All @@ -188,7 +185,7 @@
// THIS IS A BIG ASSUMPTION; TODO: memoize based on the parameters of the initialize() call
// The first request sets up the nodeArg and taskArgs - we are assuming that all requests to run this target are coming from the same
// `lage info` call
const { config, targetGraph, dependencyMap, packageTree, root, pool, globalInputs, targetHasher } = await initialize({

Check warning on line 188 in packages/cli/src/commands/server/lageService.ts

View workflow job for this annotation

GitHub Actions / build (18.x, macos-latest)

'globalInputs' is assigned a value but never used

Check warning on line 188 in packages/cli/src/commands/server/lageService.ts

View workflow job for this annotation

GitHub Actions / build (20.x, macos-latest)

'globalInputs' is assigned a value but never used

Check warning on line 188 in packages/cli/src/commands/server/lageService.ts

View workflow job for this annotation

GitHub Actions / build (22.x, macos-latest)

'globalInputs' is assigned a value but never used
cwd,
logger,
nodeArg: request.nodeOptions,
Expand Down Expand Up @@ -233,18 +230,17 @@
threadId: 0,
};

const targetGlobalInputs = target.environmentGlob ? glob(target.environmentGlob, { cwd: root }) : globalInputs;

let results: {
packageName?: string;
task: string;
cwd: string;
exitCode: number;
inputs: string[];
outputs: string[];
stdout: string;
stderr: string;
id: string;
globalInputs: string[];
globalInputHashFile: string;
};

const inputs = getInputFiles(target, dependencyMap, packageTree);
Expand All @@ -258,6 +254,7 @@
inputs.push(path.join(path.relative(root, depTarget.cwd), getHashFilePath(depTarget)).replace(/\\/g, "/"));
}

// Write the target hash to a file for its dependants to use
const targetHashFile = getHashFilePath(target);
const targetHashFullPath = path.join(target.cwd, targetHashFile);

Expand All @@ -271,6 +268,8 @@
throw new ConnectError(`Error writing target hash file: ${targetHashFullPath}`, Code.Internal);
}

const targetGlobalInputHashRelativePath = getGlobalInputHashFilePath(target);

try {
await pool.exec(
task,
Expand Down Expand Up @@ -317,13 +316,14 @@
results = {
packageName: request.packageName,
task: request.task,
cwd: target.cwd,
exitCode: 0,
inputs,
outputs,
stdout: writableStdout.toString(),
stderr: writableStderr.toString(),
id,
globalInputs: targetGlobalInputs,
globalInputHashFile: targetGlobalInputHashRelativePath,
};
} catch (e) {
const outputs = getOutputFiles(root, target, config.cacheOptions?.outputGlob, packageTree);
Expand All @@ -336,13 +336,14 @@
results = {
packageName: request.packageName,
task: request.task,
cwd: target.cwd,
exitCode: 1,
inputs,
outputs,
stdout: "",
stderr: e instanceof Error ? e.toString() : "",
id,
globalInputs: targetGlobalInputs,
globalInputHashFile: targetGlobalInputHashRelativePath,
};
}

Expand All @@ -351,13 +352,12 @@
{
packageName: results.packageName,
task: results.task,
cwd: results.cwd,
exitCode: results.exitCode,
inputs: results.inputs,
outputs: results.outputs,
id: results.id,
globalInputs: `(${target.environmentGlob ? "custom target env glob used" : "general global inputs used"}): ${
results.globalInputs.length
} files`,
globalInputHashFile: targetGlobalInputHashRelativePath,
},
null,
2
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/commands/targetHashFilePath.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import path from "path";

/**
 * Builds the location of the file that stores a target's hash.
 * The result is a relative path (`node_modules/.lage/hash_<task>`); callers join it onto the target's cwd.
 */
export function getHashFilePath(target: { task: string }) {
  const hashFileName = `hash_${target.task}`;
  return path.join("node_modules", ".lage", hashFileName);
}

/**
 * Builds the location of the file that stores the global-inputs hash for a target's task.
 * The result is a relative path (`node_modules/.lage/global_inputs_hash_<task>`); callers join it onto the target's cwd.
 */
export function getGlobalInputHashFilePath(target: { task: string }) {
  const hashFileName = `global_inputs_hash_${target.task}`;
  return path.join("node_modules", ".lage", hashFileName);
}
2 changes: 2 additions & 0 deletions packages/hasher/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export { TargetHasher } from "./TargetHasher.js";
export { PackageTree } from "./PackageTree.js";
export { getInputFiles } from "./getInputFiles.js";
export { FileHasher } from "./FileHasher.js";
export { hashStrings } from "./hashStrings.js";
15 changes: 8 additions & 7 deletions packages/rpc/proto/lage/v1/lage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ message RunTargetRequest {
// Result of running a single target on the lage server.
// NOTE(review): this listing shows `task`..`stderr` at field numbers 3-8 and again at 4-9,
// with `cwd` taking number 3. Field numbers identify fields on the wire, so renumbering
// existing fields is not backward compatible - confirm every client and server is upgraded together.
message RunTargetResponse {
string id = 1;
optional string package_name = 2;
string task = 3;
int32 exit_code = 4;
repeated string inputs = 5;
repeated string outputs = 6;
string stdout = 7;
string stderr = 8;
repeated string global_inputs = 9;
// Working directory of the target; the server fills this with the target's cwd.
string cwd = 3;
string task = 4;
int32 exit_code = 5;
repeated string inputs = 6;
repeated string outputs = 7;
string stdout = 8;
string stderr = 9;
// Path of the target's global-input hash file; the client joins it onto `cwd`.
string global_input_hash_file = 10;
}

message PingRequest {}
Expand Down
36 changes: 21 additions & 15 deletions packages/rpc/src/gen/lage/v1/lage_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,39 +85,44 @@ export class RunTargetResponse extends Message<RunTargetResponse> {
packageName?: string;

/**
* @generated from field: string task = 3;
* @generated from field: string cwd = 3;
*/
cwd = "";

/**
* @generated from field: string task = 4;
*/
task = "";

/**
* @generated from field: int32 exit_code = 4;
* @generated from field: int32 exit_code = 5;
*/
exitCode = 0;

/**
* @generated from field: repeated string inputs = 5;
* @generated from field: repeated string inputs = 6;
*/
inputs: string[] = [];

/**
* @generated from field: repeated string outputs = 6;
* @generated from field: repeated string outputs = 7;
*/
outputs: string[] = [];

/**
* @generated from field: string stdout = 7;
* @generated from field: string stdout = 8;
*/
stdout = "";

/**
* @generated from field: string stderr = 8;
* @generated from field: string stderr = 9;
*/
stderr = "";

/**
* @generated from field: repeated string global_inputs = 9;
* @generated from field: string global_input_hash_file = 10;
*/
globalInputs: string[] = [];
globalInputHashFile = "";

constructor(data?: PartialMessage<RunTargetResponse>) {
super();
Expand All @@ -129,13 +134,14 @@ export class RunTargetResponse extends Message<RunTargetResponse> {
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "id", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "package_name", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true },
{ no: 3, name: "task", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "exit_code", kind: "scalar", T: 5 /* ScalarType.INT32 */ },
{ no: 5, name: "inputs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 6, name: "outputs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 7, name: "stdout", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 8, name: "stderr", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 9, name: "global_inputs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 3, name: "cwd", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "task", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 5, name: "exit_code", kind: "scalar", T: 5 /* ScalarType.INT32 */ },
{ no: 6, name: "inputs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 7, name: "outputs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 8, name: "stdout", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 9, name: "stderr", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 10, name: "global_input_hash_file", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RunTargetResponse {
Expand Down
Loading