Skip to content

Commit

Permalink
Search Attributes maintenance:
Browse files Browse the repository at this point in the history
- Add Upsert Typed Search Attributes call into the outbound calls interceptor
- Move Upsert SA calls logic into Workflow context
  • Loading branch information
roxblnfk committed Feb 6, 2025
1 parent 616d1d1 commit e8cb363
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 47 deletions.
13 changes: 13 additions & 0 deletions src/Interceptor/Trait/WorkflowOutboundCallsInterceptorTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Temporal\Interceptor\WorkflowOutboundCalls\SignalExternalWorkflowInput;
use Temporal\Interceptor\WorkflowOutboundCalls\TimerInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertTypedSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCallsInterceptor;

/**
Expand Down Expand Up @@ -157,6 +158,18 @@ public function upsertSearchAttributes(UpsertSearchAttributesInput $input, calla
return $next($input);
}

/**
* Default implementation of the `upsertTypedSearchAttributes` method.
*
* @see WorkflowOutboundCallsInterceptor::upsertTypedSearchAttributes()
*/
public function upsertTypedSearchAttributes(
UpsertTypedSearchAttributesInput $input,
callable $next,
): PromiseInterface {
return $next($input);
}

/**
* Default implementation of the `await` method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
final class UpsertSearchAttributesInput
{
/**
* @param array<non-empty-string, mixed> $searchAttributes
*
* @no-named-arguments
* @internal Don't use the constructor. Use {@see self::with()} instead.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Interceptor\WorkflowOutboundCalls;

use Temporal\Common\SearchAttributes\SearchAttributeUpdate;

/**
* @psalm-immutable
*/
final class UpsertTypedSearchAttributesInput
{
/**
* @param array<SearchAttributeUpdate> $updates
*
* @no-named-arguments
* @internal Don't use the constructor. Use {@see self::with()} instead.
*/
public function __construct(
public readonly array $updates,
) {}

/**
* @param array<SearchAttributeUpdate>|null $updates
*/
public function with(
?array $updates = null,
): self {
return new self(
$updates ?? $this->updates,
);
}
}
14 changes: 10 additions & 4 deletions src/Interceptor/WorkflowOutboundCallsInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use Temporal\Interceptor\WorkflowOutboundCalls\SignalExternalWorkflowInput;
use Temporal\Interceptor\WorkflowOutboundCalls\TimerInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertTypedSearchAttributesInput;
use Temporal\Internal\Interceptor\Interceptor;

/**
Expand Down Expand Up @@ -56,10 +57,7 @@ interface WorkflowOutboundCallsInterceptor extends Interceptor
/**
* @param callable(ExecuteActivityInput): PromiseInterface $next
*/
public function executeActivity(
ExecuteActivityInput $input,
callable $next,
): PromiseInterface;
public function executeActivity(ExecuteActivityInput $input, callable $next): PromiseInterface;

/**
* @param callable(ExecuteLocalActivityInput): PromiseInterface $next
Expand Down Expand Up @@ -120,6 +118,14 @@ public function getVersion(GetVersionInput $input, callable $next): PromiseInter
*/
public function upsertSearchAttributes(UpsertSearchAttributesInput $input, callable $next): PromiseInterface;

/**
* @param callable(UpsertTypedSearchAttributesInput): PromiseInterface $next
*/
public function upsertTypedSearchAttributes(
UpsertTypedSearchAttributesInput $input,
callable $next,
): PromiseInterface;

/**
* @param callable(AwaitInput): PromiseInterface $next
*/
Expand Down
39 changes: 1 addition & 38 deletions src/Internal/Workflow/ScopeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Temporal\Common\SearchAttributes\SearchAttributeUpdate;
use Temporal\Exception\Failure\CanceledFailure;
use Temporal\Internal\Transport\CompletableResult;
use Temporal\Internal\Transport\Request\UpsertMemo;
use Temporal\Internal\Transport\Request\UpsertTypedSearchAttributes;
use Temporal\Internal\Workflow\Process\Scope;
use Temporal\Promise;
Expand Down Expand Up @@ -114,44 +115,6 @@ public function rejectConditionGroup(string $conditionGroupId): void
$this->parent->rejectConditionGroup($conditionGroupId);
}

public function upsertSearchAttributes(array $searchAttributes): void
{
$this->request(new UpsertSearchAttributes($searchAttributes), waitResponse: false);

/** @psalm-suppress UnsupportedPropertyReferenceUsage $sa */
$sa = &$this->input->info->searchAttributes;
foreach ($searchAttributes as $name => $value) {
if ($value === null) {
unset($sa[$name]);
continue;
}

$sa[$name] = $value;
}
}

public function upsertTypedSearchAttributes(SearchAttributeUpdate ...$updates): void
{
$this->request(new UpsertTypedSearchAttributes($updates), waitResponse: false);

// Merge changes
$tsa = $this->input->info->typedSearchAttributes;
foreach ($updates as $update) {
if ($update instanceof SearchAttributeUpdate\ValueUnset) {
$tsa = $tsa->withoutValue($update->name);
continue;
}

\assert($update instanceof SearchAttributeUpdate\ValueSet);
$tsa = $tsa->withValue(
SearchAttributeKey::for($update->type, $update->name),
$update->value,
);
}

$this->input->info->typedSearchAttributes = $tsa;
}

