diff --git a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java index 9eb15b5cf..19c32e3e8 100644 --- a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java @@ -17,7 +17,6 @@ package com.uber.cadence.workflow; -import com.google.common.base.Preconditions; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.Activity; import com.uber.cadence.activity.ActivityMethod; @@ -48,6 +47,9 @@ public interface ManualCompletionActivities { @ActivityMethod String asyncActivity(); + @ActivityMethod + void reset(); + @ActivityMethod void completeAsyncActivity(String result); @@ -73,48 +75,61 @@ private class ManualCompletionActivitiesImpl implements ManualCompletionActiviti @Override public synchronized String asyncActivity() { openTask = Activity.getTask(); + notifyAll(); Activity.doNotCompleteOnReturn(); return null; } + @Override + public synchronized void reset() { + openTask = null; + } + @Override public synchronized void completeAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(openTask.getTaskToken(), details); + getClient().complete(getOpenTask().getTaskToken(), details); } @Override public synchronized void completeAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient().complete(getCurrentWorkflow(), getOpenTask().getActivityId(), details); } @Override public synchronized void failAsyncActivity(String details) { - Preconditions.checkState(openTask != null); getClient() - .completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details)); + .completeExceptionally(getOpenTask().getTaskToken(), new ExceptionWithDetaills(details)); } @Override public synchronized void failAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); getClient() .completeExceptionally( - getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details)); + getCurrentWorkflow(), + getOpenTask().getActivityId(), + new ExceptionWithDetaills(details)); } @Override public synchronized void cancelAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(openTask.getTaskToken(), details); + getClient().reportCancellation(getOpenTask().getTaskToken(), details); } @Override public synchronized void cancelAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient().reportCancellation(getCurrentWorkflow(), getOpenTask().getActivityId(), details); + } + + private synchronized ActivityTask getOpenTask() { + while (openTask == null) { + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return openTask; } private WorkflowExecution getCurrentWorkflow() { @@ -146,21 +161,29 @@ public void run() { expectSuccess("1", result); expectFailure(() -> activities.completeAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.completeAsyncActivityById("2"); expectSuccess("2", result); expectFailure(() -> activities.completeAsyncActivityById("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivity("3"); expectFailureWithDetails(result, "3"); expectFailure(() -> activities.failAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivityById("4"); expectFailureWithDetails(result, "4"); expectFailure(() -> activities.failAsyncActivityById("again")); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise = Workflow.newPromise(); CancellationScope scope = @@ -178,6 +201,8 @@ public void run() { activities.cancelAsyncActivity("5"); expectCancelled(result); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise2 = Workflow.newPromise(); scope =