Skip to content

Commit

Permalink
Refactored existing integration testing helpers (#1593)
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 authored Jan 10, 2025
1 parent 4cf092d commit 2eb587b
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 53 deletions.
87 changes: 87 additions & 0 deletions packages/test/src/helpers-integration-multi-codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* eslint @typescript-eslint/no-non-null-assertion: 0 */
import { ExecutionContext, TestFn } from 'ava';
import { defaultFailureConverter, defaultPayloadConverter, LoadedDataConverter } from '@temporalio/common';
import { WorkerOptions, WorkflowBundle } from '@temporalio/worker';

import { TestWorkflowEnvironment } from '@temporalio/testing';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import {
configurableHelpers,
createLocalTestEnvironment,
makeConfigurableEnvironmentTestFn,
} from './helpers-integration';
import { ByteSkewerPayloadCodec, registerDefaultCustomSearchAttributes, Worker } from './helpers';

// Note: re-export shared workflows (or long workflows)
export * from './workflows';

interface TestConfig {
loadedDataConverter: LoadedDataConverter;
env: TestWorkflowEnvironment;
createWorkerWithDefaults: (t: ExecutionContext<TestContext>, opts?: Partial<WorkerOptions>) => Promise<Worker>;
}
interface TestContext {
workflowBundle: WorkflowBundle;
configs: TestConfig[];
}

const codecs = [undefined, new ByteSkewerPayloadCodec()];

export function makeTestFn(makeBundle: () => Promise<WorkflowBundle>): TestFn<TestContext> {
return makeConfigurableEnvironmentTestFn<TestContext>({
createTestContext: async (_t: ExecutionContext) => {
const configs: TestConfig[] = [];
await Promise.all(
codecs.map(async (codec) => {
const dataConverter = { payloadCodecs: codec ? [codec] : [] };
const loadedDataConverter = {
payloadConverter: defaultPayloadConverter,
payloadCodecs: codec ? [codec] : [],
failureConverter: defaultFailureConverter,
};

const env = await createLocalTestEnvironment({
client: { dataConverter },
});
await registerDefaultCustomSearchAttributes(env.connection);

configs.push({
loadedDataConverter,
env,
createWorkerWithDefaults(t: ExecutionContext<TestContext>, opts?: Partial<WorkerOptions>): Promise<Worker> {
return configurableHelpers(t, t.context.workflowBundle, env).createWorker({
dataConverter,
interceptors: {
activity: [
() => ({ inbound: new ConnectionInjectorInterceptor(env.connection, loadedDataConverter) }),
],
},
...opts,
});
},
});
})
);
return {
workflowBundle: await makeBundle(),
configs,
};
},
teardown: async (testContext: TestContext) => {
for (const config of testContext.configs) {
await config.env.teardown();
}
},
});
}

export const configMacro = async (
t: ExecutionContext<TestContext>,
testFn: (t: ExecutionContext<TestContext>, config: TestConfig) => Promise<unknown> | unknown
): Promise<void> => {
const testPromises = t.context.configs.map(async (config) => {
// Note: ideally, we'd like to add an annotation to the test name to indicate what codec it used
await testFn(t, config);
});
await Promise.all(testPromises);
};
156 changes: 103 additions & 53 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
isGrpcServiceError,
WorkflowFailedError,
WorkflowHandle,
WorkflowHandleWithFirstExecutionRunId,
WorkflowStartOptions,
WorkflowUpdateFailedError,
} from '@temporalio/client';
Expand All @@ -20,6 +21,7 @@ import {
Runtime,
WorkerOptions,
WorkflowBundle,
WorkflowBundleWithSourceMap,
bundleWorkflowCode,
makeTelemetryFilterString,
} from '@temporalio/worker';
Expand Down Expand Up @@ -50,87 +52,131 @@ const defaultDynamicConfigOptions = [
'worker.removableBuildIdDurationSinceDefault=1',
];

export function makeTestFunction(opts: {
function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }) {
const logger = recordedLogs
? new DefaultLogger('DEBUG', (entry) => {
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
recordedLogs![workflowId] ??= [];
recordedLogs![workflowId].push(entry);
})
: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel);
Runtime.install({
logger,
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({
core: (process.env.TEST_LOG_LEVEL || 'INFO').toUpperCase() as LogLevel,
}),
},
},
});
}

export interface HelperTestBundleOptions {
workflowsPath: string;
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
workflowInterceptorModules?: string[];
}

export async function createTestWorkflowBundle({
workflowsPath,
workflowInterceptorModules,
}: HelperTestBundleOptions): Promise<WorkflowBundleWithSourceMap> {
return await bundleWorkflowCode({
...bundlerOptions,
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(workflowInterceptorModules ?? [])],
workflowsPath,
logger: new DefaultLogger('WARN'),
});
}

export async function createLocalTestEnvironment(
opts?: LocalTestWorkflowEnvironmentOptions
): Promise<TestWorkflowEnvironment> {
return await TestWorkflowEnvironment.createLocal({
...(opts || {}), // Use provided options or default to an empty object
server: {
...(opts?.server || {}), // Use provided server options or default to an empty object
extraArgs: [
...defaultDynamicConfigOptions.flatMap((opt) => ['--dynamic-config-value', opt]),
...(opts?.server?.extraArgs ?? []),
],
},
});
}

