From 6a4ea4ac6dab78aa78b1cc085d1ecdd6bf4ffda3 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 10 Jan 2025 16:20:14 -0500 Subject: [PATCH] Include update info in workflow logging output (#1595) --- packages/test/src/test-integration-update.ts | 36 ++++++++++++++++++++ packages/workflow/src/logs.ts | 12 ++++++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/packages/test/src/test-integration-update.ts b/packages/test/src/test-integration-update.ts index 791bc6e4a..cf03f7672 100644 --- a/packages/test/src/test-integration-update.ts +++ b/packages/test/src/test-integration-update.ts @@ -10,6 +10,7 @@ import { } from '@temporalio/client'; import * as wf from '@temporalio/workflow'; import { temporal } from '@temporalio/proto'; +import { LogEntry } from '@temporalio/worker'; import { helpers, makeTestFunction } from './helpers-integration'; import { signalUpdateOrderingWorkflow } from './workflows/signal-update-ordering'; import { signalsActivitiesTimersPromiseOrdering } from './workflows/signals-timers-activities-order'; @@ -19,6 +20,8 @@ import { loadHistory, waitUntil } from './helpers'; // polling/retry strategies result in the expected behavior const LONG_POLL_EXPIRATION_INTERVAL_SECONDS = 5.0; +const recordedLogs: { [workflowId: string]: LogEntry[] } = {}; + const test = makeTestFunction({ workflowsPath: __filename, workflowEnvironmentOpts: { @@ -29,6 +32,7 @@ const test = makeTestFunction({ ], }, }, + recordedLogs, }); export const update = wf.defineUpdate('update'); @@ -1016,3 +1020,35 @@ test('Can complete update after workflow returns - pre-1.11.0 compatibility', as await runReplayHistory({}, hist); t.pass(); }); + +const logUpdate = wf.defineUpdate<[string, string], [string]>('log-update'); +export async function workflowWithLogInUpdate(): Promise { + const updateHandler = (msg: string): [string, string] => { + const updateInfo = wf.currentUpdateInfo(); + if (!updateInfo) { + throw new Error('expected updateInfo to be defined'); + } + wf.log.info(msg); + return [updateInfo.id, updateInfo.name]; + }; + wf.setHandler(logUpdate, updateHandler); + await wf.condition(() => false); +} + +test('Workflow Worker logs update info when logging within update handler', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const wfHandle = await startWorkflow(workflowWithLogInUpdate); + const logMsg = 'log msg'; + const [updateId, updateName] = await wfHandle.executeUpdate(logUpdate, { args: [logMsg] }); + t.true( + recordedLogs[wfHandle.workflowId].some( + (logEntry) => + logEntry.meta?.updateName === updateName && + logEntry.meta?.updateId === updateId && + logEntry.message === logMsg + ) + ); + }); +}); diff --git a/packages/workflow/src/logs.ts b/packages/workflow/src/logs.ts index ce9e23cf9..6e8310eed 100644 --- a/packages/workflow/src/logs.ts +++ b/packages/workflow/src/logs.ts @@ -5,6 +5,7 @@ import { type Sink, type Sinks, proxySinks } from './sinks'; import { isCancellation } from './errors'; import { WorkflowInfo, ContinueAsNew } from './interfaces'; import { assertInWorkflowContext } from './global-attributes'; +import { currentUpdateInfo, inWorkflowContext } from './workflow'; export interface WorkflowLogger extends Sink { trace(message: string, attrs?: Record): void; @@ -117,11 +118,20 @@ export function executeWithLifecycleLogging(fn: () => Promise): Promise * Note that this function may be called from outside of the Workflow context (eg. by the worker itself). */ export function workflowLogAttributes(info: WorkflowInfo): Record { - return { + const attributes: { [key: string]: string } = { namespace: info.namespace, taskQueue: info.taskQueue, workflowId: info.workflowId, runId: info.runId, workflowType: info.workflowType, }; + if (inWorkflowContext()) { + const updateInfo = currentUpdateInfo(); + if (updateInfo) { + // Add update info if it exists + attributes['updateId'] = updateInfo.id; + attributes['updateName'] = updateInfo.name; + } + } + return attributes; }