Skip to content

Commit

Permalink
Merge pull request #3270 from didier-wenzek/feat/improve-custom-opera…
Browse files Browse the repository at this point in the history
…tion-workflow-input

feat: Improve custom operation workflow input
  • Loading branch information
didier-wenzek authored Nov 28, 2024
2 parents eb9b8e5 + 3f50890 commit 974562d
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
6 changes: 3 additions & 3 deletions crates/core/c8y_api/src/smartrest/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ impl Operation {
})
}

pub fn workflow_input(&self) -> Option<&str> {
pub fn workflow_input(&self) -> Option<&serde_json::Value> {
self.exec().and_then(|exec| {
exec.workflow
.as_ref()
.and_then(|workflow| workflow.input.as_deref())
.and_then(|workflow| workflow.input.as_ref())
})
}

Expand Down Expand Up @@ -349,7 +349,7 @@ where
#[serde(rename_all = "snake_case")]
struct ExecWorkflow {
operation: Option<String>,
input: Option<String>,
input: Option<serde_json::Value>,
}

fn get_operations(
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/operations/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl CumulocityConverter {
})?;

let payload: Value = if let Some(workflow_input) = custom_handler.workflow_input() {
let excerpt = StateExcerpt::from(Value::String(workflow_input.to_string()));
let excerpt = StateExcerpt::from(workflow_input.clone());
match excerpt.extract_value_from(&state) {
Value::Object(obj) => Value::Object(obj),
_ => {
Expand Down
81 changes: 81 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,87 @@ async fn mapper_converts_custom_operation_for_main_device() {
.await;
}

#[tokio::test]
async fn mapper_converts_custom_operation_with_combined_input() {
let ttd = TempTedgeDir::new();
ttd.dir("operations")
.dir("c8y")
.file("c8y_CombinedInput.template")
.with_raw_content(
r#"[exec]
topic = "c8y/devicecontrol/notifications"
on_fragment = "c8y_CombinedInput"
[exec.workflow]
operation = "command"
input.x = "${.payload.c8y_CombinedInput.inner.x}"
input.y = "${.payload.c8y_CombinedInput.inner.y}"
input.z = { foo = "bar" }
"#,
);

let config = test_mapper_config(&ttd);

let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await;
let TestHandle { mqtt, http, .. } = test_handle;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

skip_init_messages(&mut mqtt).await;

// indicate that main device supports the operation
let capability_message =
MqttMessage::new(&Topic::new_unchecked("te/device/main///cmd/command"), "{}");

mqtt.send(capability_message).await.unwrap();

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "114,c8y_CombinedInput")]).await;

assert!(ttd
.path()
.join("operations/c8y/c8y_CombinedInput")
.is_symlink());

let input_message = MqttMessage::new(
&Topic::new_unchecked("c8y/devicecontrol/notifications"),
json!({
"status":"PENDING",
"id": "1234",
"c8y_CombinedInput": {
"text": "do something",
"inner": {
"x": "x value",
"y": 42,
"z": "z unused value",
},
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string(),
);
mqtt.send(input_message).await.expect("Send failed");

assert_received_includes_json(
&mut mqtt,
[(
"te/device/main///cmd/command/c8y-mapper-1234",
json!({
"status": "init",
"x": "x value",
"y": 42,
"z": {
"foo": "bar"
}
}),
)],
)
.await;
}

#[tokio::test]
async fn mapper_converts_custom_operation_for_main_device_without_workflow_input() {
let ttd = TempTedgeDir::new();
Expand Down

0 comments on commit 974562d

Please sign in to comment.