Skip to content

Commit

Permalink
Merge pull request #3231 from Ruadhri17/custom-operation-topic-fix
Browse files Browse the repository at this point in the history
fix: match operation without topic only when it was delivered by device control topic
  • Loading branch information
Ruadhri17 authored Nov 18, 2024
2 parents bd3adcd + f1c2a7d commit d682535
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
50 changes: 48 additions & 2 deletions crates/core/c8y_api/src/smartrest/operations.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::json_c8y_deserializer::C8yDeviceControlTopic;
use crate::smartrest::error::OperationsError;
use crate::smartrest::smartrest_serializer::declare_supported_operations;
use mqtt_channel::TopicFilter;
Expand All @@ -10,6 +11,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tedge_api::substitution::Record;
use tedge_config::TopicPrefix;
use tracing::warn;

use super::payload::SmartrestPayload;
Expand Down Expand Up @@ -57,11 +59,17 @@ impl Operations {
None
}

pub fn filter_by_topic(&self, topic_name: &str) -> Vec<(String, Operation)> {
pub fn filter_by_topic(
&self,
topic_name: &str,
prefix: &TopicPrefix,
) -> Vec<(String, Operation)> {
let mut vec: Vec<(String, Operation)> = Vec::new();
for op in self.operations.iter() {
match (op.topic(), op.on_fragment()) {
(None, Some(on_fragment)) => vec.push((on_fragment, op.clone())),
(None, Some(on_fragment)) if C8yDeviceControlTopic::name(prefix) == topic_name => {
vec.push((on_fragment, op.clone()))
}
(Some(topic), Some(on_fragment)) if topic == topic_name => {
vec.push((on_fragment, op.clone()))
}
Expand Down Expand Up @@ -394,6 +402,7 @@ pub enum InvalidCustomOperationHandler {
#[cfg(test)]
mod tests {
use std::io::Write;
use std::str::FromStr;

use super::*;
use tedge_config::TopicPrefix;
Expand Down Expand Up @@ -595,4 +604,41 @@ mod tests {
let operation = Operation::new(exec);
assert!(!operation.is_valid_operation_handler());
}

#[test_case(
r#"
on_fragment = "c8y_Something"
command = "echo 1"
"#,
r#"
topic = "c8y/custom/one"
on_fragment = "c8y_Something"
command = "echo 2"
"#
)]
fn filter_by_topic_(toml1: &str, toml2: &str) {
let exec: OnMessageExec = toml::from_str(toml1).unwrap();
let operation1 = Operation::new(exec);

let exec: OnMessageExec = toml::from_str(toml2).unwrap();
let operation2 = Operation::new(exec);

let ops = Operations {
operations: vec![operation1.clone(), operation2.clone()],
};

let prefix = TopicPrefix::from_str("c8y").unwrap();

let filter_custom = ops.filter_by_topic("c8y/custom/one", &prefix);
assert_eq!(
filter_custom,
vec![("c8y_Something".to_string(), operation2)]
);

let filter_default = ops.filter_by_topic("c8y/devicecontrol/notifications", &prefix);
assert_eq!(
filter_default,
vec![("c8y_Something".to_string(), operation1)]
);
}
}
4 changes: 3 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,9 @@ impl CumulocityConverter {
extras: &HashMap<String, Value>,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, CumulocityMapperError> {
let handlers = self.operations.filter_by_topic(&message.topic.name);
let handlers = self
.operations
.filter_by_topic(&message.topic.name, &self.config.bridge_config.c8y_prefix);

if handlers.is_empty() {
info!("No matched custom operation handler is found for the subscribed custom operation topics. The operation '{operation_id}' (ID) is ignored.");
Expand Down

0 comments on commit d682535

Please sign in to comment.