From ddc1bedbba57abe5f1e806f07eebaac0da6daafe Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Tue, 5 Jul 2022 14:32:08 -0700 Subject: [PATCH] Added metrics for ExecutionLockService, DeciderService and WorkflowExecutor#decide --- .../core/execution/DeciderService.java | 2 ++ .../core/execution/WorkflowExecutor.java | 27 ++++++++++++++----- .../netflix/conductor/metrics/Monitors.java | 4 +++ .../service/ExecutionLockService.java | 2 ++ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 18bda39189..7f5cb790fb 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.annotations.VisibleForTesting; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskType; @@ -53,6 +54,7 @@ * workflow or do nothing. */ @Service +@Trace public class DeciderService { private static final Logger LOGGER = LoggerFactory.getLogger(DeciderService.class); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d8aeb764bc..63b8dafc6f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -17,6 +17,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -416,7 +417,7 @@ public String startWorkflow( try { createWorkflow(workflow); // then decide to see if anything needs to be done as part of the workflow - decide(workflowId); + _decide(workflowId); Monitors.recordWorkflowStartSuccess( workflow.getWorkflowName(), String.valueOf(workflow.getWorkflowVersion()), @@ -611,7 +612,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) { throw e; } - decide(workflowId); + _decide(workflowId); updateAndPushParents(workflow, "restarted"); } @@ -1240,7 +1241,7 @@ public void updateTask(TaskResult taskResult) { task.getTaskDefName(), lastDuration, false, task.getStatus()); } - decide(workflowId); + _decide(workflowId); } public TaskModel getTask(String taskId) { @@ -1270,6 +1271,18 @@ public List getRunningWorkflowIds(String workflowName, int version) { return executionDAOFacade.getRunningWorkflowIds(workflowName, version); } + /** Records a metric for the "decide" process. */ + private boolean _decide(String workflowId) { + StopWatch watch = new StopWatch(); + watch.start(); + try { + return decide(workflowId); + } finally { + watch.stop(); + Monitors.recordWorkflowDecisionTime(watch.getTime()); + } + } + /** * @param workflowId ID of the workflow to evaluate the state for * @return true if the workflow has completed (success or failed), false otherwise. @@ -1521,7 +1534,7 @@ public void resumeWorkflow(String workflowId) { workflow.getPriority(), properties.getWorkflowOffsetTimeout().getSeconds()); executionDAOFacade.updateWorkflow(workflow); - decide(workflowId); + _decide(workflowId); } /** @@ -1588,7 +1601,7 @@ public void skipTaskFromWorkflow( taskToBeSkipped.setOutputMessage(skipTaskRequest.getTaskOutputMessage()); } executionDAOFacade.createTasks(Collections.singletonList(taskToBeSkipped)); - decide(workflowId); + _decide(workflowId); } public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { @@ -1859,7 +1872,7 @@ private boolean rerunWF( properties.getWorkflowOffsetTimeout().getSeconds()); executionDAOFacade.updateWorkflow(workflow); - decide(workflowId); + _decide(workflowId); return true; } @@ -1945,7 +1958,7 @@ private boolean rerunWF( } executionDAOFacade.updateTask(rerunFromTask); - decide(workflowId); + _decide(workflowId); return true; } return false; diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 5dc6d0fc75..f9b48e9346 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -187,6 +187,10 @@ public static void recordTaskExecutionTime( .record(duration, TimeUnit.MILLISECONDS); } + public static void recordWorkflowDecisionTime(long duration) { + getTimer(classQualifier, "workflow_decision").record(duration, TimeUnit.MILLISECONDS); + } + public static void recordTaskPollError(String taskType, String exception) { recordTaskPollError(taskType, NO_DOMAIN, exception); } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionLockService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionLockService.java index e355dc336f..c45bba263a 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionLockService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionLockService.java @@ -19,11 +19,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.sync.Lock; import com.netflix.conductor.metrics.Monitors; @Service +@Trace public class ExecutionLockService { private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionLockService.class);