diff --git a/crates/core/tedge_agent/src/entity_manager/server.rs b/crates/core/tedge_agent/src/entity_manager/server.rs index 0c6a876d22..dc11db52d9 100644 --- a/crates/core/tedge_agent/src/entity_manager/server.rs +++ b/crates/core/tedge_agent/src/entity_manager/server.rs @@ -9,7 +9,7 @@ use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::EntityStore; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; @@ -27,7 +27,7 @@ pub enum EntityStoreRequest { #[derive(Debug)] pub enum EntityStoreResponse { Get(Option), - Create(Result, entity_store::Error>), + Create(Result, entity_store::Error>), Delete(Vec), Ok, } @@ -149,7 +149,7 @@ impl EntityStoreServer { async fn register_entity( &mut self, entity: EntityRegistrationMessage, - ) -> Result, entity_store::Error> { + ) -> Result, entity_store::Error> { if self.entity_store.get(&entity.topic_id).is_some() { return Err(entity_store::Error::EntityAlreadyRegistered( entity.topic_id, diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 16289d4405..baff14af74 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -18,8 +18,8 @@ use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::store::message_log::MessageLogReader; use crate::store::message_log::MessageLogWriter; -use crate::store::pending_entity_store::PendingEntityData; use crate::store::pending_entity_store::PendingEntityStore; +use crate::store::pending_entity_store::RegisteredEntityData; use log::debug; use log::error; use log::info; @@ -292,7 +292,7 @@ impl EntityStore { pub fn update( &mut self, message: EntityRegistrationMessage, - ) -> Result<(Vec, Vec), Error> { + ) -> Result<(Vec, Vec), Error> { match self.register_and_persist_entity(message.clone()) { Ok(affected_entities) => { if affected_entities.is_empty() { @@ -971,7 +971,7 @@ mod tests { .unwrap(); let updated_entities = store.update(entity.clone()).unwrap(); - let pending_entity: PendingEntityData = entity.into(); + let pending_entity: RegisteredEntityData = entity.into(); assert_eq!(updated_entities.0.len(), 1); assert_eq!(updated_entities.0, ["device/main//"]); assert_eq!(updated_entities.1.len(), 1); diff --git a/crates/core/tedge_api/src/store/pending_entity_store.rs b/crates/core/tedge_api/src/store/pending_entity_store.rs index 5762ab640b..1b927216b8 100644 --- a/crates/core/tedge_api/src/store/pending_entity_store.rs +++ b/crates/core/tedge_api/src/store/pending_entity_store.rs @@ -41,13 +41,16 @@ impl PendingEntityCache { } #[derive(Debug, Clone, Eq, PartialEq)] - -pub struct PendingEntityData { +/// Registration data for an entity that has been fully registered +/// +/// Possibly with a list of MQTT messages +/// that have been delayed till this registration. +pub struct RegisteredEntityData { pub reg_message: EntityRegistrationMessage, pub data_messages: Vec, } -impl From for PendingEntityData { +impl From for RegisteredEntityData { fn from(reg_message: EntityRegistrationMessage) -> Self { Self { reg_message, @@ -69,14 +72,14 @@ impl PendingEntityStore { pub fn take_cached_entity_data( &mut self, reg_message: EntityRegistrationMessage, - ) -> PendingEntityData { + ) -> RegisteredEntityData { let mut pending_messages = vec![]; if let Some(pending_entity) = self.entities.remove(®_message.topic_id) { pending_messages.extend(pending_entity.metadata); pending_messages.extend(self.take_cached_telemetry_data(®_message.topic_id)); } - PendingEntityData { + RegisteredEntityData { reg_message, data_messages: pending_messages, } @@ -86,12 +89,12 @@ impl PendingEntityStore { pub fn take_cached_child_entities_data( &mut self, entity_tid: &EntityTopicId, - ) -> Vec { + ) -> Vec { let mut children = vec![]; if let Some(direct_children) = self.orphans.remove(entity_tid) { for child in direct_children { let pending_entity_cache = self.entities.remove(&child).unwrap(); - let pending_entity_data = self.pending_data_from_cache(pending_entity_cache); + let pending_entity_data = self.registered_data_from_cache(pending_entity_cache); children.push(pending_entity_data); children.append(&mut self.take_cached_child_entities_data(&child)); } @@ -99,13 +102,16 @@ impl PendingEntityStore { children } - fn pending_data_from_cache(&mut self, pending_cache: PendingEntityCache) -> PendingEntityData { + fn registered_data_from_cache( + &mut self, + pending_cache: PendingEntityCache, + ) -> RegisteredEntityData { let reg_message = pending_cache.reg_message.unwrap(); let mut pending_messages = vec![]; pending_messages.extend(pending_cache.metadata); pending_messages.extend(self.take_cached_telemetry_data(®_message.topic_id)); - PendingEntityData { + RegisteredEntityData { reg_message, data_messages: pending_messages, } diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index 0214b967b7..eb06ccd413 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -28,7 +28,7 @@ use tedge_actors::SimpleMessageBoxBuilder; use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; use tedge_file_system_ext::FsWatchEvent; @@ -216,7 +216,7 @@ impl C8yMapperActor { /// followed by repeating the same for its cached data messages. pub(crate) async fn process_registered_entities( &mut self, - pending_entities: Vec, + pending_entities: Vec, ) -> Result<(), RuntimeError> { for pending_entity in pending_entities { self.process_registration_message(pending_entity.reg_message) diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 2e6658b5b1..36cdc583da 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -76,7 +76,7 @@ use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::IdGenerator; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; -use tedge_api::pending_entity_store::PendingEntityData; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::script::ShellScript; use tedge_api::workflow::GenericCommandState; use tedge_api::CommandLog; @@ -295,7 +295,7 @@ impl CumulocityConverter { pub async fn try_register_source_entities( &mut self, message: &MqttMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { if let Ok((source, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) { match channel { Channel::EntityMetadata => { @@ -1233,7 +1233,7 @@ impl CumulocityConverter { pub(crate) async fn try_register_entity_with_pending_children( &mut self, register_message: EntityRegistrationMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { match self.entity_cache.register_entity(register_message.clone()) { Err(e) => { error!("Entity registration failed: {e}"); @@ -1645,7 +1645,7 @@ pub(crate) mod tests { use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; - use tedge_api::pending_entity_store::PendingEntityData; + use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_api::script::ShellScript; use tedge_api::SoftwareUpdateCommand; use tedge_config::AutoLogUpload; @@ -2801,7 +2801,7 @@ pub(crate) mod tests { .await .unwrap(); - let messages = pending_entities_into_mqtt_messages(entities); + let messages = registered_entities_into_mqtt_messages(entities); // Assert that the registration message, the twin updates and the cached measurement messages are converted assert_messages_matching( @@ -2898,7 +2898,7 @@ pub(crate) mod tests { .try_register_source_entities(®_message) .await .unwrap(); - let messages = pending_entities_into_mqtt_messages(entities); + let messages = registered_entities_into_mqtt_messages(entities); assert_messages_matching( &messages, [ @@ -3036,7 +3036,9 @@ pub(crate) mod tests { )]); } - fn pending_entities_into_mqtt_messages(entities: Vec) -> Vec { + fn registered_entities_into_mqtt_messages( + entities: Vec, + ) -> Vec { let mut messages = vec![]; for entity in entities { messages.push(entity.reg_message.to_mqtt_message(&MqttSchema::default())); diff --git a/crates/extensions/c8y_mapper_ext/src/entity_cache.rs b/crates/extensions/c8y_mapper_ext/src/entity_cache.rs index f27f2adfd5..0f49b2610b 100644 --- a/crates/extensions/c8y_mapper_ext/src/entity_cache.rs +++ b/crates/extensions/c8y_mapper_ext/src/entity_cache.rs @@ -9,8 +9,8 @@ use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::entity_store::EntityTwinMessage; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::pending_entity_store::PendingEntityData; use tedge_api::pending_entity_store::PendingEntityStore; +use tedge_api::pending_entity_store::RegisteredEntityData; use tedge_mqtt_ext::MqttMessage; use thiserror::Error; use tracing::debug; @@ -114,7 +114,7 @@ impl EntityCache { pub(crate) fn register_entity( &mut self, entity: EntityRegistrationMessage, - ) -> Result, Error> { + ) -> Result, Error> { let parent = entity.parent.as_ref().unwrap_or(&self.main_device_tid); if self.entities.contains_key(parent) { let outcome = self.register_single_entity(entity.clone())?;