Skip to content

Commit

Permalink
Rename PendingEntityData as ResolvedEntityData
Browse files Browse the repository at this point in the history
The previous name was confusing as attached to entity fully registered.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
  • Loading branch information
didier-wenzek committed Jan 22, 2025
1 parent bec6c75 commit 1fbc6ea
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 26 deletions.
6 changes: 3 additions & 3 deletions crates/core/tedge_agent/src/entity_manager/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_api::EntityStore;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
Expand All @@ -27,7 +27,7 @@ pub enum EntityStoreRequest {
#[derive(Debug)]
pub enum EntityStoreResponse {
Get(Option<EntityMetadata>),
Create(Result<Vec<PendingEntityData>, entity_store::Error>),
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
Delete(Vec<EntityTopicId>),
Ok,
}
Expand Down Expand Up @@ -149,7 +149,7 @@ impl EntityStoreServer {
async fn register_entity(
&mut self,
entity: EntityRegistrationMessage,
) -> Result<Vec<PendingEntityData>, entity_store::Error> {
) -> Result<Vec<RegisteredEntityData>, entity_store::Error> {
if self.entity_store.get(&entity.topic_id).is_some() {
return Err(entity_store::Error::EntityAlreadyRegistered(
entity.topic_id,
Expand Down
6 changes: 3 additions & 3 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::store::message_log::MessageLogReader;
use crate::store::message_log::MessageLogWriter;
use crate::store::pending_entity_store::PendingEntityData;
use crate::store::pending_entity_store::PendingEntityStore;
use crate::store::pending_entity_store::RegisteredEntityData;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -292,7 +292,7 @@ impl EntityStore {
pub fn update(
&mut self,
message: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), Error> {
) -> Result<(Vec<EntityTopicId>, Vec<RegisteredEntityData>), Error> {
match self.register_and_persist_entity(message.clone()) {
Ok(affected_entities) => {
if affected_entities.is_empty() {
Expand Down Expand Up @@ -971,7 +971,7 @@ mod tests {
.unwrap();
let updated_entities = store.update(entity.clone()).unwrap();

let pending_entity: PendingEntityData = entity.into();
let pending_entity: RegisteredEntityData = entity.into();
assert_eq!(updated_entities.0.len(), 1);
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(updated_entities.1.len(), 1);
Expand Down
24 changes: 15 additions & 9 deletions crates/core/tedge_api/src/store/pending_entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ impl PendingEntityCache {
}

#[derive(Debug, Clone, Eq, PartialEq)]

pub struct PendingEntityData {
/// Registration data for an entity that has been fully registered
///
/// Possibly with a list of MQTT messages
/// that have been delayed till this registration.
pub struct RegisteredEntityData {
pub reg_message: EntityRegistrationMessage,
pub data_messages: Vec<MqttMessage>,
}

impl From<EntityRegistrationMessage> for PendingEntityData {
impl From<EntityRegistrationMessage> for RegisteredEntityData {
fn from(reg_message: EntityRegistrationMessage) -> Self {
Self {
reg_message,
Expand All @@ -69,14 +72,14 @@ impl PendingEntityStore {
pub fn take_cached_entity_data(
&mut self,
reg_message: EntityRegistrationMessage,
) -> PendingEntityData {
) -> RegisteredEntityData {
let mut pending_messages = vec![];
if let Some(pending_entity) = self.entities.remove(&reg_message.topic_id) {
pending_messages.extend(pending_entity.metadata);
pending_messages.extend(self.take_cached_telemetry_data(&reg_message.topic_id));
}

PendingEntityData {
RegisteredEntityData {
reg_message,
data_messages: pending_messages,
}
Expand All @@ -86,26 +89,29 @@ impl PendingEntityStore {
pub fn take_cached_child_entities_data(
&mut self,
entity_tid: &EntityTopicId,
) -> Vec<PendingEntityData> {
) -> Vec<RegisteredEntityData> {
let mut children = vec![];
if let Some(direct_children) = self.orphans.remove(entity_tid) {
for child in direct_children {
let pending_entity_cache = self.entities.remove(&child).unwrap();
let pending_entity_data = self.pending_data_from_cache(pending_entity_cache);
let pending_entity_data = self.registered_data_from_cache(pending_entity_cache);
children.push(pending_entity_data);
children.append(&mut self.take_cached_child_entities_data(&child));
}
}
children
}

fn pending_data_from_cache(&mut self, pending_cache: PendingEntityCache) -> PendingEntityData {
fn registered_data_from_cache(
&mut self,
pending_cache: PendingEntityCache,
) -> RegisteredEntityData {
let reg_message = pending_cache.reg_message.unwrap();
let mut pending_messages = vec![];
pending_messages.extend(pending_cache.metadata);
pending_messages.extend(self.take_cached_telemetry_data(&reg_message.topic_id));

PendingEntityData {
RegisteredEntityData {
reg_message,
data_messages: pending_messages,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_downloader_ext::DownloadRequest;
use tedge_downloader_ext::DownloadResult;
use tedge_file_system_ext::FsWatchEvent;
Expand Down Expand Up @@ -216,7 +216,7 @@ impl C8yMapperActor {
/// followed by repeating the same for its cached data messages.
pub(crate) async fn process_registered_entities(
&mut self,
pending_entities: Vec<PendingEntityData>,
pending_entities: Vec<RegisteredEntityData>,
) -> Result<(), RuntimeError> {
for pending_entity in pending_entities {
self.process_registration_message(pending_entity.reg_message)
Expand Down
16 changes: 9 additions & 7 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::IdGenerator;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_api::script::ShellScript;
use tedge_api::workflow::GenericCommandState;
use tedge_api::CommandLog;
Expand Down Expand Up @@ -295,7 +295,7 @@ impl CumulocityConverter {
pub async fn try_register_source_entities(
&mut self,
message: &MqttMessage,
) -> Result<Vec<PendingEntityData>, ConversionError> {
) -> Result<Vec<RegisteredEntityData>, ConversionError> {
if let Ok((source, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) {
match channel {
Channel::EntityMetadata => {
Expand Down Expand Up @@ -1233,7 +1233,7 @@ impl CumulocityConverter {
pub(crate) async fn try_register_entity_with_pending_children(
&mut self,
register_message: EntityRegistrationMessage,
) -> Result<Vec<PendingEntityData>, ConversionError> {
) -> Result<Vec<RegisteredEntityData>, ConversionError> {
match self.entity_cache.register_entity(register_message.clone()) {
Err(e) => {
error!("Entity registration failed: {e}");
Expand Down Expand Up @@ -1645,7 +1645,7 @@ pub(crate) mod tests {
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_api::script::ShellScript;
use tedge_api::SoftwareUpdateCommand;
use tedge_config::AutoLogUpload;
Expand Down Expand Up @@ -2801,7 +2801,7 @@ pub(crate) mod tests {
.await
.unwrap();

let messages = pending_entities_into_mqtt_messages(entities);
let messages = registered_entities_into_mqtt_messages(entities);

// Assert that the registration message, the twin updates and the cached measurement messages are converted
assert_messages_matching(
Expand Down Expand Up @@ -2898,7 +2898,7 @@ pub(crate) mod tests {
.try_register_source_entities(&reg_message)
.await
.unwrap();
let messages = pending_entities_into_mqtt_messages(entities);
let messages = registered_entities_into_mqtt_messages(entities);
assert_messages_matching(
&messages,
[
Expand Down Expand Up @@ -3036,7 +3036,9 @@ pub(crate) mod tests {
)]);
}

fn pending_entities_into_mqtt_messages(entities: Vec<PendingEntityData>) -> Vec<MqttMessage> {
fn registered_entities_into_mqtt_messages(
entities: Vec<RegisteredEntityData>,
) -> Vec<MqttMessage> {
let mut messages = vec![];
for entity in entities {
messages.push(entity.reg_message.to_mqtt_message(&MqttSchema::default()));
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::EntityTwinMessage;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::PendingEntityStore;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_mqtt_ext::MqttMessage;
use thiserror::Error;
use tracing::debug;
Expand Down Expand Up @@ -114,7 +114,7 @@ impl EntityCache {
pub(crate) fn register_entity(
&mut self,
entity: EntityRegistrationMessage,
) -> Result<Vec<PendingEntityData>, Error> {
) -> Result<Vec<RegisteredEntityData>, Error> {
let parent = entity.parent.as_ref().unwrap_or(&self.main_device_tid);
if self.entities.contains_key(parent) {
let outcome = self.register_single_entity(entity.clone())?;
Expand Down

0 comments on commit 1fbc6ea

Please sign in to comment.