Skip to content

Commit

Permalink
feat(client): add client.workflow.count high level API (#1573)
Browse files Browse the repository at this point in the history
  • Loading branch information
THardy98 authored Dec 9, 2024
1 parent de09f6c commit 53233c9
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 6 deletions.
23 changes: 22 additions & 1 deletion packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import { Replace } from '@temporalio/common/lib/type-helpers';
import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time';
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
import { temporal, google } from '@temporalio/proto';
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
import {
CountWorkflowExecution,
RawWorkflowExecutionInfo,
WorkflowExecutionInfo,
WorkflowExecutionStatusName,
} from './types';

function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN';
Expand Down Expand Up @@ -81,6 +86,22 @@ export async function executionInfoFromRaw<T>(
};
}

export function decodeCountWorkflowExecutionsResponse(
raw: temporal.api.workflowservice.v1.ICountWorkflowExecutionsResponse
): CountWorkflowExecution {
return {
// Note: lossy conversion of Long to number
count: raw.count!.toNumber(),
groups: raw.groups!.map((group) => {
return {
// Note: lossy conversion of Long to number
count: group.count!.toNumber(),
groupValues: group.groupValues!.map((value) => searchAttributePayloadConverter.fromPayload(value)),
};
}),
};
}

type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`;

/**
Expand Down
10 changes: 9 additions & 1 deletion packages/client/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { SearchAttributes } from '@temporalio/common';
import type { SearchAttributes, SearchAttributeValue } from '@temporalio/common';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import * as proto from '@temporalio/proto';
import { Replace } from '@temporalio/common/lib/type-helpers';
Expand Down Expand Up @@ -52,6 +52,14 @@ export interface WorkflowExecutionInfo {
raw: RawWorkflowExecutionInfo;
}

export interface CountWorkflowExecution {
count: number;
groups: {
count: number;
groupValues: SearchAttributeValue[];
}[];
}

export type WorkflowExecutionDescription = Replace<
WorkflowExecutionInfo,
{
Expand Down
30 changes: 27 additions & 3 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
WorkflowStartUpdateOutput,
} from './interceptors';
import {
CountWorkflowExecution,
DescribeWorkflowExecutionResponse,
encodeQueryRejectCondition,
GetWorkflowExecutionHistoryRequest,
Expand All @@ -77,7 +78,7 @@ import {
WorkflowStartOptions,
WorkflowUpdateOptions,
} from './workflow-options';
import { executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
import { decodeCountWorkflowExecutionsResponse, executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
import {
BaseClient,
BaseClientOptions,
Expand Down Expand Up @@ -1285,9 +1286,9 @@ export class WorkflowClient extends BaseClient {
}

/**
* List workflows by given `query`.
* Return a list of Workflow Executions matching the given `query`.
*
* ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
* Note that the list of Workflow Executions returned is approximate and eventually consistent.
*
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
* https://docs.temporal.io/visibility
Expand All @@ -1308,6 +1309,29 @@ export class WorkflowClient extends BaseClient {
};
}

/**
* Return the number of Workflow Executions matching the given `query`. If no `query` is provided, then return the
* total number of Workflow Executions for this namespace.
*
* Note that the number of Workflow Executions returned is approximate and eventually consistent.
*
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
* https://docs.temporal.io/visibility
*/
public async count(query?: string): Promise<CountWorkflowExecution> {
let response: temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse;
try {
response = await this.workflowService.countWorkflowExecutions({
namespace: this.options.namespace,
query,
});
} catch (e) {
this.rethrowGrpcError(e, 'Failed to count workflows');
}

return decodeCountWorkflowExecutionsResponse(response);
}

protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] {
if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) {
// eslint-disable-next-line deprecation/deprecation
Expand Down
41 changes: 40 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { randomUUID } from 'crypto';
import { ExecutionContext } from 'ava';
import { firstValueFrom, Subject } from 'rxjs';
import { WorkflowFailedError } from '@temporalio/client';
import { CountWorkflowExecution, WorkflowFailedError } from '@temporalio/client';
import * as activity from '@temporalio/activity';
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
Expand Down Expand Up @@ -1264,3 +1264,42 @@ export const interceptors: workflow.WorkflowInterceptorsFactory = () => {
}
return {};
};

export async function completableWorkflow(completes: boolean): Promise<void> {
await workflow.condition(() => completes);
}

test('Count workflow executions', async (t) => {
const { taskQueue, createWorker, executeWorkflow, startWorkflow } = helpers(t);
const worker = await createWorker();
const client = t.context.env.client;

// Run 2 workflows that don't complete
// (use startWorkflow to avoid waiting for workflows to complete, which they never will)
for (let i = 0; i < 2; i++) {
await startWorkflow(completableWorkflow, { args: [false] });
}

await worker.runUntil(async () => {
// Run 3 workflows that complete.
await Promise.all([
executeWorkflow(completableWorkflow, { args: [true] }),
executeWorkflow(completableWorkflow, { args: [true] }),
executeWorkflow(completableWorkflow, { args: [true] }),
]);
});

const actualTotal = await client.workflow.count(`TaskQueue = '${taskQueue}'`);
t.deepEqual(actualTotal, { count: 5, groups: [] });

const expectedByExecutionStatus: CountWorkflowExecution = {
count: 5,
groups: [
{ count: 2, groupValues: [['Running']] },
{ count: 3, groupValues: [['Completed']] },
],
};

const actualByExecutionStatus = await client.workflow.count(`TaskQueue = '${taskQueue}' GROUP BY ExecutionStatus`);
t.deepEqual(actualByExecutionStatus, expectedByExecutionStatus);
});

0 comments on commit 53233c9

Please sign in to comment.