export function makeConfigurableEnvironmentTestFn<T>(opts: {
recordedLogs?: { [workflowId: string]: LogEntry[] };
}): TestFn<Context> {
const test = anyTest as TestFn<Context>;
createTestContext: (t: ExecutionContext) => Promise<T>;
teardown: (t: T) => Promise<void>;
}): TestFn<T> {
const test = anyTest as TestFn<T>;
test.before(async (t) => {
const workflowBundle = await bundleWorkflowCode({
...bundlerOptions,
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],
workflowsPath: opts.workflowsPath,
logger: new DefaultLogger('WARN'),
});
const logger = opts.recordedLogs
? new DefaultLogger('DEBUG', (entry) => {
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
opts.recordedLogs![workflowId] ??= [];
opts.recordedLogs![workflowId].push(entry);
})
: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel);
Runtime.install({
logger,
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({
core: (process.env.TEST_LOG_LEVEL || 'INFO').toUpperCase() as LogLevel,
}),
},
},
});
const env = await TestWorkflowEnvironment.createLocal({
...opts.workflowEnvironmentOpts,
server: {
...opts.workflowEnvironmentOpts?.server,
extraArgs: [
...defaultDynamicConfigOptions.flatMap((opt) => ['--dynamic-config-value', opt]),
...(opts.workflowEnvironmentOpts?.server?.extraArgs ?? []),
],
},
});
await registerDefaultCustomSearchAttributes(env.connection);
t.context = {
env,
workflowBundle,
};
setupRuntime(opts.recordedLogs);
t.context = await opts.createTestContext(t);
});
test.after.always(async (t) => {
await t.context.env.teardown();
await opts.teardown(t.context);
});
return test;
}

export function makeTestFunction(opts: {
workflowsPath: string;
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
workflowInterceptorModules?: string[];
recordedLogs?: { [workflowId: string]: LogEntry[] };
}): TestFn<Context> {
return makeConfigurableEnvironmentTestFn<Context>({
recordedLogs: opts.recordedLogs,
createTestContext: async (_t: ExecutionContext): Promise<Context> => {
const env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts);
await registerDefaultCustomSearchAttributes(env.connection);
return {
workflowBundle: await createTestWorkflowBundle({
workflowsPath: opts.workflowsPath,
workflowInterceptorModules: opts.workflowInterceptorModules,
}),
env,
};
},
teardown: async (c: Context) => {
await c.env.teardown();
},
});
}

export interface Helpers {
taskQueue: string;
createWorker(opts?: Partial<WorkerOptions>): Promise<Worker>;
runReplayHistory(opts: Partial<ReplayWorkerOptions>, history: temporal.api.history.v1.IHistory): Promise<void>;
executeWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<workflow.WorkflowResultType<T>>;
executeWorkflow<T extends workflow.Workflow>(
fn: T,
opts: Omit<WorkflowStartOptions<T>, 'taskQueue' | 'workflowId'>
opts: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'> & Partial<Pick<WorkflowStartOptions, 'workflowId'>>
): Promise<workflow.WorkflowResultType<T>>;
startWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<WorkflowHandle<T>>;
startWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<WorkflowHandleWithFirstExecutionRunId<T>>;
startWorkflow<T extends workflow.Workflow>(
fn: T,
opts: Omit<WorkflowStartOptions<T>, 'taskQueue' | 'workflowId'>
): Promise<WorkflowHandle<T>>;
opts: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'> & Partial<Pick<WorkflowStartOptions, 'workflowId'>>
): Promise<WorkflowHandleWithFirstExecutionRunId<T>>;
assertWorkflowUpdateFailed(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
assertWorkflowFailedError(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean>;
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
export function configurableHelpers<T>(
t: ExecutionContext<T>,
workflowBundle: WorkflowBundle,
testEnv: TestWorkflowEnvironment
): Helpers {
const taskQueue = t.title.replace(/ /g, '_');

return {
taskQueue,
async createWorker(opts?: Partial<WorkerOptions>): Promise<Worker> {
return await Worker.create({
connection: testEnv.nativeConnection,
workflowBundle: t.context.workflowBundle,
workflowBundle,
taskQueue,
interceptors: {
activity: [() => ({ inbound: new ConnectionInjectorInterceptor(testEnv.connection) })],
Expand All @@ -145,15 +191,15 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
): Promise<void> {
await Worker.runReplayHistory(
{
workflowBundle: t.context.workflowBundle,
workflowBundle,
...opts,
},
history
);
},
async executeWorkflow(
fn: workflow.Workflow,
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'>
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'> & Partial<Pick<WorkflowStartOptions, 'workflowId'>>
): Promise<any> {
return await testEnv.client.workflow.execute(fn, {
taskQueue,
Expand All @@ -163,8 +209,8 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
},
async startWorkflow(
fn: workflow.Workflow,
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'>
): Promise<WorkflowHandle<workflow.Workflow>> {
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'> & Partial<Pick<WorkflowStartOptions, 'workflowId'>>
): Promise<WorkflowHandleWithFirstExecutionRunId<workflow.Workflow>> {
return await testEnv.client.workflow.start(fn, {
taskQueue,
workflowId: randomUUID(),
Expand Down Expand Up @@ -216,3 +262,7 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
},
};
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
return configurableHelpers(t, t.context.workflowBundle, testEnv);
}

0 comments on commit 2eb587b

Please sign in to comment.