Skip to content

Commit

Permalink
Workflow-friendly concurrency primitives (#2133)
Browse files Browse the repository at this point in the history
Workflow-friendly concurrency primitives
  • Loading branch information
Quinn-With-Two-Ns authored Jul 30, 2024
1 parent bbf2de7 commit b92c97d
Show file tree
Hide file tree
Showing 8 changed files with 1,108 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public static <E> WorkflowQueue<E> newWorkflowQueue(int capacity) {
return new WorkflowQueueImpl<>(capacity);
}

public static WorkflowLock newWorkflowLock() {
return new WorkflowLockImpl();
}

public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
return new WorkflowSemaphoreImpl(permits);
}

public static <E> CompletablePromise<E> newCompletablePromise() {
return new CompletablePromiseImpl<>();
}
Expand Down Expand Up @@ -479,13 +487,13 @@ public static <R> R executeActivity(

public static void await(String reason, Supplier<Boolean> unblockCondition)
throws DestroyWorkflowThreadError {
assertNotReadOnly("await");
assertNotReadOnly(reason);
getWorkflowOutboundInterceptor().await(reason, unblockCondition);
}

public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
throws DestroyWorkflowThreadError {
assertNotReadOnly("await with timeout");
assertNotReadOnly(reason);
return getWorkflowOutboundInterceptor().await(timeout, reason, unblockCondition);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.sync;

import static io.temporal.internal.sync.WorkflowInternal.assertNotReadOnly;

import com.google.common.base.Preconditions;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.WorkflowLock;
import java.time.Duration;

class WorkflowLockImpl implements WorkflowLock {
private boolean locked = false;

@Override
public void lock() {
WorkflowInternal.await(
"WorkflowLock.lock",
() -> {
CancellationScope.throwCanceled();
return !locked;
});
locked = true;
}

@Override
public boolean tryLock() {
assertNotReadOnly("WorkflowLock.tryLock");
if (!locked) {
locked = true;
return true;
}
return false;
}

@Override
public boolean tryLock(Duration timeout) {
boolean unlocked =
WorkflowInternal.await(
timeout,
"WorkflowLock.tryLock",
() -> {
CancellationScope.throwCanceled();
return !locked;
});
if (unlocked) {
locked = true;
return true;
}
return false;
}

@Override
public void unlock() {
assertNotReadOnly("WorkflowLock.unlock");
Preconditions.checkState(locked, "WorkflowLock.unlock called when not locked");
locked = false;
}

@Override
public boolean isHeld() {
return locked;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.sync;

import static io.temporal.internal.sync.WorkflowInternal.assertNotReadOnly;

import com.google.common.base.Preconditions;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.WorkflowSemaphore;
import java.time.Duration;

class WorkflowSemaphoreImpl implements WorkflowSemaphore {
private int currentPermits;

public WorkflowSemaphoreImpl(int permits) {
this.currentPermits = permits;
}

@Override
public void acquire() {
acquire(1);
}

@Override
public void acquire(int permits) {
Preconditions.checkArgument(
permits >= 0, "WorkflowSemaphore.acquire called with negative permits");
WorkflowInternal.await(
"WorkflowSemaphore.acquire",
() -> {
CancellationScope.throwCanceled();
return currentPermits >= permits;
});
currentPermits -= permits;
}

@Override
public boolean tryAcquire() {
return tryAcquire(1);
}

@Override
public boolean tryAcquire(Duration timeout) {
return tryAcquire(1, timeout);
}

@Override
public boolean tryAcquire(int permits) {
assertNotReadOnly("WorkflowSemaphore.tryAcquire");
Preconditions.checkArgument(
permits >= 0, "WorkflowSemaphore.tryAcquire called with negative permits");
if (currentPermits >= permits) {
currentPermits -= permits;
return true;
}
return false;
}

@Override
public boolean tryAcquire(int permits, Duration timeout) {
Preconditions.checkArgument(
permits >= 0, "WorkflowSemaphore.tryAcquire called with negative permits");
boolean acquired =
WorkflowInternal.await(
timeout,
"WorkflowSemaphore.tryAcquire",
() -> {
CancellationScope.throwCanceled();
return currentPermits >= permits;
});
if (acquired) {
currentPermits -= permits;
}
return acquired;
}

@Override
public void release() {
release(1);
}

@Override
public void release(int permits) {
assertNotReadOnly("WorkflowSemaphore.release");
Preconditions.checkArgument(
permits >= 0, "WorkflowSemaphore.release called with negative permits");
currentPermits += permits;
}
}
21 changes: 21 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,27 @@ public static <E> Promise<E> newFailedPromise(Exception failure) {
return WorkflowInternal.newFailedPromise(failure);
}

/**
* Creates a {@link WorkflowLock} implementation that can be used from workflow code.
*
* @apiNote The lock returned is not reentrant. If a workflow thread tries to acquire a lock that
* it already holds, the call will block indefinitely.
* @return new instance of {@link WorkflowLock}
*/
public static WorkflowLock newWorkflowLock() {
return WorkflowInternal.newWorkflowLock();
}

/**
* Creates a {@link WorkflowSemaphore} implementation that can be used from workflow code.
*
* @param permits the given number of permits for the semaphore.
* @return new instance of {@link WorkflowSemaphore}
*/
public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
return WorkflowInternal.newWorkflowSemaphore(permits);
}

/**
* Registers an implementation object. The object must implement at least one interface annotated
* with {@link WorkflowInterface}. All its methods annotated with @{@link SignalMethod}
Expand Down
67 changes: 67 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/WorkflowLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow;

import java.time.Duration;

/**
* Workflow lock is an alternative to {@link java.util.concurrent.locks.Lock} that is deterministic
* and compatible with Temporal's concurrency model. API is designed to be used in a workflow code
* only. It is not allowed to be used in an activity code.
*
* <p>In Temporal concurrency model, only one thread in a workflow code can execute at a time.
*/
public interface WorkflowLock {
/**
* Acquires the lock.
*
* @throws io.temporal.failure.CanceledFailure if thread (or current {@link CancellationScope} was
* canceled).
*/
void lock();

/**
* Acquires the lock only if it is free at the time of invocation.
*
* @return true if the lock was acquired and false otherwise
*/
boolean tryLock();

/**
* Acquires the lock if it is free within the given waiting time.
*
* @throws io.temporal.failure.CanceledFailure if thread (or current {@link CancellationScope} was
* canceled).
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was
* acquired.
*/
boolean tryLock(Duration timeout);

/** Releases the lock. */
void unlock();

/**
* Checks if a lock is held.
*
* @return true if the lock is held and false otherwise.
*/
boolean isHeld();
}
Loading

0 comments on commit b92c97d

Please sign in to comment.