From 3f508907eed44e8b13d8a19eb4754390cf3bbf99 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 27 Nov 2024 14:47:29 +0100 Subject: [PATCH] Improve custom operation workflow input Now the workflow input of a custom operation can be built from several excerpts of the json payload sent by Cumulocity. ```toml [exec.workflow] operation = "command" input.x = "${.payload.c8y_CombinedInput.inner.x}" input.y = "${.payload.c8y_CombinedInput.inner.y}" input.z = { foo = "bar" } ``` Signed-off-by: Didier Wenzek --- .../core/c8y_api/src/smartrest/operations.rs | 6 +- .../c8y_mapper_ext/src/operations/convert.rs | 2 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 81 +++++++++++++++++++ 3 files changed, 85 insertions(+), 4 deletions(-) diff --git a/crates/core/c8y_api/src/smartrest/operations.rs b/crates/core/c8y_api/src/smartrest/operations.rs index c1e2eb69793..aea0b52f503 100644 --- a/crates/core/c8y_api/src/smartrest/operations.rs +++ b/crates/core/c8y_api/src/smartrest/operations.rs @@ -194,11 +194,11 @@ impl Operation { }) } - pub fn workflow_input(&self) -> Option<&str> { + pub fn workflow_input(&self) -> Option<&serde_json::Value> { self.exec().and_then(|exec| { exec.workflow .as_ref() - .and_then(|workflow| workflow.input.as_deref()) + .and_then(|workflow| workflow.input.as_ref()) }) } @@ -349,7 +349,7 @@ where #[serde(rename_all = "snake_case")] struct ExecWorkflow { operation: Option, - input: Option, + input: Option, } fn get_operations( diff --git a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs index 85292709bd9..b3c39dfed6b 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/convert.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/convert.rs @@ -416,7 +416,7 @@ impl CumulocityConverter { })?; let payload: Value = if let Some(workflow_input) = custom_handler.workflow_input() { - let excerpt = StateExcerpt::from(Value::String(workflow_input.to_string())); + let excerpt = StateExcerpt::from(workflow_input.clone()); match excerpt.extract_value_from(&state) { Value::Object(obj) => Value::Object(obj), _ => { diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index bc6d7552998..98fb4a8c7df 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2236,6 +2236,87 @@ async fn mapper_converts_custom_operation_for_main_device() { .await; } +#[tokio::test] +async fn mapper_converts_custom_operation_with_combined_input() { + let ttd = TempTedgeDir::new(); + ttd.dir("operations") + .dir("c8y") + .file("c8y_CombinedInput.template") + .with_raw_content( + r#"[exec] + topic = "c8y/devicecontrol/notifications" + on_fragment = "c8y_CombinedInput" + + [exec.workflow] + operation = "command" + input.x = "${.payload.c8y_CombinedInput.inner.x}" + input.y = "${.payload.c8y_CombinedInput.inner.y}" + input.z = { foo = "bar" } + "#, + ); + + let config = test_mapper_config(&ttd); + + let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await; + let TestHandle { mqtt, http, .. } = test_handle; + spawn_dummy_c8y_http_proxy(http); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // indicate that main device supports the operation + let capability_message = + MqttMessage::new(&Topic::new_unchecked("te/device/main///cmd/command"), "{}"); + + mqtt.send(capability_message).await.unwrap(); + + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "114,c8y_CombinedInput")]).await; + + assert!(ttd + .path() + .join("operations/c8y/c8y_CombinedInput") + .is_symlink()); + + let input_message = MqttMessage::new( + &Topic::new_unchecked("c8y/devicecontrol/notifications"), + json!({ + "status":"PENDING", + "id": "1234", + "c8y_CombinedInput": { + "text": "do something", + "inner": { + "x": "x value", + "y": 42, + "z": "z unused value", + }, + }, + "externalSource":{ + "externalId":"test-device", + "type":"c8y_Serial" + } + }) + .to_string(), + ); + mqtt.send(input_message).await.expect("Send failed"); + + assert_received_includes_json( + &mut mqtt, + [( + "te/device/main///cmd/command/c8y-mapper-1234", + json!({ + "status": "init", + "x": "x value", + "y": 42, + "z": { + "foo": "bar" + } + }), + )], + ) + .await; +} + #[tokio::test] async fn mapper_converts_custom_operation_for_main_device_without_workflow_input() { let ttd = TempTedgeDir::new();