diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index c8c8716a3..c050c16f8 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -37,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Filter PUBACK in pending save requests to fix unexpected PUBACK sent to reconnected broker. * Resume session only if broker sends `CONNACK` with `session_present == 1`. * Remove v5 PubAck/PubRec/PubRel/PubComp/Sub/Unsub failures from `StateError` and log warnings on these failures. +* Make sure session cleaning on the client side happens as per the mqtt specification. ### Security diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index d31690d99..6039674bb 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -157,8 +157,12 @@ impl EventLoop { Ok(inner) => inner?, Err(_) => return Err(ConnectionError::NetworkTimeout), }; - // Last session might contain packets which aren't acked. If it's a new session, clear the pending packets. - if !connack.session_present { + // Reset session if the session flag is different from what we expected + // i.e. We requested clean_session but the server replied with session present + // or We requested an existing session but the server replied with session not present + if (self.mqtt_options.clean_session && connack.session_present) + || (!self.mqtt_options.clean_session && !connack.session_present) + { self.pending.clear(); } self.network = Some(network); diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 0f08a33b8..9a7485f3b 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -242,7 +242,7 @@ impl MqttState { } _ => { warn!("SubAck Pkid = {:?}, Reason = {:?}", suback.pkid, reason); - }, + } } } Ok(None) @@ -364,7 +364,10 @@ impl MqttState { if puback.reason != PubAckReason::Success && puback.reason != PubAckReason::NoMatchingSubscribers { - warn!("PubAck Pkid = {:?}, reason: {:?}", puback.pkid, puback.reason); + warn!( + "PubAck Pkid = {:?}, reason: {:?}", + puback.pkid, puback.reason + ); return Ok(None); } @@ -397,7 +400,10 @@ impl MqttState { if pubrec.reason != PubRecReason::Success && pubrec.reason != PubRecReason::NoMatchingSubscribers { - warn!("PubRec Pkid = {:?}, reason: {:?}", pubrec.pkid, pubrec.reason); + warn!( + "PubRec Pkid = {:?}, reason: {:?}", + pubrec.pkid, pubrec.reason + ); return Ok(None); } @@ -417,7 +423,10 @@ impl MqttState { self.incoming_pub.set(pubrel.pkid as usize, false); if pubrel.reason != PubRelReason::Success { - warn!("PubRel Pkid = {:?}, reason: {:?}", pubrel.pkid, pubrel.reason); + warn!( + "PubRel Pkid = {:?}, reason: {:?}", + pubrel.pkid, pubrel.reason + ); return Ok(None); } @@ -444,7 +453,10 @@ impl MqttState { self.outgoing_rel.set(pubcomp.pkid as usize, false); if pubcomp.reason != PubCompReason::Success { - warn!("PubComp Pkid = {:?}, reason: {:?}", pubcomp.pkid, pubcomp.reason); + warn!( + "PubComp Pkid = {:?}, reason: {:?}", + pubcomp.pkid, pubcomp.reason + ); return Ok(None); }