Skip to content

Commit

Permalink
Simplify EntityStore::update type signature
Browse files Browse the repository at this point in the history
The vector of affected entities was only used by tests.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
  • Loading branch information
didier-wenzek committed Jan 22, 2025
1 parent 1fbc6ea commit b24c3ff
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
6 changes: 3 additions & 3 deletions crates/core/tedge_agent/src/entity_manager/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ impl EntityStoreServer {
}
}

let (affected, pending) = self.entity_store.update(entity.clone())?;
let registered = self.entity_store.update(entity.clone())?;

if !affected.is_empty() {
if !registered.is_empty() {
let message = entity.to_mqtt_message(&self.mqtt_schema);
if let Err(err) = self.mqtt_publisher.send(message.clone()).await {
error!(
"Failed to publish the entity registration message: {message:?} due to {err}",
)
}
}
Ok(pending)
Ok(registered)
}

async fn deregister_entity(&mut self, topic_id: EntityTopicId) -> Vec<EntityTopicId> {
Expand Down
72 changes: 44 additions & 28 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,21 +282,21 @@ impl EntityStore {
.collect()
}

/// Updates entity store state based on the content of the entity
/// registration message.
/// Updates entity store state based on the content of the entity registration message.
///
/// It can register a new entity in the store or update already registered
/// entity, returning a list of all entities affected by the update, e.g.:
/// Caches the entity if it cannot be registered because its ancestors are not registered yet.
///
/// - when adding/removing a child device or service, the parent is affected
/// Returns a vector of registered entities that includes:
/// - the entity that is provided in the input message (if actually new and not cached)
/// - any previously cached child entities of the parent that is now registered.
pub fn update(
&mut self,
message: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<RegisteredEntityData>), Error> {
) -> Result<Vec<RegisteredEntityData>, Error> {
match self.register_and_persist_entity(message.clone()) {
Ok(affected_entities) => {
if affected_entities.is_empty() {
Ok((vec![], vec![]))
Ok(vec![])
} else {
let topic_id = message.topic_id.clone();
let current_entity_data =
Expand All @@ -312,15 +312,15 @@ impl EntityStore {
pending_entities.push(pending_child);
}

Ok((affected_entities, pending_entities))
Ok(pending_entities)
}
}
Err(Error::NoParent(_)) => {
// When a child device registration message is received before the parent is registered,
// cache it in the unregistered entity store to be processed later
self.pending_entity_store
.cache_early_registration_message(message);
Ok((vec![], vec![]))
Ok(vec![])
}
Err(err) => Err(err),
}
Expand Down Expand Up @@ -972,10 +972,8 @@ mod tests {
let updated_entities = store.update(entity.clone()).unwrap();

let pending_entity: RegisteredEntityData = entity.into();
assert_eq!(updated_entities.0.len(), 1);
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(updated_entities.1.len(), 1);
assert_eq!(updated_entities.1, vec![pending_entity]);
assert_eq!(updated_entities.len(), 1);
assert_eq!(updated_entities, vec![pending_entity]);
}

#[test]
Expand All @@ -995,7 +993,10 @@ mod tests {
)
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);
assert_eq!(
store.child_devices(&EntityTopicId::default_main_device()),
["device/child1//"]
Expand All @@ -1010,7 +1011,10 @@ mod tests {
.unwrap(),
)
.unwrap();
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/child2//"
);
let children = store.child_devices(&EntityTopicId::default_main_device());
assert!(children.iter().any(|&e| e == "device/child1//"));
assert!(children.iter().any(|&e| e == "device/child2//"));
Expand All @@ -1032,7 +1036,10 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/main/service/service1"
);
assert_eq!(
store.services(&EntityTopicId::default_main_device()),
["device/main/service/service1"]
Expand All @@ -1048,7 +1055,10 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
updated_entities.get(0).unwrap().reg_message.topic_id,
"device/main/service/service2"
);
let services = store.services(&EntityTopicId::default_main_device());
assert!(services
.iter()
Expand Down Expand Up @@ -1345,15 +1355,18 @@ mod tests {
};

let affected_entities = store.update(reg_message.clone()).unwrap();
assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//");
assert_eq!(
affected_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);

let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand All @@ -1370,7 +1383,10 @@ mod tests {
};

let affected_entities = store.update(reg_message.clone()).unwrap();
assert_eq!(affected_entities.0.get(0).unwrap(), "device/main//");
assert_eq!(
affected_entities.get(0).unwrap().reg_message.topic_id,
"device/child1//"
);

// Update the entity twin data
store
Expand All @@ -1383,12 +1399,12 @@ mod tests {

// Assert that the duplicate registration message is still ignored
let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir, false);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand All @@ -1407,7 +1423,7 @@ mod tests {
)
.with_parent(child00_topic_id.clone());
let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Register grand-child before child
let child00_reg_message = EntityRegistrationMessage::new_custom(
Expand All @@ -1416,25 +1432,25 @@ mod tests {
)
.with_parent(child0_topic_id.clone());
let affected_entities = store.update(child00_reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Register the immediate child device which will trigger the registration of its children as well
let child0_reg_message =
EntityRegistrationMessage::new_custom(child0_topic_id.clone(), EntityType::ChildDevice);
let affected_entities = store.update(child0_reg_message).unwrap();

// Assert that the affected entities include all the children
assert!(!affected_entities.0.is_empty());
assert!(!affected_entities.is_empty());

let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());

// Reload the entity store from the persistent log
let mut store = new_entity_store(&temp_dir, true);

// Assert that duplicate registrations are still ignored
let affected_entities = store.update(child000_reg_message).unwrap();
assert!(affected_entities.0.is_empty());
assert!(affected_entities.is_empty());
}

#[test]
Expand Down

0 comments on commit b24c3ff

Please sign in to comment.