Skip to content

Commit

Permalink
Merge pull request #3346 from didier-wenzek/refactor/simplify-entity-…
Browse files Browse the repository at this point in the history
…registration-internal-api

Refactoring: Simplify entity registration internal API
  • Loading branch information
didier-wenzek authored Jan 22, 2025
2 parents 3de87d1 + b24c3ff commit 8b3b494
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 60 deletions.
12 changes: 6 additions & 6 deletions crates/core/tedge_agent/src/entity_manager/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_api::EntityStore;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
Expand All @@ -27,7 +27,7 @@ pub enum EntityStoreRequest {
#[derive(Debug)]
pub enum EntityStoreResponse {
Get(Option<EntityMetadata>),
Create(Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), entity_store::Error>),
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
Delete(Vec<EntityTopicId>),
Ok,
}
Expand Down Expand Up @@ -149,7 +149,7 @@ impl EntityStoreServer {
async fn register_entity(
&mut self,
entity: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), entity_store::Error> {
) -> Result<Vec<RegisteredEntityData>, entity_store::Error> {
if self.entity_store.get(&entity.topic_id).is_some() {
return Err(entity_store::Error::EntityAlreadyRegistered(
entity.topic_id,
Expand All @@ -164,17 +164,17 @@ impl EntityStoreServer {
}
}

let (affected, pending) = self.entity_store.update(entity.clone())?;
let registered = self.entity_store.update(entity.clone())?;

if !affected.is_empty() {
if !registered.is_empty() {
let message = entity.to_mqtt_message(&self.mqtt_schema);
if let Err(err) = self.mqtt_publisher.send(message.clone()).await {
error!(
"Failed to publish the entity registration message: {message:?} due to {err}",
)
}
}
Ok((affected, pending))
Ok(registered)
}

async fn deregister_entity(&mut self, topic_id: EntityTopicId) -> Vec<EntityTopicId> {
Expand Down
5 changes: 1 addition & 4 deletions crates/core/tedge_agent/src/http_server/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,7 @@ mod tests {
&& entity.r#type == EntityType::ChildDevice
{
req.reply_to
.send(EntityStoreResponse::Create(Ok((
vec![EntityTopicId::default_main_device()],
vec![],
))))
.send(EntityStoreResponse::Create(Ok(vec![])))
.await
.unwrap();
}
Expand Down
76 changes: 46 additions & 30 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::store::message_log::MessageLogReader;
use crate::store::message_log::MessageLogWriter;
use crate::store::pending_entity_store::PendingEntityData;
use crate::store::pending_entity_store::PendingEntityStore;
use crate::store::pending_entity_store::RegisteredEntityData;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -282,21 +282,21 @@ impl EntityStore {
.collect()
}

/// Updates entity store state based on the content of the entity
/// registration message.
/// Updates entity store state based on the content of the entity registration message.
///
/// It can register a new entity in the store or update already registered
/// entity, returning a list of all entities affected by the update, e.g.:
/// Caches the entity if it cannot be registered because its ancestors are not registered yet.
///
/// - when adding/removing a child device or service, the parent is affected
/// Returns a vector of registered entities that includes:
/// - the entity that is provided in the input message (if actually new and not cached)
/// - any previously cached child entities of the parent that is now registered.
pub fn update(
&mut self,
message: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), Error> {
) -> Result<Vec<RegisteredEntityData>, Error> {
match self.register_and_persist_entity(message.clone()) {
Ok(affected_entities) => {
if affected_entities.is_empty() {
Ok((vec![], vec![]))
Ok(vec![])
} else {
let topic_id = message.topic_id.clone();
let current_entity_data =
Expand All @@ -312,15 +312,15 @@ impl EntityStore {
pending_entities.push(pending_child);
}

Ok((affected_entities, pending_entities))
Ok(pending_entities)
}
}
Err(Error::NoParent(_)) => {
// When a child device registration message is received before the parent is registered,
// cache it in the unregistered entity store to be processed later
self.pending_entity_store
.cache_early_registration_message(message);
Ok((vec![], vec![]))
Ok(vec![])
}
Err(err) => Err(err),
}
Expand Down Expand Up @@ -971,11 +971,9 @@ mod tests {
.unwrap();
let updated_entities = store.update(entity.clone()).unwrap();

let pending_entity: PendingEntityData = entity.into();
assert_eq!(updated_entities.0.len(), 1);
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(updated_entities.1.len(), 1);
assert_eq!(updated_entities.1, vec![pending_entity]);
let pending_entity: RegisteredEntityData = entity.into();
assert_eq!(updated_entities.len(), 1);
assert_eq!(updated_entities, vec![pending_entity]);
}

#[test]
Expand All @@ -995,7 +993,10 @@ mod tests {
)
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);
assert_eq!(
store.child_devices(&EntityTopicId::default_main_device()),
["device/child1//"]
Expand All @@ -1010,7 +1011,10 @@ mod tests {
.unwrap(),
)
.unwrap();
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/child2//"
);
let children = store.child_devices(&EntityTopicId::default_main_device());
assert!(children.iter().any(|&e| e == "device/child1//"));
assert!(children.iter().any(|&e| e == "device/child2//"));
Expand All @@ -1032,7 +1036,10 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/main/service/service1"
);
assert_eq!(
store.services(&EntityTopicId::default_main_device()),
["device/main/service/service1"]
Expand All @@ -1048,7 +1055,10 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/main/service/service2"
);
let services = store.services(&EntityTopicId::default_main_device());
assert!(services
.iter()
Expand Down Expand Up @@ -1345,15 +1355,18 @@ mod tests {
};

let affected_entities = store.update(reg_message.clone()).unwrap();
assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//");
assert_eq!(
affected_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);

let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand All @@ -1370,7 +1383,10 @@ mod tests {
};

let affected_entities = store.update(reg_message.clone()).unwrap();
assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//");
assert_eq!(
affected_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);

// Update the entity twin data
store
Expand All @@ -1383,12 +1399,12 @@ mod tests {

// Assert that the duplicate registration message is still ignored
let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand All @@ -1407,7 +1423,7 @@ mod tests {
)
.with_parent(child00_topic_id.clone());
let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Register grand-child before child
let child00_reg_message = EntityRegistrationMessage::new_custom(
Expand All @@ -1416,25 +1432,25 @@ mod tests {
)
.with_parent(child0_topic_id.clone());
let affected_entities = store.update(child00_reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Register the immediate child device which will trigger the registration of its children as well
let child0_reg_message =
EntityRegistrationMessage::new_custom(child0_topic_id.clone(), EntityType::ChildDevice);
let affected_entities = store.update(child0_reg_message).unwrap();

// Assert that the affected entities include all the children
assert!(!affected_entities.0.is_empty());
assert!(!affected_entities.is_empty());

let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Reload the entity store from the persistent log
let mut store = new_entity_store(&temp_dir, true);

// Assert that duplicate registrations are still ignored
let affected_entities = store.update(child000_reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand Down
24 changes: 15 additions & 9 deletions crates/core/tedge_api/src/store/pending_entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ impl PendingEntityCache {
}

#[derive(Debug, Clone, Eq, PartialEq)]

pub struct PendingEntityData {
/// Registration data for an entity that has been fully registered
///
/// Possibly with a list of MQTT messages
/// that have been delayed till this registration.
pub struct RegisteredEntityData {
pub reg_message: EntityRegistrationMessage,
pub data_messages: Vec<MqttMessage>,
}

impl From<EntityRegistrationMessage> for PendingEntityData {
impl From<EntityRegistrationMessage> for RegisteredEntityData {
fn from(reg_message: EntityRegistrationMessage) -> Self {
Self {
reg_message,
Expand All @@ -69,14 +72,14 @@ impl PendingEntityStore {
pub fn take_cached_entity_data(
&mut self,
reg_message: EntityRegistrationMessage,
) -> PendingEntityData {
) -> RegisteredEntityData {
let mut pending_messages = vec![];
if let Some(pending_entity) = self.entities.remove(&reg_message.topic_id) {
pending_messages.extend(pending_entity.metadata);
pending_messages.extend(self.take_cached_telemetry_data(&reg_message.topic_id));
}

PendingEntityData {
RegisteredEntityData {
reg_message,
data_messages: pending_messages,
}
Expand All @@ -86,26 +89,29 @@ impl PendingEntityStore {
pub fn take_cached_child_entities_data(
&mut self,
entity_tid: &EntityTopicId,
) -> Vec<PendingEntityData> {
) -> Vec<RegisteredEntityData> {
let mut children = vec![];
if let Some(direct_children) = self.orphans.remove(entity_tid) {
for child in direct_children {
let pending_entity_cache = self.entities.remove(&child).unwrap();
let pending_entity_data = self.pending_data_from_cache(pending_entity_cache);
let pending_entity_data = self.registered_data_from_cache(pending_entity_cache);
children.push(pending_entity_data);
children.append(&mut self.take_cached_child_entities_data(&child));
}
}
children
}

fn pending_data_from_cache(&mut self, pending_cache: PendingEntityCache) -> PendingEntityData {
fn registered_data_from_cache(
&mut self,
pending_cache: PendingEntityCache,
) -> RegisteredEntityData {
let reg_message = pending_cache.reg_message.unwrap();
let mut pending_messages = vec![];
pending_messages.extend(pending_cache.metadata);
pending_messages.extend(self.take_cached_telemetry_data(&reg_message.topic_id));

PendingEntityData {
RegisteredEntityData {
reg_message,
data_messages: pending_messages,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::ChannelFilter;
use tedge_api::pending_entity_store::PendingEntityData;
use tedge_api::pending_entity_store::RegisteredEntityData;
use tedge_downloader_ext::DownloadRequest;
use tedge_downloader_ext::DownloadResult;
use tedge_file_system_ext::FsWatchEvent;
Expand Down Expand Up @@ -216,7 +216,7 @@ impl C8yMapperActor {
/// followed by repeating the same for its cached data messages.
pub(crate) async fn process_registered_entities(
&mut self,
pending_entities: Vec<PendingEntityData>,
pending_entities: Vec<RegisteredEntityData>,
) -> Result<(), RuntimeError> {
for pending_entity in pending_entities {
self.process_registration_message(pending_entity.reg_message)
Expand Down
Loading

0 comments on commit 8b3b494

Please sign in to comment.