Skip to content

Commit

Permalink
fix: token interfaces (#946)
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Feb 22, 2025
1 parent 8be8afe commit 919c981
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 261 deletions.
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand All @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.wait()
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -42,7 +42,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.wait()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand Down Expand Up @@ -83,14 +83,14 @@ fn main() -> Result<(), Box<dyn Error>> {

while let Ok(res) = rx.recv() {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").unwrap().wait() {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
8 changes: 4 additions & 4 deletions rumqttc/examples/ack_promise_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid})"),
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

Expand All @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}
Expand All @@ -66,14 +66,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(pkid) => println!("Acknowledged Unsub({pkid})"),
Ok(pkid) => println!("Acknowledged Unsub({pkid:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Expand Down
58 changes: 34 additions & 24 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use crate::mqttbytes::{v4::*, QoS};
use crate::tokens::{NoResponse, Resolver, Token};
use crate::{
valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Pkid, Request,
valid_filter, valid_topic, AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions,
Request,
};

use bytes::Bytes;
Expand Down Expand Up @@ -75,7 +76,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -100,7 +101,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -119,7 +120,7 @@ impl AsyncClient {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub async fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -130,7 +131,7 @@ impl AsyncClient {
}

/// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -147,7 +148,7 @@ impl AsyncClient {
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
{
Expand All @@ -165,7 +166,7 @@ impl AsyncClient {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);

Expand All @@ -184,7 +185,7 @@ impl AsyncClient {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);
let is_valid = subscribe_has_valid_filters(&subscribe);
Expand All @@ -198,7 +199,7 @@ impl AsyncClient {
}

/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -216,7 +217,7 @@ impl AsyncClient {
}

/// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -233,7 +234,10 @@ impl AsyncClient {
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub async fn unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand All @@ -243,7 +247,10 @@ impl AsyncClient {
}

/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn try_unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand Down Expand Up @@ -271,10 +278,10 @@ impl AsyncClient {
}
}

fn get_ack_req(publish: &Publish, resolver: Resolver<()>) -> Option<Request> {
fn get_ack_req(publish: &Publish, resolver: Resolver<AckOfAck>) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => {
resolver.resolve(());
resolver.resolve(AckOfAck::None);
return None;
}
QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid), resolver),
Expand Down Expand Up @@ -331,7 +338,7 @@ impl Client {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -355,7 +362,7 @@ impl Client {
qos: QoS,
retain: bool,
payload: V,
) -> Result<Token<Pkid>, ClientError>
) -> Result<Token<AckOfPub>, ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
Expand All @@ -364,7 +371,7 @@ impl Client {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
let (resolver, token) = Resolver::new();
let ack = get_ack_req(publish, resolver);
if let Some(ack) = ack {
Expand All @@ -375,7 +382,7 @@ impl Client {
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<Token<NoResponse>, ClientError> {
pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
self.client.try_ack(publish)
}

Expand All @@ -384,7 +391,7 @@ impl Client {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let subscribe = Subscribe::new(topic, qos);
let is_valid = subscribe_has_valid_filters(&subscribe);
Expand All @@ -402,12 +409,12 @@ impl Client {
&self,
topic: S,
qos: QoS,
) -> Result<Token<Pkid>, ClientError> {
) -> Result<Token<SubAck>, ClientError> {
self.client.try_subscribe(topic, qos)
}

/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Expand All @@ -424,15 +431,15 @@ impl Client {
Ok(token)
}

pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<Pkid>, ClientError>
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
self.client.try_subscribe_many(topics)
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<UnsubAck>, ClientError> {
let (resolver, token) = Resolver::new();
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe, resolver);
Expand All @@ -442,7 +449,10 @@ impl Client {
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<Pkid>, ClientError> {
pub fn try_unsubscribe<S: Into<String>>(
&self,
topic: S,
) -> Result<Token<UnsubAck>, ClientError> {
self.client.try_unsubscribe(topic)
}

Expand Down
27 changes: 21 additions & 6 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType};

pub type Incoming = Packet;

/// Used to encapsulate all publish/pubrec acknowledgements in v4
#[derive(Debug, PartialEq)]
pub enum AckOfPub {
PubAck(PubAck),
PubComp(PubComp),
None,
}

/// Used to encapsulate all ack/pubrel acknowledgements in v4
#[derive(Debug)]
pub enum AckOfAck {
None,
PubRel(PubRel),
}

/// Current outgoing activity on the eventloop
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outgoing {
Expand Down Expand Up @@ -191,12 +206,12 @@ pub enum Outgoing {
/// handled one by one.
#[derive(Debug)]
pub enum Request {
Publish(Publish, Resolver<Pkid>),
PubAck(PubAck, Resolver<()>),
PubRec(PubRec, Resolver<()>),
PubRel(PubRel, Resolver<Pkid>),
Subscribe(Subscribe, Resolver<Pkid>),
Unsubscribe(Unsubscribe, Resolver<Pkid>),
Publish(Publish, Resolver<AckOfPub>),
PubAck(PubAck, Resolver<AckOfAck>),
PubRec(PubRec, Resolver<AckOfAck>),
PubRel(PubRel, Resolver<AckOfPub>),
Subscribe(Subscribe, Resolver<SubAck>),
Unsubscribe(Unsubscribe, Resolver<UnsubAck>),
Disconnect(Resolver<()>),
PingReq,
}
Expand Down
Loading

0 comments on commit 919c981

Please sign in to comment.