#[\Override]
public function destroy(): void
{
Expand Down
52 changes: 50 additions & 2 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Temporal\Activity\ActivityOptions;
use Temporal\Activity\ActivityOptionsInterface;
use Temporal\Activity\LocalActivityOptions;
use Temporal\Common\SearchAttributes\SearchAttributeKey;
use Temporal\Common\SearchAttributes\SearchAttributeUpdate;
use Temporal\Common\Uuid;
use Temporal\DataConverter\EncodedValues;
use Temporal\DataConverter\Type;
Expand All @@ -34,6 +36,7 @@
use Temporal\Interceptor\WorkflowOutboundCalls\SideEffectInput;
use Temporal\Interceptor\WorkflowOutboundCalls\TimerInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertTypedSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCallsInterceptor;
use Temporal\Interceptor\WorkflowOutboundRequestInterceptor;
use Temporal\Internal\Declaration\Destroyable;
Expand All @@ -53,6 +56,7 @@
use Temporal\Internal\Transport\Request\Panic;
use Temporal\Internal\Transport\Request\SideEffect;
use Temporal\Internal\Transport\Request\UpsertSearchAttributes;
use Temporal\Internal\Transport\Request\UpsertTypedSearchAttributes;
use Temporal\Internal\Workflow\Process\HandlerState;
use Temporal\Promise;
use Temporal\Worker\Transport\Command\RequestInterface;
Expand Down Expand Up @@ -447,13 +451,57 @@ public function allHandlersFinished(): bool
public function upsertSearchAttributes(array $searchAttributes): void
{
$this->callsInterceptor->with(
fn(UpsertSearchAttributesInput $input): PromiseInterface
=> $this->request(new UpsertSearchAttributes($input->searchAttributes)),
function (UpsertSearchAttributesInput $input): PromiseInterface {
$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false);

/** @psalm-suppress UnsupportedPropertyReferenceUsage $sa */
$sa = &$this->input->info->searchAttributes;
foreach ($input->searchAttributes as $name => $value) {
if ($value === null) {
unset($sa[$name]);
continue;
}

$sa[$name] = $value;
}

return $result;
},
/** @see WorkflowOutboundCallsInterceptor::upsertSearchAttributes() */
'upsertSearchAttributes',
)(new UpsertSearchAttributesInput($searchAttributes));
}

public function upsertTypedSearchAttributes(SearchAttributeUpdate ...$updates): void
{
$this->callsInterceptor->with(
function (UpsertTypedSearchAttributesInput $input): PromiseInterface {
$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false);

// Merge changes
$tsa = $this->input->info->typedSearchAttributes;
foreach ($input->updates as $update) {
if ($update instanceof SearchAttributeUpdate\ValueUnset) {
$tsa = $tsa->withoutValue($update->name);
continue;
}

if ($update instanceof SearchAttributeUpdate\ValueSet) {
$tsa = $tsa->withValue(
SearchAttributeKey::for($update->type, $update->name),
$update->value,
);
}
}

$this->input->info->typedSearchAttributes = $tsa;

Check failure on line 497 in src/Internal/Workflow/WorkflowContext.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.3, OS ubuntu-latest)

InaccessibleProperty

src/Internal/Workflow/WorkflowContext.php:497:17: InaccessibleProperty: Temporal\Workflow\WorkflowInfo::$typedSearchAttributes is marked readonly (see https://psalm.dev/054)

Check failure on line 497 in src/Internal/Workflow/WorkflowContext.php

View workflow job for this annotation

GitHub Actions / Psalm Validation (PHP 8.3, OS ubuntu-latest)

InaccessibleProperty

src/Internal/Workflow/WorkflowContext.php:497:17: InaccessibleProperty: Temporal\Workflow\WorkflowInfo::$typedSearchAttributes is marked readonly (see https://psalm.dev/054)
return $result;
},
/** @see WorkflowOutboundCallsInterceptor::upsertTypedSearchAttributes() */
'upsertSearchAttributes',
)(new UpsertTypedSearchAttributesInput($updates));
}

public function await(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
return $this->callsInterceptor->with(
Expand Down
2 changes: 1 addition & 1 deletion src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ public static function allHandlersFinished(): bool
/**
* Upsert search attributes
*
* @param array<string, mixed> $searchAttributes
* @param array<non-empty-string, mixed> $searchAttributes
*/
public static function upsertSearchAttributes(array $searchAttributes): void
{
Expand Down
21 changes: 19 additions & 2 deletions src/Workflow/WorkflowContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use React\Promise\PromiseInterface;
use Temporal\Activity\ActivityOptions;
use Temporal\Activity\ActivityOptionsInterface;
use Temporal\Common\SearchAttributes\SearchAttributeUpdate;
use Temporal\DataConverter\Type;
use Temporal\DataConverter\ValuesInterface;
use Temporal\Internal\Support\DateInterval;
Expand Down Expand Up @@ -285,18 +286,34 @@ public function getStackTrace(): string;
* interruption of in-progress handlers by workflow exit:
*
* ```php
* yield Workflow.await(static fn() => Workflow::allHandlersFinished());
* yield Workflow.await(static fn() => Workflow::allHandlersFinished());
* ```
*
* @return bool True if all handlers have finished executing.
*/
public function allHandlersFinished(): bool;

/**
* @param array<string, mixed> $searchAttributes
* Upsert search attributes
*
* @param array<non-empty-string, mixed> $searchAttributes
*/
public function upsertSearchAttributes(array $searchAttributes): void;

/**
* Upsert typed Search Attributes
*
* ```php
* Workflow::upsertTypedSearchAttributes(
* SearchAttributeKey::forKeyword('CustomKeyword')->valueSet('CustomValue'),
* SearchAttributeKey::forInt('MyCounter')->valueSet(42),
* );
* ```
*
* @link https://docs.temporal.io/visibility#search-attribute
*/
public function upsertTypedSearchAttributes(SearchAttributeUpdate ...$updates): void;

/**
* Generate a UUID.
*
Expand Down

0 comments on commit e8cb363

Please sign in to comment.