Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
clean up zknodes when wf completes.
Browse files Browse the repository at this point in the history
  • Loading branch information
apau authored and kishorebanala committed Aug 16, 2019
1 parent 2ecd3c6 commit 33a4ae0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ void completeWorkflow(Workflow wf) {
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowCompleted(workflow);
}

executionLockService.deleteLock(workflow.getWorkflowId());
}

public void terminateWorkflow(String workflowId, String reason) {
Expand Down Expand Up @@ -761,7 +763,8 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflowStatusListener.onWorkflowTerminated(workflow);
}
} finally {
executionLockService.releaseLock(workflow.getCorrelationId());
executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
}
}

Expand Down Expand Up @@ -1005,7 +1008,7 @@ public boolean decide(String workflowId) {
LOGGER.error("Error deciding workflow: {}", workflowId, e);
throw e;
} finally {
executionLockService.releaseLock(workflowId, workflow.getStatus());
executionLockService.releaseLock(workflowId);
}
return false;
}
Expand All @@ -1029,7 +1032,7 @@ List<Task> dedupAndAddTasks(Workflow workflow, List<Task> tasks) {
*/
public void pauseWorkflow(String workflowId) {
try {
executionLockService.waitForLock(workflowId);
executionLockService.acquireLock(workflowId);
WorkflowStatus status = WorkflowStatus.PAUSED;
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, false);
if (workflow.getStatus().isTerminal()) {
Expand All @@ -1051,7 +1054,7 @@ public void pauseWorkflow(String workflowId) {
*/
public void resumeWorkflow(String workflowId) {
try {
executionLockService.waitForLock(workflowId);
executionLockService.acquireLock(workflowId);
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, false);
if (!workflow.getStatus().equals(WorkflowStatus.PAUSED)) {
throw new IllegalStateException("The workflow " + workflowId + " is not PAUSED so cannot resume. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,10 @@ public void releaseLock(String lockId) {
}
}

public void releaseLock(String lockId, Workflow.WorkflowStatus status) {
public void deleteLock(String lockId) {
if (config.enableWorkflowExecutionLock()) {
Lock lock = lockProvider.get();
lock.releaseLock(lockId);
// if (status.isTerminal()) {
// lock.deleteLock(lockId);
// }
// LOGGER.debug("Thread {} deleted lockId {}.", Thread.currentThread().getId(), lockId);
lockProvider.get().deleteLock(lockId);
LOGGER.debug("Thread {} deleted lockId {}.", Thread.currentThread().getId(), lockId);
}
}
}
3 changes: 2 additions & 1 deletion server/src/main/resources/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ EC2_AVAILABILITY_ZONE=us-east-1c
workflow.archive=false

#zookeeper
zk.connection=host1.2181,host2:2181,host3:2181
#zk.connection=host1.2181,host2:2181,host3:2181
zk.connection=100.66.206.37:2181,100.66.215.217:2181,100.66.207.32:2181,100.66.194.78:2181

#enable locking during workflow execution
decider.locking.enabled=false

0 comments on commit 33a4ae0

Please sign in to comment.