diff --git a/rumqttc/examples/ack_promise.rs b/rumqttc/examples/ack_promise.rs index 55ff7493..f4c837b2 100644 --- a/rumqttc/examples/ack_promise.rs +++ b/rumqttc/examples/ack_promise.rs @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box> { .unwrap() .await { - Ok(pkid) => println!("Acknowledged Sub({pkid})"), + Ok(pkid) => println!("Acknowledged Sub({pkid:?})"), Err(e) => println!("Subscription failed: {e:?}"), } @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box> { .unwrap() .await { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(ack) => println!("Acknowledged Pub({ack:?})"), Err(e) => println!("Publish failed: {e:?}"), } } @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box> { while let Some(Ok(res)) = set.join_next().await { match res { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(ack) => println!("Acknowledged Pub({ack:?})"), Err(e) => println!("Publish failed: {e:?}"), } } // Unsubscribe and wait for broker acknowledgement match client.unsubscribe("hello/world").await.unwrap().await { - Ok(pkid) => println!("Acknowledged Unsub({pkid})"), + Ok(ack) => println!("Acknowledged Unsub({ack:?})"), Err(e) => println!("Unsubscription failed: {e:?}"), } diff --git a/rumqttc/examples/ack_promise_sync.rs b/rumqttc/examples/ack_promise_sync.rs index 506ee486..1eaa20e0 100644 --- a/rumqttc/examples/ack_promise_sync.rs +++ b/rumqttc/examples/ack_promise_sync.rs @@ -28,7 +28,7 @@ fn main() -> Result<(), Box> { .unwrap() .wait() { - Ok(pkid) => println!("Acknowledged Sub({pkid})"), + Ok(pkid) => println!("Acknowledged Sub({pkid:?})"), Err(e) => println!("Subscription failed: {e:?}"), } @@ -42,7 +42,7 @@ fn main() -> Result<(), Box> { .unwrap() .wait() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(ack) => println!("Acknowledged Pub({ack:?})"), Err(e) => println!("Publish failed: {e:?}"), } } @@ -83,14 +83,14 @@ fn main() -> Result<(), Box> { while let Ok(res) = rx.recv() { match res { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(ack) => println!("Acknowledged Pub({ack:?})"), Err(e) => println!("Publish failed: {e:?}"), } } // Unsubscribe and wait for broker acknowledgement match client.unsubscribe("hello/world").unwrap().wait() { - Ok(pkid) => println!("Acknowledged Unsub({pkid})"), + Ok(ack) => println!("Acknowledged Unsub({ack:?})"), Err(e) => println!("Unsubscription failed: {e:?}"), } diff --git a/rumqttc/examples/ack_promise_v5.rs b/rumqttc/examples/ack_promise_v5.rs index 8873cf6a..de2fdf56 100644 --- a/rumqttc/examples/ack_promise_v5.rs +++ b/rumqttc/examples/ack_promise_v5.rs @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box> { .unwrap() .await { - Ok(pkid) => println!("Acknowledged Sub({pkid})"), + Ok(pkid) => println!("Acknowledged Sub({pkid:?})"), Err(e) => println!("Subscription failed: {e:?}"), } @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box> { .unwrap() .await { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(pkid) => println!("Acknowledged Pub({pkid:?})"), Err(e) => println!("Publish failed: {e:?}"), } } @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box> { while let Some(Ok(res)) = set.join_next().await { match res { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Ok(pkid) => println!("Acknowledged Pub({pkid:?})"), Err(e) => println!("Publish failed: {e:?}"), } } // Unsubscribe and wait for broker acknowledgement match client.unsubscribe("hello/world").await.unwrap().await { - Ok(pkid) => println!("Acknowledged Unsub({pkid})"), + Ok(pkid) => println!("Acknowledged Unsub({pkid:?})"), Err(e) => println!("Unsubscription failed: {e:?}"), } diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index ffab94f0..404b5230 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -5,7 +5,8 @@ use std::time::Duration; use crate::mqttbytes::{v4::*, QoS}; use crate::tokens::{NoResponse, Resolver, Token}; use crate::{ - valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Pkid, Request, + valid_filter, valid_topic, AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions, + Request, }; use bytes::Bytes; @@ -75,7 +76,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: V, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, V: Into>, @@ -100,7 +101,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: V, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, V: Into>, @@ -119,7 +120,7 @@ impl AsyncClient { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub async fn ack(&self, publish: &Publish) -> Result, ClientError> { + pub async fn ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); if let Some(ack) = ack { @@ -130,7 +131,7 @@ impl AsyncClient { } /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); if let Some(ack) = ack { @@ -147,7 +148,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: Bytes, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, { @@ -165,7 +166,7 @@ impl AsyncClient { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let subscribe = Subscribe::new(topic, qos); @@ -184,7 +185,7 @@ impl AsyncClient { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let subscribe = Subscribe::new(topic, qos); let is_valid = subscribe_has_valid_filters(&subscribe); @@ -198,7 +199,7 @@ impl AsyncClient { } /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` - pub async fn subscribe_many(&self, topics: T) -> Result, ClientError> + pub async fn subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -216,7 +217,7 @@ impl AsyncClient { } /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop` - pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -233,7 +234,10 @@ impl AsyncClient { } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub async fn unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub async fn unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -243,7 +247,10 @@ impl AsyncClient { } /// Attempts to send a MQTT Unsubscribe to the `EventLoop` - pub fn try_unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn try_unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -271,10 +278,10 @@ impl AsyncClient { } } -fn get_ack_req(publish: &Publish, resolver: Resolver<()>) -> Option { +fn get_ack_req(publish: &Publish, resolver: Resolver) -> Option { let ack = match publish.qos { QoS::AtMostOnce => { - resolver.resolve(()); + resolver.resolve(AckOfAck::None); return None; } QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid), resolver), @@ -331,7 +338,7 @@ impl Client { qos: QoS, retain: bool, payload: V, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, V: Into>, @@ -355,7 +362,7 @@ impl Client { qos: QoS, retain: bool, payload: V, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, V: Into>, @@ -364,7 +371,7 @@ impl Client { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); if let Some(ack) = ack { @@ -375,7 +382,7 @@ impl Client { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { self.client.try_ack(publish) } @@ -384,7 +391,7 @@ impl Client { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let subscribe = Subscribe::new(topic, qos); let is_valid = subscribe_has_valid_filters(&subscribe); @@ -402,12 +409,12 @@ impl Client { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.client.try_subscribe(topic, qos) } /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` - pub fn subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -424,7 +431,7 @@ impl Client { Ok(token) } - pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -432,7 +439,7 @@ impl Client { } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub fn unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn unsubscribe>(&self, topic: S) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -442,7 +449,10 @@ impl Client { } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub fn try_unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn try_unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { self.client.try_unsubscribe(topic) } diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index f669508e..8cf870c8 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -160,6 +160,21 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType}; pub type Incoming = Packet; +/// Used to encapsulate all publish/pubrec acknowledgements in v4 +#[derive(Debug, PartialEq)] +pub enum AckOfPub { + PubAck(PubAck), + PubComp(PubComp), + None, +} + +/// Used to encapsulate all ack/pubrel acknowledgements in v4 +#[derive(Debug)] +pub enum AckOfAck { + None, + PubRel(PubRel), +} + /// Current outgoing activity on the eventloop #[derive(Debug, Clone, PartialEq, Eq)] pub enum Outgoing { @@ -191,12 +206,12 @@ pub enum Outgoing { /// handled one by one. #[derive(Debug)] pub enum Request { - Publish(Publish, Resolver), - PubAck(PubAck, Resolver<()>), - PubRec(PubRec, Resolver<()>), - PubRel(PubRel, Resolver), - Subscribe(Subscribe, Resolver), - Unsubscribe(Unsubscribe, Resolver), + Publish(Publish, Resolver), + PubAck(PubAck, Resolver), + PubRec(PubRec, Resolver), + PubRel(PubRel, Resolver), + Subscribe(Subscribe, Resolver), + Unsubscribe(Unsubscribe, Resolver), Disconnect(Resolver<()>), PingReq, } diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 4698996a..b1f918d0 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -1,5 +1,5 @@ -use crate::Pkid; use crate::{tokens::Resolver, Event, Incoming, Outgoing, Request}; +use crate::{AckOfAck, AckOfPub, Pkid}; use crate::mqttbytes::v4::*; use crate::mqttbytes::{self, *}; @@ -68,17 +68,19 @@ pub struct MqttState { /// Packet ids on incoming QoS 2 publishes pub(crate) incoming_pub: FixedBitSet, /// Last collision due to broker not acking in order - pub collision: Option<(Publish, Resolver)>, + pub collision: Option<(Publish, Resolver)>, /// Buffered incoming packets pub events: VecDeque, /// Indicates if acknowledgements should be send immediately pub manual_acks: bool, /// Waiters for publish acknowledgements - pub_ack_waiter: HashMap>, + pub_ack_waiter: HashMap>, + /// Waiters for PubRel, qos 2 + pub_rel_waiter: HashMap>, /// Waiters for subscribe acknowledgements - sub_ack_waiter: HashMap>, + sub_ack_waiter: HashMap>, /// Waiters for unsubscribe acknowledgements - unsub_ack_waiter: HashMap>, + unsub_ack_waiter: HashMap>, } impl MqttState { @@ -104,6 +106,7 @@ impl MqttState { events: VecDeque::with_capacity(100), manual_acks, pub_ack_waiter: HashMap::with_capacity(max_inflight as usize), + pub_rel_waiter: HashMap::with_capacity(max_inflight as usize), sub_ack_waiter: HashMap::with_capacity(max_inflight as usize), unsub_ack_waiter: HashMap::with_capacity(max_inflight as usize), } @@ -167,13 +170,10 @@ impl MqttState { self.outgoing_disconnect()? } Request::PubAck(puback, resolver) => { - resolver.resolve(()); + resolver.resolve(AckOfAck::None); self.outgoing_puback(puback)? } - Request::PubRec(pubrec, resolver) => { - resolver.resolve(()); - self.outgoing_pubrec(pubrec)? - } + Request::PubRec(pubrec, resolver) => self.outgoing_pubrec(pubrec, resolver)?, }; self.last_outgoing = Instant::now(); @@ -213,15 +213,8 @@ impl MqttState { let Some(resolver) = self.sub_ack_waiter.remove(&suback.pkid) else { return Err(StateError::Unsolicited(suback.pkid)); }; - if suback - .return_codes - .iter() - .all(|r| matches!(r, SubscribeReasonCode::Success(_))) - { - resolver.resolve(suback.pkid); - } else { - resolver.reject(suback.return_codes); - } + + resolver.resolve(suback); Ok(None) } @@ -234,7 +227,7 @@ impl MqttState { return Err(StateError::Unsolicited(unsuback.pkid)); }; - resolver.resolve(unsuback.pkid); + resolver.resolve(unsuback); Ok(None) } @@ -259,7 +252,8 @@ impl MqttState { if !self.manual_acks { let pubrec = PubRec::new(pkid); - return self.outgoing_pubrec(pubrec); + let (resolver, _) = Resolver::new(); + return self.outgoing_pubrec(pubrec, resolver); } Ok(None) } @@ -267,39 +261,38 @@ impl MqttState { } fn handle_incoming_puback(&mut self, puback: PubAck) -> Result, StateError> { + let pkid = puback.pkid; let p = self .outgoing_pub - .get_mut(puback.pkid as usize) - .ok_or(StateError::Unsolicited(puback.pkid))?; + .get_mut(pkid as usize) + .ok_or(StateError::Unsolicited(pkid))?; - self.last_puback = puback.pkid; + self.last_puback = pkid; if p.take().is_none() { - error!("Unsolicited puback packet: {:?}", puback.pkid); - return Err(StateError::Unsolicited(puback.pkid)); + error!("Unsolicited puback packet: {pkid:?}"); + return Err(StateError::Unsolicited(pkid)); } - let Some(resolver) = self.pub_ack_waiter.remove(&puback.pkid) else { - return Err(StateError::Unsolicited(puback.pkid)); + let Some(resolver) = self.pub_ack_waiter.remove(&pkid) else { + return Err(StateError::Unsolicited(pkid)); }; // Resolve promise for QoS 1 - resolver.resolve(puback.pkid); + resolver.resolve(AckOfPub::PubAck(puback)); self.inflight -= 1; - let packet = self - .check_collision(puback.pkid) - .map(|(publish, resolver)| { - self.outgoing_pub[publish.pkid as usize] = Some(publish.clone()); - self.inflight += 1; + let packet = self.check_collision(pkid).map(|(publish, resolver)| { + self.outgoing_pub[publish.pkid as usize] = Some(publish.clone()); + self.inflight += 1; - let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); - self.events.push_back(event); - self.collision_ping_count = 0; - self.pub_ack_waiter.insert(publish.pkid, resolver); + let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); + self.events.push_back(event); + self.collision_ping_count = 0; + self.pub_ack_waiter.insert(publish.pkid, resolver); - Packet::Publish(publish) - }); + Packet::Publish(publish) + }); Ok(packet) } @@ -326,44 +319,47 @@ impl MqttState { } fn handle_incoming_pubrel(&mut self, pubrel: PubRel) -> Result, StateError> { - if !self.incoming_pub.contains(pubrel.pkid as usize) { - error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); - return Err(StateError::Unsolicited(pubrel.pkid)); + let pkid = pubrel.pkid; + if !self.incoming_pub.contains(pkid as usize) { + error!("Unsolicited pubrel packet: {:?}", pkid); + return Err(StateError::Unsolicited(pkid)); } - self.incoming_pub.set(pubrel.pkid as usize, false); - let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); - let pubcomp = PubComp { pkid: pubrel.pkid }; + let resolver = self.pub_rel_waiter.remove(&pkid).unwrap(); + resolver.resolve(AckOfAck::PubRel(pubrel)); + + self.incoming_pub.set(pkid as usize, false); + let event = Event::Outgoing(Outgoing::PubComp(pkid)); + let pubcomp = PubComp { pkid }; self.events.push_back(event); Ok(Some(Packet::PubComp(pubcomp))) } fn handle_incoming_pubcomp(&mut self, pubcomp: PubComp) -> Result, StateError> { - if !self.outgoing_rel.contains(pubcomp.pkid as usize) { - error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); - return Err(StateError::Unsolicited(pubcomp.pkid)); + let pkid = pubcomp.pkid; + if !self.outgoing_rel.contains(pkid as usize) { + error!("Unsolicited pubcomp packet: {pkid:?}"); + return Err(StateError::Unsolicited(pkid)); } - let Some(resolver) = self.pub_ack_waiter.remove(&pubcomp.pkid) else { - return Err(StateError::Unsolicited(pubcomp.pkid)); + let Some(resolver) = self.pub_ack_waiter.remove(&pkid) else { + return Err(StateError::Unsolicited(pkid)); }; // Resolve promise for QoS 2 - resolver.resolve(pubcomp.pkid); + resolver.resolve(AckOfPub::PubComp(pubcomp)); - self.outgoing_rel.set(pubcomp.pkid as usize, false); + self.outgoing_rel.set(pkid as usize, false); self.inflight -= 1; - let packet = self - .check_collision(pubcomp.pkid) - .map(|(publish, resolver)| { - let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); - self.events.push_back(event); - self.collision_ping_count = 0; - self.pub_ack_waiter.insert(publish.pkid, resolver); + let packet = self.check_collision(pkid).map(|(publish, resolver)| { + let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); + self.events.push_back(event); + self.collision_ping_count = 0; + self.pub_ack_waiter.insert(publish.pkid, resolver); - Packet::Publish(publish) - }); + Packet::Publish(publish) + }); Ok(packet) } @@ -379,7 +375,7 @@ impl MqttState { fn outgoing_publish( &mut self, mut publish: Publish, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { if publish.qos != QoS::AtMostOnce { if publish.pkid == 0 { @@ -416,7 +412,7 @@ impl MqttState { let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); self.events.push_back(event); if publish.qos == QoS::AtMostOnce { - resolver.resolve(publish.pkid); + resolver.resolve(AckOfPub::None); } else { self.pub_ack_waiter.insert(publish.pkid, resolver); } @@ -427,7 +423,7 @@ impl MqttState { fn outgoing_pubrel( &mut self, pubrel: PubRel, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { let pubrel = self.save_pubrel(pubrel)?; @@ -446,9 +442,14 @@ impl MqttState { Ok(Some(Packet::PubAck(puback))) } - fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result, StateError> { + fn outgoing_pubrec( + &mut self, + pubrec: PubRec, + resolver: Resolver, + ) -> Result, StateError> { let event = Event::Outgoing(Outgoing::PubRec(pubrec.pkid)); self.events.push_back(event); + self.pub_rel_waiter.insert(pubrec.pkid, resolver); Ok(Some(Packet::PubRec(pubrec))) } @@ -491,7 +492,7 @@ impl MqttState { fn outgoing_subscribe( &mut self, mut subscription: Subscribe, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { if subscription.filters.is_empty() { return Err(StateError::EmptySubscription); @@ -515,7 +516,7 @@ impl MqttState { fn outgoing_unsubscribe( &mut self, mut unsub: Unsubscribe, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; @@ -541,7 +542,7 @@ impl MqttState { Ok(Some(Packet::Disconnect)) } - fn check_collision(&mut self, pkid: u16) -> Option<(Publish, Resolver)> { + fn check_collision(&mut self, pkid: u16) -> Option<(Publish, Resolver)> { if let Some((publish, _)) = &self.collision { if publish.pkid == pkid { return self.collision.take(); diff --git a/rumqttc/src/tokens.rs b/rumqttc/src/tokens.rs index 4d30cac5..040fedb4 100644 --- a/rumqttc/src/tokens.rs +++ b/rumqttc/src/tokens.rs @@ -6,27 +6,12 @@ use std::{ }; use tokio::sync::oneshot::{self, error::TryRecvError}; -pub trait Reason: Debug + Send {} -impl Reason for T where T: Debug + Send {} - -#[derive(Debug, thiserror::Error)] -#[error("Broker rejected the request, reason: {0:?}")] -pub struct Rejection(Box); - -impl Rejection { - fn new(reason: R) -> Self { - Self(Box::new(reason)) - } -} - #[derive(Debug, thiserror::Error)] pub enum TokenError { #[error("Sender has nothing to send instantly")] Waiting, #[error("Sender side of channel was dropped")] Disconnected, - #[error("Broker rejected the request, reason: {0:?}")] - Rejection(#[from] Rejection), } pub type NoResponse = (); @@ -35,7 +20,7 @@ pub type NoResponse = (); /// 1. Packet is acknowldged by the broker, e.g. QoS 1/2 Publish, Subscribe and Unsubscribe /// 2. QoS 0 packet finishes processing in the [`EventLoop`] pub struct Token { - rx: oneshot::Receiver>, + rx: oneshot::Receiver, } impl Future for Token { @@ -45,8 +30,7 @@ impl Future for Token { let polled = unsafe { self.map_unchecked_mut(|s| &mut s.rx) }.poll(cx); match polled { - Poll::Ready(Ok(Ok(p))) => Poll::Ready(Ok(p)), - Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(TokenError::Rejection(e))), + Poll::Ready(Ok(p)) => Poll::Ready(Ok(p)), Poll::Ready(Err(_)) => Poll::Ready(Err(TokenError::Disconnected)), Poll::Pending => Poll::Pending, } @@ -66,8 +50,7 @@ impl Token { pub fn wait(self) -> Result { self.rx .blocking_recv() - .map_err(|_| TokenError::Disconnected)? - .map_err(TokenError::Rejection) + .map_err(|_| TokenError::Disconnected) } /// Attempts to check if the packet handling has been completed, without blocking the current thread. @@ -77,17 +60,16 @@ impl Token { /// Multiple calls to this functions can fail with [`TokenError::Disconnected`] /// if the promise has already been resolved. pub fn check(&mut self) -> Result { - match self.rx.try_recv() { - Ok(r) => r.map_err(TokenError::Rejection), - Err(TryRecvError::Empty) => Err(TokenError::Waiting), - Err(TryRecvError::Closed) => Err(TokenError::Disconnected), - } + self.rx.try_recv().map_err(|e| match e { + TryRecvError::Empty => TokenError::Waiting, + TryRecvError::Closed => TokenError::Disconnected, + }) } } #[derive(Debug)] pub struct Resolver { - tx: oneshot::Sender>, + tx: oneshot::Sender, } impl Resolver { @@ -105,13 +87,7 @@ impl Resolver { } pub fn resolve(self, resolved: T) { - if self.tx.send(Ok(resolved)).is_err() { - trace!("Promise was dropped") - } - } - - pub fn reject(self, reasons: R) { - if self.tx.send(Err(Rejection::new(reasons))).is_err() { + if self.tx.send(resolved).is_err() { trace!("Promise was dropped") } } diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 6f537f4d..d08de136 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -3,13 +3,13 @@ use std::time::Duration; use super::mqttbytes::v5::{ - Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties, - Unsubscribe, UnsubscribeProperties, + Filter, PubAck, PubRec, Publish, PublishProperties, SubAck, Subscribe, SubscribeProperties, + UnsubAck, Unsubscribe, UnsubscribeProperties, }; use super::mqttbytes::QoS; -use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; +use super::{AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions, Request}; use crate::tokens::{NoResponse, Resolver, Token}; -use crate::{valid_filter, valid_topic, Pkid}; +use crate::{valid_filter, valid_topic}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -79,7 +79,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -104,7 +104,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -119,7 +119,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -135,7 +135,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -160,7 +160,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -174,7 +174,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -183,7 +183,7 @@ impl AsyncClient { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub async fn ack(&self, publish: &Publish) -> Result, ClientError> { + pub async fn ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); @@ -195,7 +195,7 @@ impl AsyncClient { } /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); if let Some(ack) = ack { @@ -213,7 +213,7 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, { @@ -234,7 +234,7 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: PublishProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, { @@ -248,7 +248,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: Bytes, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, { @@ -262,7 +262,7 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let filter = Filter::new(topic, qos); let subscribe = Subscribe::new(filter, properties); @@ -281,7 +281,7 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_subscribe(topic, qos, Some(properties)).await } @@ -289,7 +289,7 @@ impl AsyncClient { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_subscribe(topic, qos, None).await } @@ -299,7 +299,7 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let filter = Filter::new(topic, qos); let subscribe = Subscribe::new(filter, properties); @@ -318,7 +318,7 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_try_subscribe(topic, qos, Some(properties)) } @@ -326,7 +326,7 @@ impl AsyncClient { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_try_subscribe(topic, qos, None) } @@ -335,7 +335,7 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { @@ -355,14 +355,14 @@ impl AsyncClient { &self, topics: T, properties: SubscribeProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)).await } - pub async fn subscribe_many(&self, topics: T) -> Result, ClientError> + pub async fn subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -374,7 +374,7 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { @@ -394,14 +394,14 @@ impl AsyncClient { &self, topics: T, properties: SubscribeProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { self.handle_try_subscribe_many(topics, Some(properties)) } - pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -413,7 +413,7 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic, properties); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -426,11 +426,14 @@ impl AsyncClient { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_unsubscribe(topic, Some(properties)).await } - pub async fn unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub async fn unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { self.handle_unsubscribe(topic, None).await } @@ -439,7 +442,7 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic, properties); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -452,11 +455,14 @@ impl AsyncClient { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_try_unsubscribe(topic, Some(properties)) } - pub fn try_unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn try_unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { self.handle_try_unsubscribe(topic, None) } @@ -479,10 +485,10 @@ impl AsyncClient { } } -fn get_ack_req(publish: &Publish, resolver: Resolver<()>) -> Option { +fn get_ack_req(publish: &Publish, resolver: Resolver) -> Option { let ack = match publish.qos { QoS::AtMostOnce => { - resolver.resolve(()); + resolver.resolve(AckOfAck::None); return None; } QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None), resolver), @@ -541,7 +547,7 @@ impl Client { retain: bool, payload: P, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -566,7 +572,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -580,7 +586,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -595,7 +601,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -610,7 +616,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result, ClientError> + ) -> Result, ClientError> where S: Into, P: Into, @@ -619,7 +625,7 @@ impl Client { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn ack(&self, publish: &Publish) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let ack = get_ack_req(publish, resolver); @@ -631,7 +637,7 @@ impl Client { } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. - pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { + pub fn try_ack(&self, publish: &Publish) -> Result, ClientError> { self.client.try_ack(publish) } @@ -641,7 +647,7 @@ impl Client { topic: S, qos: QoS, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let filter = Filter::new(topic, qos); let subscribe = Subscribe::new(filter, properties); @@ -660,7 +666,7 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_subscribe(topic, qos, Some(properties)) } @@ -668,7 +674,7 @@ impl Client { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_subscribe(topic, qos, None) } @@ -678,7 +684,7 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.client .try_subscribe_with_properties(topic, qos, properties) } @@ -687,7 +693,7 @@ impl Client { &self, topic: S, qos: QoS, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.client.try_subscribe(topic, qos) } @@ -696,7 +702,7 @@ impl Client { &self, topics: T, properties: Option, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { @@ -716,14 +722,14 @@ impl Client { &self, topics: T, properties: SubscribeProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)) } - pub fn subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -734,7 +740,7 @@ impl Client { &self, topics: T, properties: SubscribeProperties, - ) -> Result, ClientError> + ) -> Result, ClientError> where T: IntoIterator, { @@ -742,7 +748,7 @@ impl Client { .try_subscribe_many_with_properties(topics, properties) } - pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result, ClientError> where T: IntoIterator, { @@ -754,7 +760,7 @@ impl Client { &self, topic: S, properties: Option, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let (resolver, token) = Resolver::new(); let unsubscribe = Unsubscribe::new(topic, properties); let request = Request::Unsubscribe(unsubscribe, resolver); @@ -767,11 +773,11 @@ impl Client { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.handle_unsubscribe(topic, Some(properties)) } - pub fn unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn unsubscribe>(&self, topic: S) -> Result, ClientError> { self.handle_unsubscribe(topic, None) } @@ -780,12 +786,15 @@ impl Client { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result, ClientError> { + ) -> Result, ClientError> { self.client .try_unsubscribe_with_properties(topic, properties) } - pub fn try_unsubscribe>(&self, topic: S) -> Result, ClientError> { + pub fn try_unsubscribe>( + &self, + topic: S, + ) -> Result, ClientError> { self.client.try_unsubscribe(topic) } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 22b1942c..d1b044b3 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -15,8 +15,7 @@ pub mod mqttbytes; mod state; use crate::tokens::Resolver; -use crate::{NetworkOptions, Transport}; -use crate::{Outgoing, Pkid}; +use crate::{NetworkOptions, Outgoing, Transport}; use mqttbytes::v5::*; @@ -32,16 +31,31 @@ pub use crate::proxy::{Proxy, ProxyAuth, ProxyType}; pub type Incoming = Packet; +/// Used to encapsulate all publish acknowledgents in v5 +#[derive(Debug)] +pub enum AckOfPub { + PubAck(PubAck), + PubComp(PubComp), + None, +} + +/// Used to encapsulate all ack/pubrel acknowledgements in v5 +#[derive(Debug)] +pub enum AckOfAck { + None, + PubRel(PubRel), +} + /// Requests by the client to mqtt event loop. Request are /// handled one by one. #[derive(Debug)] pub enum Request { - Publish(Publish, Resolver), - PubAck(PubAck, Resolver<()>), - PubRec(PubRec, Resolver<()>), - PubRel(PubRel, Resolver), - Subscribe(Subscribe, Resolver), - Unsubscribe(Unsubscribe, Resolver), + Publish(Publish, Resolver), + PubAck(PubAck, Resolver), + PubRec(PubRec, Resolver), + PubRel(PubRel, Resolver), + Subscribe(Subscribe, Resolver), + Unsubscribe(Unsubscribe, Resolver), Disconnect(Resolver<()>), PingReq, } diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 6f9e430c..d291fbd5 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -10,7 +10,7 @@ use super::{ }, Error as MqttError, QoS, }, - Event, Incoming, Outgoing, Request, + AckOfAck, AckOfPub, Event, Incoming, Outgoing, Request, }; use bytes::Bytes; @@ -103,7 +103,7 @@ pub struct MqttState { /// Packet ids on incoming QoS 2 publishes pub(crate) incoming_pub: FixedBitSet, /// Last collision due to broker not acking in order - pub collision: Option<(Publish, Resolver)>, + pub collision: Option<(Publish, Resolver)>, /// Buffered incoming packets pub events: VecDeque, /// Indicates if acknowledgements should be send immediately @@ -116,12 +116,14 @@ pub struct MqttState { pub(crate) max_outgoing_inflight: u16, /// Upper limit on the maximum number of allowed inflight QoS1 & QoS2 requests max_outgoing_inflight_upper_limit: u16, - /// Waiters for publish acknowledgements - pub_ack_waiter: HashMap>, + /// Waiters for publish acknowledgements, qos 1/2 + pub_ack_waiter: HashMap>, + /// Waiters for PubRel, qos 2 + pub_rel_waiter: HashMap>, /// Waiters for subscribe acknowledgements - sub_ack_waiter: HashMap>, + sub_ack_waiter: HashMap>, /// Waiters for unsubscribe acknowledgements - unsub_ack_waiter: HashMap>, + unsub_ack_waiter: HashMap>, } impl MqttState { @@ -150,6 +152,7 @@ impl MqttState { max_outgoing_inflight: max_inflight, max_outgoing_inflight_upper_limit: max_inflight, pub_ack_waiter: HashMap::with_capacity(max_inflight as usize), + pub_rel_waiter: HashMap::with_capacity(max_inflight as usize), sub_ack_waiter: HashMap::with_capacity(max_inflight as usize), unsub_ack_waiter: HashMap::with_capacity(max_inflight as usize), } @@ -209,13 +212,10 @@ impl MqttState { self.outgoing_disconnect(DisconnectReasonCode::NormalDisconnection)? } Request::PubAck(puback, resolver) => { - resolver.resolve(()); + resolver.resolve(super::AckOfAck::None); self.outgoing_puback(puback)? } - Request::PubRec(pubrec, resolver) => { - resolver.resolve(()); - self.outgoing_pubrec(pubrec)? - } + Request::PubRec(pubrec, resolver) => self.outgoing_pubrec(pubrec, resolver)?, }; self.last_outgoing = Instant::now(); @@ -275,15 +275,7 @@ impl MqttState { } } - if suback - .return_codes - .iter() - .all(|r| matches!(r, SubscribeReasonCode::Success(_))) - { - resolver.resolve(suback.pkid); - } else { - resolver.reject(suback.return_codes); - } + resolver.resolve(suback); Ok(None) } @@ -302,15 +294,7 @@ impl MqttState { } } - if unsuback - .reasons - .iter() - .all(|r| matches!(r, UnsubAckReason::Success)) - { - resolver.resolve(unsuback.pkid); - } else { - resolver.reject(unsuback.reasons); - } + resolver.resolve(unsuback); Ok(None) } @@ -393,7 +377,8 @@ impl MqttState { if !self.manual_acks { let pubrec = PubRec::new(pkid, None); - return self.outgoing_pubrec(pubrec); + let (resolver, _) = Resolver::new(); + return self.outgoing_pubrec(pubrec, resolver); } Ok(None) } @@ -412,11 +397,7 @@ impl MqttState { .take(); // Resolve promise for QoS 1 - if puback.reason == PubAckReason::Success { - resolver.resolve(puback.pkid); - } else { - resolver.reject(puback.reason); - } + resolver.resolve(AckOfPub::PubAck(puback.clone())); self.inflight -= 1; @@ -482,6 +463,9 @@ impl MqttState { } self.incoming_pub.set(pubrel.pkid as usize, false); + let resolver = self.pub_rel_waiter.remove(&pubrel.pkid).unwrap(); + resolver.resolve(AckOfAck::PubRel(pubrel.clone())); + if pubrel.reason != PubRelReason::Success { warn!( "PubRel Pkid = {:?}, reason: {:?}", @@ -503,11 +487,7 @@ impl MqttState { }; // Resolve promise for QoS 2 - if pubcomp.reason == PubCompReason::Success { - resolver.resolve(pubcomp.pkid); - } else { - resolver.reject(pubcomp.reason); - } + resolver.resolve(AckOfPub::PubComp(pubcomp.clone())); self.outgoing_rel.set(pubcomp.pkid as usize, false); let outgoing = self @@ -543,7 +523,7 @@ impl MqttState { fn outgoing_publish( &mut self, mut publish: Publish, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { if publish.qos != QoS::AtMostOnce { if publish.pkid == 0 { @@ -595,7 +575,7 @@ impl MqttState { let event = Event::Outgoing(Outgoing::Publish(pkid)); self.events.push_back(event); if publish.qos == QoS::AtMostOnce { - resolver.resolve(0); + resolver.resolve(AckOfPub::None) } else { self.pub_ack_waiter.insert(publish.pkid, resolver); } @@ -606,7 +586,7 @@ impl MqttState { fn outgoing_pubrel( &mut self, pubrel: PubRel, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { let pubrel = self.save_pubrel(pubrel)?; @@ -627,10 +607,15 @@ impl MqttState { Ok(Some(Packet::PubAck(puback))) } - fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result, StateError> { + fn outgoing_pubrec( + &mut self, + pubrec: PubRec, + resolver: Resolver, + ) -> Result, StateError> { let pkid = pubrec.pkid; let event = Event::Outgoing(Outgoing::PubRec(pkid)); self.events.push_back(event); + self.pub_rel_waiter.insert(pubrec.pkid, resolver); Ok(Some(Packet::PubRec(pubrec))) } @@ -670,7 +655,7 @@ impl MqttState { fn outgoing_subscribe( &mut self, mut subscription: Subscribe, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { if subscription.filters.is_empty() { return Err(StateError::EmptySubscription); @@ -695,7 +680,7 @@ impl MqttState { fn outgoing_unsubscribe( &mut self, mut unsub: Unsubscribe, - resolver: Resolver, + resolver: Resolver, ) -> Result, StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; @@ -724,7 +709,7 @@ impl MqttState { Ok(Some(Packet::Disconnect(Disconnect::new(reason)))) } - fn check_collision(&mut self, pkid: u16) -> Option<(Publish, Resolver)> { + fn check_collision(&mut self, pkid: u16) -> Option<(Publish, Resolver)> { if let Some((publish, _)) = &self.collision { if publish.pkid == pkid { return self.collision.take(); diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 2915d3e8..098f697e 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -622,7 +622,7 @@ async fn resolve_on_qos0_before_write_to_tcp_buffer() { .await .unwrap() .unwrap(), - 0 + AckOfPub::None ); // Verify the packet still reached broker @@ -704,7 +704,7 @@ async fn resolve_on_qos1_ack_from_broker() { .await .unwrap() .unwrap(), - 1 + AckOfPub::PubAck(PubAck { pkid: 1 }) ); } @@ -777,7 +777,7 @@ async fn resolve_on_qos2_ack_from_broker() { .await .unwrap() .unwrap(), - 1 + AckOfPub::PubComp(PubComp { pkid: 1 }) ); } @@ -839,7 +839,8 @@ async fn resolve_on_sub_ack_from_broker() { timeout(Duration::from_secs(1), &mut token) .await .unwrap() - .unwrap(), + .unwrap() + .pkid, 1 ); } @@ -894,6 +895,6 @@ async fn resolve_on_unsub_ack_from_broker() { .await .unwrap() .unwrap(), - 1 + UnsubAck { pkid: 1 } ); }