From d6c0a271412707e8df3419a2c720545785c111af Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 19 Dec 2024 14:44:04 +0100 Subject: [PATCH 1/3] Simplify command state update methods Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 2 +- .../src/operation_workflows/persist.rs | 2 +- crates/core/tedge_api/src/workflow/state.rs | 32 +++++++++++++++---- .../core/tedge_api/src/workflow/supervisor.rs | 28 +++++++--------- 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 5623d0820e..9dbbe50a1f 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -159,7 +159,7 @@ impl WorkflowActor { Ok(Some(new_state)) => { self.persist_command_board().await?; if new_state.is_init() { - self.process_command_update(new_state.set_log_path(&log_file.path)) + self.process_command_update(new_state.with_log_path(&log_file.path)) .await?; } } diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 96eb153214..12ebb3b3ab 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -359,7 +359,7 @@ impl WorkflowRepository { if let Some(current_version) = self.workflows.use_current_version(&operation) { self.persist_workflow_definition(&operation, ¤t_version) .await; - *command = command.clone().set_workflow_version(¤t_version); + command.set_workflow_version(¤t_version); } } } diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index 7aef332d23..b971148a56 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -186,8 +186,18 @@ impl GenericCommandState { } } - pub fn update_with_key_value(self, key: &str, val: &str) -> Self { - self.update_with_json(json!({ key: val })) + pub fn with_key_value(mut self, key: &str, val: &str) -> Self { + self.set_key_value(key, val); + self + } + + fn set_key_value(&mut self, key: &str, val: &str) { + if let Some(o) = self.payload.as_object_mut() { + o.insert(key.to_string(), val.into()); + } + if key == STATUS { + self.status = val.to_string(); + } } pub fn get_log_path(&self) -> Option { @@ -197,8 +207,13 @@ impl GenericCommandState { .map(Utf8PathBuf::from) } - pub fn set_log_path>(self, path: P) -> Self { - self.update_with_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) + pub fn with_log_path>(mut self, path: P) -> Self { + self.set_log_path(path); + self + } + + pub fn set_log_path>(&mut self, path: P) { + self.set_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) } pub fn workflow_version(&self) -> Option { @@ -208,8 +223,13 @@ impl GenericCommandState { .map(|str| str.to_string()) } - pub fn set_workflow_version(self, version: &str) -> Self { - self.update_with_key_value(OP_WORKFLOW_VERSION_KEY, version) + pub fn with_workflow_version(mut self, version: &str) -> Self { + self.set_workflow_version(version); + self + } + + pub fn set_workflow_version(&mut self, version: &str) { + self.set_key_value(OP_WORKFLOW_VERSION_KEY, version) } /// Update the command state with the outcome of a script diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 29200b0b0c..835732561e 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -80,7 +80,7 @@ impl WorkflowSupervisor { self.commands = commands; self.commands .iter() - .filter_map(|(t, s)| self.resume_command(t, s)) + .filter_map(|(t, s)| self.resume_command(t, s.clone())) .collect() } @@ -136,10 +136,10 @@ impl WorkflowSupervisor { /// /// Return the current version if any. pub fn use_current_version(&mut self, operation: &OperationName) -> Option { - match self.workflows.get_mut(&operation.as_str().into()) { - Some(versions) => versions.use_current_version().cloned(), - None => None, - } + self.workflows + .get_mut(&operation.as_str().into()) + .map(WorkflowVersions::use_current_version)? + .cloned() } /// Update the state of the command board on reception of a message sent by a peer over MQTT @@ -162,9 +162,9 @@ impl WorkflowSupervisor { } else if command_state.is_init() { // This is a new command request if let Some(current_version) = workflow_versions.use_current_version() { - let command_state = command_state.set_workflow_version(current_version); - self.commands.insert(command_state.clone())?; - Ok(Some(command_state)) + let updated_state = command_state.with_workflow_version(current_version); + self.commands.insert(updated_state.clone())?; + Ok(Some(updated_state)) } else { return Err(WorkflowExecutionError::DeprecatedOperation { operation: operation.to_string(), @@ -287,21 +287,17 @@ impl WorkflowSupervisor { fn resume_command( &self, timestamp: &Timestamp, - command: &GenericCommandState, + command: GenericCommandState, ) -> Option { - let action = match self.get_action(command) { + let action = match self.get_action(&command) { Ok(action) => action, Err(err) => { - return Some( - command - .clone() - .fail_with(format!("Fail to resume on start: {err:?}")), - ); + return Some(command.fail_with(format!("Fail to resume on start: {err:?}"))); } }; let epoch = format!("{}.{}", timestamp.unix_timestamp(), timestamp.millisecond()); - let command = command.clone().update_with_key_value("resumed_at", &epoch); + let command = command.with_key_value("resumed_at", &epoch); match action { OperationAction::AwaitingAgentRestart(handlers) => { Some(command.update(handlers.on_success)) From a7b50e893f5333c4f72da9f365f21ef05ac85188 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 19 Dec 2024 15:27:29 +0100 Subject: [PATCH 2/3] Avoid returning String when callers expect &str Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 24 +++++++------------ crates/core/tedge_api/src/workflow/mod.rs | 2 +- crates/core/tedge_api/src/workflow/state.rs | 7 +++--- .../core/tedge_api/src/workflow/supervisor.rs | 8 +++---- 4 files changed, 16 insertions(+), 25 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 12ebb3b3ab..1ba0d02a52 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -261,11 +261,7 @@ impl WorkflowRepository { /// Copy the workflow definition file to the persisted state directory, /// unless this has already been done. - async fn persist_workflow_definition( - &mut self, - operation: &OperationName, - version: &WorkflowVersion, - ) { + async fn persist_workflow_definition(&mut self, operation: &str, version: &str) { if version_is_builtin(version) { return; } @@ -279,16 +275,12 @@ impl WorkflowRepository { if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.in_use_copies.insert(version.clone(), 1); + self.in_use_copies.insert(version.to_owned(), 1); } } } - fn workflow_copy_path( - &self, - operation: &OperationName, - version: &WorkflowVersion, - ) -> Utf8PathBuf { + fn workflow_copy_path(&self, operation: &str, version: &str) -> Utf8PathBuf { let filename = format!("{operation}-{version}"); self.state_dir.join(filename).with_extension("toml") } @@ -305,7 +297,7 @@ impl WorkflowRepository { } } - async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) { + async fn release_in_use_copy(&mut self, operation: &str, version: &str) { if version_is_builtin(version) { return; } @@ -410,14 +402,14 @@ impl WorkflowRepository { operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { + let operation_name = operation.to_string(); if command_state.is_init() { // A new command instance must use the latest on-disk version of the operation workflow - self.load_latest_version(&operation.to_string()).await; + self.load_latest_version(&operation_name).await; } else if command_state.is_finished() { // Clear the cache if this happens to be the latest instance using that version of the workflow if let Some(version) = command_state.workflow_version() { - self.release_in_use_copy(&operation.to_string(), &version) - .await; + self.release_in_use_copy(&operation_name, version).await; } } @@ -429,7 +421,7 @@ impl WorkflowRepository { Some(new_state) if new_state.is_init() => { if let Some(version) = new_state.workflow_version() { - self.persist_workflow_definition(&operation.to_string(), &version) + self.persist_workflow_definition(&operation_name, version) .await; } Ok(Some(new_state)) diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index f1f57c6d65..a8fb4a597b 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -32,7 +32,7 @@ pub type WorkflowVersion = String; const BUILT_IN: &str = "builtin"; -pub fn version_is_builtin(version: &WorkflowVersion) -> bool { +pub fn version_is_builtin(version: &str) -> bool { version == BUILT_IN } diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index b971148a56..5ecfba3d54 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -193,10 +193,10 @@ impl GenericCommandState { fn set_key_value(&mut self, key: &str, val: &str) { if let Some(o) = self.payload.as_object_mut() { - o.insert(key.to_string(), val.into()); + o.insert(key.into(), val.into()); } if key == STATUS { - self.status = val.to_string(); + self.status = val.to_owned(); } } @@ -216,11 +216,10 @@ impl GenericCommandState { self.set_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) } - pub fn workflow_version(&self) -> Option { + pub fn workflow_version(&self) -> Option<&str> { self.payload .get(OP_WORKFLOW_VERSION_KEY) .and_then(|val| val.as_str()) - .map(|str| str.to_string()) } pub fn with_workflow_version(mut self, version: &str) -> Self { diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 835732561e..3fc7265a6b 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -137,8 +137,8 @@ impl WorkflowSupervisor { /// Return the current version if any. pub fn use_current_version(&mut self, operation: &OperationName) -> Option { self.workflows - .get_mut(&operation.as_str().into()) - .map(WorkflowVersions::use_current_version)? + .get_mut(&operation.as_str().into())? + .use_current_version() .cloned() } @@ -192,7 +192,7 @@ impl WorkflowSupervisor { }); }; - let Some(version) = &command_state.workflow_version() else { + let Some(version) = command_state.workflow_version() else { return Err(WorkflowExecutionError::MissingVersion); }; @@ -421,7 +421,7 @@ impl WorkflowVersions { self.in_use.contains_key(BUILT_IN) } - fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> { + fn get(&self, version: &str) -> Result<&OperationWorkflow, WorkflowExecutionError> { self.in_use .get(version) .ok_or(WorkflowExecutionError::UnknownVersion { From b9fea1be0374d7304b779bc3844ab8e8557bdec9 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 19 Dec 2024 17:37:22 +0100 Subject: [PATCH 3/3] Use OperationType::name instead of OperationType::to_string() In order to better convey the intent. Signed-off-by: Didier Wenzek --- .../tedge_agent/src/operation_workflows/persist.rs | 2 +- crates/core/tedge_api/src/mqtt_topics.rs | 12 ++++++------ crates/core/tedge_api/src/workflow/mod.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 1ba0d02a52..a537650d24 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -402,7 +402,7 @@ impl WorkflowRepository { operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { - let operation_name = operation.to_string(); + let operation_name = operation.name(); if command_state.is_init() { // A new command instance must use the latest on-disk version of the operation workflow self.load_latest_version(&operation_name).await; diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index c33ed54485..7f98e74ac3 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -737,12 +737,6 @@ impl<'a> From<&'a str> for OperationType { } } -impl From<&OperationType> for String { - fn from(value: &OperationType) -> Self { - format!("{value}") - } -} - impl Display for OperationType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -760,6 +754,12 @@ impl Display for OperationType { } } +impl OperationType { + pub fn name(&self) -> String { + format!("{self}") + } +} + #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum ChannelError { #[error("Channel needs to have at least 2 segments")] diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index a8fb4a597b..4065670a4b 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -342,7 +342,7 @@ impl OperationWorkflow { self.states .get(&command_state.status) .ok_or_else(|| WorkflowExecutionError::UnknownStep { - operation: (&self.operation).into(), + operation: self.operation.name(), step: command_state.status.clone(), }) .map(|action| action.inject_state(command_state))