Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return token when request is made, resolve token when request handling completes #916

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dcabfdb
feat: acknowledge notification
Sep 29, 2024
bc8de88
fix: don't panic if promise dropped
Sep 29, 2024
176fe5c
fix: don't remove slots, just take contents
Sep 29, 2024
c4ce2f7
fix: direct resolve for QoS 0
Sep 29, 2024
b6c5ed3
fix: validate and notify sub/unsub acks
Sep 29, 2024
ab96189
fix: forget acked packet
Sep 29, 2024
656cf74
refactor
Sep 29, 2024
4a936fe
fix: panic if `max_inflight == u16::MAX`
Sep 29, 2024
d508595
feat: notify acks in v5 also
Sep 29, 2024
6b0c3ec
doc: add changelog
Sep 29, 2024
f43ea21
fix: bug observed in https://github.com/bytebeamio/rumqtt/pull/916#is…
Sep 30, 2024
2632ab1
doc: add examples of ack notify
Sep 30, 2024
8ad2223
feat: return pkid of ack
Sep 30, 2024
db7c322
doc: update example with pkids
Sep 30, 2024
31887ca
feat: return reason of request failure
Sep 30, 2024
066783a
rm unnecessary example
Sep 30, 2024
10d843c
test: reliability of ack promises
Oct 1, 2024
15ffccf
fix: rm dup import
Oct 1, 2024
b664fd6
test: run on unique port
Oct 1, 2024
b6d447d
doc: code comments
Oct 1, 2024
23e4d9a
fix: don't expose waiters outside crate
Oct 1, 2024
979cb8f
feat: non-blocking `try_resolve`
Oct 2, 2024
79d5ff8
doc: comment on `blocking_wait`
Oct 2, 2024
165d169
refactor: condense and simplify examples
Oct 6, 2024
bfeb44d
test: working of sub/unsub promises
Oct 8, 2024
a0e678b
feat: tokens for all requests (#921)
Nov 15, 2024
d67e919
fix: imports
swanandx Nov 15, 2024
8be8afe
fix: clippy lint
swanandx Nov 15, 2024
919c981
fix: token interfaces (#946)
de-sh Feb 22, 2025
c7ec9b4
fix: clear waiting subacks and unsubacks state
swanandx Feb 28, 2025
2f4def7
Merge branch 'main' into ack-notify
swanandx Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
* `Auth` packet as per MQTT5 standards
* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`
* `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return an `AckPromise` which resolves when the packet(except for QoS 0 publishes, which resolve as soon as handled) is acknowledged by the broker.

### Changed

Expand Down
81 changes: 81 additions & 0 deletions rumqttc/examples/ack_promise.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use tokio::task::{self, JoinSet};

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
match client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

// Publish at all QoS levels and wait for broker acknowledgement
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
match client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap()
.await
{
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Publish with different QoS levels and spawn wait for notification
let mut set = JoinSet::new();
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
let token = client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap();
set.spawn(token);
}

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Ok(())
}
98 changes: 98 additions & 0 deletions rumqttc/examples/ack_promise_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use flume::bounded;
use rumqttc::{Client, MqttOptions, QoS, TokenError};
use std::error::Error;
use std::thread::{self, sleep};
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut conn) = Client::new(mqttoptions, 10);
thread::spawn(move || {
for event in conn.iter() {
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
match client
.subscribe("hello/world", QoS::AtMostOnce)
.unwrap()
.wait()
{
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

// Publish at all QoS levels and wait for broker acknowledgement
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
match client
.publish("hello/world", qos, false, vec![1; i])
.unwrap()
.wait()
{
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Spawn threads for each publish, use channel to notify result
let (tx, rx) = bounded(1);

for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
let token = client
.publish("hello/world", qos, false, vec![1; i])
.unwrap();
let tx = tx.clone();
thread::spawn(move || {
let res = token.wait();
tx.send(res).unwrap()
});
}

// Try resolving a promise, if it is waiting to resolve, try again after a sleep of 1s
let mut token = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 4])
.unwrap();
thread::spawn(move || loop {
match token.check() {
Err(TokenError::Waiting) => {
println!("Promise yet to resolve, retrying");
sleep(Duration::from_secs(1));
}
res => {
tx.send(res).unwrap();
break;
}
}
});

while let Ok(res) = rx.recv() {
match res {
Ok(ack) => println!("Acknowledged Pub({ack:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").unwrap().wait() {
Ok(ack) => println!("Acknowledged Unsub({ack:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Ok(())
}
81 changes: 81 additions & 0 deletions rumqttc/examples/ack_promise_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use tokio::task::{self, JoinSet};

use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
match client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
}

// Publish at all QoS levels and wait for broker acknowledgement
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
match client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Publish with different QoS levels and spawn wait for notification
let mut set = JoinSet::new();
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
let token = client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap();
set.spawn(token);
}

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid:?})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Unsubscribe and wait for broker acknowledgement
match client.unsubscribe("hello/world").await.unwrap().await {
Ok(pkid) => println!("Acknowledged Unsub({pkid:?})"),
Err(e) => println!("Unsubscription failed: {e:?}"),
}

Ok(())
}
Loading