Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pending messages should only be cleared if there is a discrepancy between what we requested regarding the session and what the server responds with #923

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Filter PUBACK in pending save requests to fix unexpected PUBACK sent to reconnected broker.
* Resume session only if broker sends `CONNACK` with `session_present == 1`.
* Remove v5 PubAck/PubRec/PubRel/PubComp/Sub/Unsub failures from `StateError` and log warnings on these failures.
* Make sure session cleaning on the client side happens as per the mqtt specification.

### Security

Expand Down
8 changes: 6 additions & 2 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
// Reset session if the session flag is different from what we expected
// i.e. We requested clean_session but the server replied with session present
// or We requested an existing session but the server replied with session not present
if (self.mqtt_options.clean_session && connack.session_present)
|| (!self.mqtt_options.clean_session && !connack.session_present)
{
self.pending.clear();
}
self.network = Some(network);
Expand Down
22 changes: 17 additions & 5 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl MqttState {
}
_ => {
warn!("SubAck Pkid = {:?}, Reason = {:?}", suback.pkid, reason);
},
}
}
}
Ok(None)
Expand Down Expand Up @@ -364,7 +364,10 @@ impl MqttState {
if puback.reason != PubAckReason::Success
&& puback.reason != PubAckReason::NoMatchingSubscribers
{
warn!("PubAck Pkid = {:?}, reason: {:?}", puback.pkid, puback.reason);
warn!(
"PubAck Pkid = {:?}, reason: {:?}",
puback.pkid, puback.reason
);
return Ok(None);
}

Expand Down Expand Up @@ -397,7 +400,10 @@ impl MqttState {
if pubrec.reason != PubRecReason::Success
&& pubrec.reason != PubRecReason::NoMatchingSubscribers
{
warn!("PubRec Pkid = {:?}, reason: {:?}", pubrec.pkid, pubrec.reason);
warn!(
"PubRec Pkid = {:?}, reason: {:?}",
pubrec.pkid, pubrec.reason
);
return Ok(None);
}

Expand All @@ -417,7 +423,10 @@ impl MqttState {
self.incoming_pub.set(pubrel.pkid as usize, false);

if pubrel.reason != PubRelReason::Success {
warn!("PubRel Pkid = {:?}, reason: {:?}", pubrel.pkid, pubrel.reason);
warn!(
"PubRel Pkid = {:?}, reason: {:?}",
pubrel.pkid, pubrel.reason
);
return Ok(None);
}

Expand All @@ -444,7 +453,10 @@ impl MqttState {
self.outgoing_rel.set(pubcomp.pkid as usize, false);

if pubcomp.reason != PubCompReason::Success {
warn!("PubComp Pkid = {:?}, reason: {:?}", pubcomp.pkid, pubcomp.reason);
warn!(
"PubComp Pkid = {:?}, reason: {:?}",
pubcomp.pkid, pubcomp.reason
);
return Ok(None);
}

Expand Down