diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index d5971b2a1..f68dc630c 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Validate filters while creating subscription requests. * Make v4::Connect::write return correct value * Ordering of `State.events` related to `QoS > 0` publishes +* Filter PUBACK in pending save requests to fix unexpected PUBACK sent to reconnected broker. * Resume session only if broker sends `CONNACK` with `session_present == 1`. ### Security diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 25081a179..1527c8603 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -130,7 +130,15 @@ impl EventLoop { self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - let requests_in_channel = self.requests_rx.drain(); + let mut requests_in_channel: Vec<_> = self.requests_rx.drain().collect(); + + requests_in_channel.retain(|request| { + match request { + Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack + _ => true, + } + }); + self.pending.extend(requests_in_channel); } diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index e5b24a9cf..ea7076dde 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -126,7 +126,15 @@ impl EventLoop { self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received - let requests_in_channel = self.requests_rx.drain(); + let mut requests_in_channel: Vec<_> = self.requests_rx.drain().collect(); + + requests_in_channel.retain(|request| { + match request { + Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack + _ => true, + } + }); + self.pending.extend(requests_in_channel); }