Skip to content

Commit

Permalink
Include update info in workflow logging output (#1595)
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 authored Jan 10, 2025
1 parent 8c68e0c commit 6a4ea4a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
36 changes: 36 additions & 0 deletions packages/test/src/test-integration-update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '@temporalio/client';
import * as wf from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { LogEntry } from '@temporalio/worker';
import { helpers, makeTestFunction } from './helpers-integration';
import { signalUpdateOrderingWorkflow } from './workflows/signal-update-ordering';
import { signalsActivitiesTimersPromiseOrdering } from './workflows/signals-timers-activities-order';
Expand All @@ -19,6 +20,8 @@ import { loadHistory, waitUntil } from './helpers';
// polling/retry strategies result in the expected behavior
const LONG_POLL_EXPIRATION_INTERVAL_SECONDS = 5.0;

const recordedLogs: { [workflowId: string]: LogEntry[] } = {};

const test = makeTestFunction({
workflowsPath: __filename,
workflowEnvironmentOpts: {
Expand All @@ -29,6 +32,7 @@ const test = makeTestFunction({
],
},
},
recordedLogs,
});

export const update = wf.defineUpdate<string[], [string]>('update');
Expand Down Expand Up @@ -1016,3 +1020,35 @@ test('Can complete update after workflow returns - pre-1.11.0 compatibility', as
await runReplayHistory({}, hist);
t.pass();
});

const logUpdate = wf.defineUpdate<[string, string], [string]>('log-update');
export async function workflowWithLogInUpdate(): Promise<void> {
const updateHandler = (msg: string): [string, string] => {
const updateInfo = wf.currentUpdateInfo();
if (!updateInfo) {
throw new Error('expected updateInfo to be defined');
}
wf.log.info(msg);
return [updateInfo.id, updateInfo.name];
};
wf.setHandler(logUpdate, updateHandler);
await wf.condition(() => false);
}

test('Workflow Worker logs update info when logging within update handler', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithLogInUpdate);
const logMsg = 'log msg';
const [updateId, updateName] = await wfHandle.executeUpdate(logUpdate, { args: [logMsg] });
t.true(
recordedLogs[wfHandle.workflowId].some(
(logEntry) =>
logEntry.meta?.updateName === updateName &&
logEntry.meta?.updateId === updateId &&
logEntry.message === logMsg
)
);
});
});
12 changes: 11 additions & 1 deletion packages/workflow/src/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { type Sink, type Sinks, proxySinks } from './sinks';
import { isCancellation } from './errors';
import { WorkflowInfo, ContinueAsNew } from './interfaces';
import { assertInWorkflowContext } from './global-attributes';
import { currentUpdateInfo, inWorkflowContext } from './workflow';

export interface WorkflowLogger extends Sink {
trace(message: string, attrs?: Record<string, unknown>): void;
Expand Down Expand Up @@ -117,11 +118,20 @@ export function executeWithLifecycleLogging(fn: () => Promise<unknown>): Promise
* Note that this function may be called from outside of the Workflow context (eg. by the worker itself).
*/
export function workflowLogAttributes(info: WorkflowInfo): Record<string, unknown> {
return {
const attributes: { [key: string]: string } = {
namespace: info.namespace,
taskQueue: info.taskQueue,
workflowId: info.workflowId,
runId: info.runId,
workflowType: info.workflowType,
};
if (inWorkflowContext()) {
const updateInfo = currentUpdateInfo();
if (updateInfo) {
// Add update info if it exists
attributes['updateId'] = updateInfo.id;
attributes['updateName'] = updateInfo.name;
}
}
return attributes;
}

0 comments on commit 6a4ea4a

Please sign in to comment.