diff --git a/crates/core/tedge_agent/src/entity_manager/server.rs b/crates/core/tedge_agent/src/entity_manager/server.rs index 53e8202ba3..76d86c1228 100644 --- a/crates/core/tedge_agent/src/entity_manager/server.rs +++ b/crates/core/tedge_agent/src/entity_manager/server.rs @@ -9,7 +9,7 @@ use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::EntityStore; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; @@ -27,7 +27,7 @@ pub enum EntityStoreRequest { #[derive(Debug)] pub enum EntityStoreResponse { Get(Option), - Create(Result<(Vec, Vec), entity_store::Error>), + Create(Result, entity_store::Error>), Delete(Vec), Ok, } @@ -149,7 +149,7 @@ impl EntityStoreServer { async fn register_entity( &mut self, entity: EntityRegistrationMessage, - ) -> Result<(Vec, Vec), entity_store::Error> { + ) -> Result, entity_store::Error> { if self.entity_store.get(&entity.topic_id).is_some() { return Err(entity_store::Error::EntityAlreadyRegistered( entity.topic_id, @@ -164,9 +164,9 @@ impl EntityStoreServer { } } - let (affected, pending) = self.entity_store.update(entity.clone())?; + let registered = self.entity_store.update(entity.clone())?; - if !affected.is_empty() { + if !registered.is_empty() { let message = entity.to_mqtt_message(&self.mqtt_schema); if let Err(err) = self.mqtt_publisher.send(message.clone()).await { error!( @@ -174,7 +174,7 @@ impl EntityStoreServer { ) } } - Ok((affected, pending)) + Ok(registered) } async fn deregister_entity(&mut self, topic_id: EntityTopicId) -> Vec { diff --git a/crates/core/tedge_agent/src/http_server/entity_store.rs b/crates/core/tedge_agent/src/http_server/entity_store.rs index 03751ce708..c9a5c75790 100644 --- a/crates/core/tedge_agent/src/http_server/entity_store.rs +++ b/crates/core/tedge_agent/src/http_server/entity_store.rs @@ -252,10 +252,7 @@ mod tests { && entity.r#type == EntityType::ChildDevice { req.reply_to - .send(EntityStoreResponse::Create(Ok(( - vec![EntityTopicId::default_main_device()], - vec![], - )))) + .send(EntityStoreResponse::Create(Ok(vec![]))) .await .unwrap(); } diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 16289d4405..afb74b937d 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -18,8 +18,8 @@ use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::store::message_log::MessageLogReader; use crate::store::message_log::MessageLogWriter; -use crate::store::pending_entity_store::PendingEntityData; use crate::store::pending_entity_store::PendingEntityStore; +use crate::store::pending_entity_store::RegisteredEntityData; use log::debug; use log::error; use log::info; @@ -282,21 +282,21 @@ impl EntityStore { .collect() } - /// Updates entity store state based on the content of the entity - /// registration message. + /// Updates entity store state based on the content of the entity registration message. /// - /// It can register a new entity in the store or update already registered - /// entity, returning a list of all entities affected by the update, e.g.: + /// Caches the entity if it cannot be registered because its ancestors are not registered yet. /// - /// - when adding/removing a child device or service, the parent is affected + /// Returns a vector of registered entities that includes: + /// - the entity that is provided in the input message (if actually new and not cached) + /// - any previously cached child entities of the parent that is now registered. pub fn update( &mut self, message: EntityRegistrationMessage, - ) -> Result<(Vec, Vec), Error> { + ) -> Result, Error> { match self.register_and_persist_entity(message.clone()) { Ok(affected_entities) => { if affected_entities.is_empty() { - Ok((vec![], vec![])) + Ok(vec![]) } else { let topic_id = message.topic_id.clone(); let current_entity_data = @@ -312,7 +312,7 @@ impl EntityStore { pending_entities.push(pending_child); } - Ok((affected_entities, pending_entities)) + Ok(pending_entities) } } Err(Error::NoParent(_)) => { @@ -320,7 +320,7 @@ impl EntityStore { // cache it in the unregistered entity store to be processed later self.pending_entity_store .cache_early_registration_message(message); - Ok((vec![], vec![])) + Ok(vec![]) } Err(err) => Err(err), } @@ -971,11 +971,9 @@ mod tests { .unwrap(); let updated_entities = store.update(entity.clone()).unwrap(); - let pending_entity: PendingEntityData = entity.into(); - assert_eq!(updated_entities.0.len(), 1); - assert_eq!(updated_entities.0, ["device/main//"]); - assert_eq!(updated_entities.1.len(), 1); - assert_eq!(updated_entities.1, vec![pending_entity]); + let pending_entity: RegisteredEntityData = entity.into(); + assert_eq!(updated_entities.len(), 1); + assert_eq!(updated_entities, vec![pending_entity]); } #[test] @@ -995,7 +993,10 @@ mod tests { ) .unwrap(); - assert_eq!(updated_entities.0, ["device/main//"]); + assert_eq!( + updated_entities.get(0).unwrap().reg_message.topic_id, + "device/child1//" + ); assert_eq!( store.child_devices(&EntityTopicId::default_main_device()), ["device/child1//"] @@ -1010,7 +1011,10 @@ mod tests { .unwrap(), ) .unwrap(); - assert_eq!(updated_entities.0, ["device/main//"]); + assert_eq!( + updated_entities.get(0).unwrap().reg_message.topic_id, + "device/child2//" + ); let children = store.child_devices(&EntityTopicId::default_main_device()); assert!(children.iter().any(|&e| e == "device/child1//")); assert!(children.iter().any(|&e| e == "device/child2//")); @@ -1032,7 +1036,10 @@ mod tests { }) .unwrap(); - assert_eq!(updated_entities.0, ["device/main//"]); + assert_eq!( + updated_entities.get(0).unwrap().reg_message.topic_id, + "device/main/service/service1" + ); assert_eq!( store.services(&EntityTopicId::default_main_device()), ["device/main/service/service1"] @@ -1048,7 +1055,10 @@ mod tests { }) .unwrap(); - assert_eq!(updated_entities.0, ["device/main//"]); + assert_eq!( + updated_entities.get(0).unwrap().reg_message.topic_id, + "device/main/service/service2" + ); let services = store.services(&EntityTopicId::default_main_device()); assert!(services .iter() @@ -1345,15 +1355,18 @@ mod tests { }; let affected_entities = store.update(reg_message.clone()).unwrap(); - assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//"); + assert_eq!( + affected_entities.get(0).unwrap().reg_message.topic_id, + "device/child1//" + ); let affected_entities = store.update(reg_message.clone()).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); } #[test] @@ -1370,7 +1383,10 @@ mod tests { }; let affected_entities = store.update(reg_message.clone()).unwrap(); - assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//"); + assert_eq!( + affected_entities.get(0).unwrap().reg_message.topic_id, + "device/child1//" + ); // Update the entity twin data store @@ -1383,12 +1399,12 @@ mod tests { // Assert that the duplicate registration message is still ignored let affected_entities = store.update(reg_message.clone()).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); } #[test] @@ -1407,7 +1423,7 @@ mod tests { ) .with_parent(child00_topic_id.clone()); let affected_entities = store.update(child000_reg_message.clone()).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); // Register grand-child before child let child00_reg_message = EntityRegistrationMessage::new_custom( @@ -1416,7 +1432,7 @@ mod tests { ) .with_parent(child0_topic_id.clone()); let affected_entities = store.update(child00_reg_message).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); // Register the immediate child device which will trigger the registration of its children as well let child0_reg_message = @@ -1424,17 +1440,17 @@ mod tests { let affected_entities = store.update(child0_reg_message).unwrap(); // Assert that the affected entities include all the children - assert!(!affected_entities.0.is_empty()); + assert!(!affected_entities.is_empty()); let affected_entities = store.update(child000_reg_message.clone()).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); // Reload the entity store from the persistent log let mut store = new_entity_store(&temp_dir, true); // Assert that duplicate registrations are still ignored let affected_entities = store.update(child000_reg_message).unwrap(); - assert!(affected_entities.0.is_empty()); + assert!(affected_entities.is_empty()); } #[test] diff --git a/crates/core/tedge_api/src/store/pending_entity_store.rs b/crates/core/tedge_api/src/store/pending_entity_store.rs index 5762ab640b..1b927216b8 100644 --- a/crates/core/tedge_api/src/store/pending_entity_store.rs +++ b/crates/core/tedge_api/src/store/pending_entity_store.rs @@ -41,13 +41,16 @@ impl PendingEntityCache { } #[derive(Debug, Clone, Eq, PartialEq)] - -pub struct PendingEntityData { +/// Registration data for an entity that has been fully registered +/// +/// Possibly with a list of MQTT messages +/// that have been delayed till this registration. +pub struct RegisteredEntityData { pub reg_message: EntityRegistrationMessage, pub data_messages: Vec, } -impl From for PendingEntityData { +impl From for RegisteredEntityData { fn from(reg_message: EntityRegistrationMessage) -> Self { Self { reg_message, @@ -69,14 +72,14 @@ impl PendingEntityStore { pub fn take_cached_entity_data( &mut self, reg_message: EntityRegistrationMessage, - ) -> PendingEntityData { + ) -> RegisteredEntityData { let mut pending_messages = vec![]; if let Some(pending_entity) = self.entities.remove(®_message.topic_id) { pending_messages.extend(pending_entity.metadata); pending_messages.extend(self.take_cached_telemetry_data(®_message.topic_id)); } - PendingEntityData { + RegisteredEntityData { reg_message, data_messages: pending_messages, } @@ -86,12 +89,12 @@ impl PendingEntityStore { pub fn take_cached_child_entities_data( &mut self, entity_tid: &EntityTopicId, - ) -> Vec { + ) -> Vec { let mut children = vec![]; if let Some(direct_children) = self.orphans.remove(entity_tid) { for child in direct_children { let pending_entity_cache = self.entities.remove(&child).unwrap(); - let pending_entity_data = self.pending_data_from_cache(pending_entity_cache); + let pending_entity_data = self.registered_data_from_cache(pending_entity_cache); children.push(pending_entity_data); children.append(&mut self.take_cached_child_entities_data(&child)); } @@ -99,13 +102,16 @@ impl PendingEntityStore { children } - fn pending_data_from_cache(&mut self, pending_cache: PendingEntityCache) -> PendingEntityData { + fn registered_data_from_cache( + &mut self, + pending_cache: PendingEntityCache, + ) -> RegisteredEntityData { let reg_message = pending_cache.reg_message.unwrap(); let mut pending_messages = vec![]; pending_messages.extend(pending_cache.metadata); pending_messages.extend(self.take_cached_telemetry_data(®_message.topic_id)); - PendingEntityData { + RegisteredEntityData { reg_message, data_messages: pending_messages, } diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index 0214b967b7..eb06ccd413 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -28,7 +28,7 @@ use tedge_actors::SimpleMessageBoxBuilder; use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; use tedge_file_system_ext::FsWatchEvent; @@ -216,7 +216,7 @@ impl C8yMapperActor { /// followed by repeating the same for its cached data messages. pub(crate) async fn process_registered_entities( &mut self, - pending_entities: Vec, + pending_entities: Vec, ) -> Result<(), RuntimeError> { for pending_entity in pending_entities { self.process_registration_message(pending_entity.reg_message) diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 2e6658b5b1..36cdc583da 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -76,7 +76,7 @@ use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::IdGenerator; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::script::ShellScript; use tedge_api::workflow::GenericCommandState; use tedge_api::CommandLog; @@ -295,7 +295,7 @@ impl CumulocityConverter { pub async fn try_register_source_entities( &mut self, message: &MqttMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { if let Ok((source, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) { match channel { Channel::EntityMetadata => { @@ -1233,7 +1233,7 @@ impl CumulocityConverter { pub(crate) async fn try_register_entity_with_pending_children( &mut self, register_message: EntityRegistrationMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { match self.entity_cache.register_entity(register_message.clone()) { Err(e) => { error!("Entity registration failed: {e}"); @@ -1645,7 +1645,7 @@ pub(crate) mod tests { use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; - use tedge_api::pending_entity_store::PendingEntityData; + use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::script::ShellScript; use tedge_api::SoftwareUpdateCommand; use tedge_config::AutoLogUpload; @@ -2801,7 +2801,7 @@ pub(crate) mod tests { .await .unwrap(); - let messages = pending_entities_into_mqtt_messages(entities); + let messages = registered_entities_into_mqtt_messages(entities); // Assert that the registration message, the twin updates and the cached measurement messages are converted assert_messages_matching( @@ -2898,7 +2898,7 @@ pub(crate) mod tests { .try_register_source_entities(®_message) .await .unwrap(); - let messages = pending_entities_into_mqtt_messages(entities); + let messages = registered_entities_into_mqtt_messages(entities); assert_messages_matching( &messages, [ @@ -3036,7 +3036,9 @@ pub(crate) mod tests { )]); } - fn pending_entities_into_mqtt_messages(entities: Vec) -> Vec { + fn registered_entities_into_mqtt_messages( + entities: Vec, + ) -> Vec { let mut messages = vec![]; for entity in entities { messages.push(entity.reg_message.to_mqtt_message(&MqttSchema::default())); diff --git a/crates/extensions/c8y_mapper_ext/src/entity_cache.rs b/crates/extensions/c8y_mapper_ext/src/entity_cache.rs index f27f2adfd5..0f49b2610b 100644 --- a/crates/extensions/c8y_mapper_ext/src/entity_cache.rs +++ b/crates/extensions/c8y_mapper_ext/src/entity_cache.rs @@ -9,8 +9,8 @@ use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::entity_store::EntityTwinMessage; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::pending_entity_store::PendingEntityData; use tedge_api::pending_entity_store::PendingEntityStore; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_mqtt_ext::MqttMessage; use thiserror::Error; use tracing::debug; @@ -114,7 +114,7 @@ impl EntityCache { pub(crate) fn register_entity( &mut self, entity: EntityRegistrationMessage, - ) -> Result, Error> { + ) -> Result, Error> { let parent = entity.parent.as_ref().unwrap_or(&self.main_device_tid); if self.entities.contains_key(parent) { let outcome = self.register_single_entity(entity.clone())?;