
feat: return token when request is made, resolve token when request handling completes #916

Open · wants to merge 31 commits into main
Conversation

@de-sh (Contributor) commented Sep 29, 2024

Solves #805

Type of change

New feature that notifies the requester when their requests are acknowledged by the broker (for QoS 1/2 publishes and subscribe/unsubscribe) or when they are written to the TCP buffer (for all other requests).

BREAKING: the `pending` definition changes from `VecDeque<Request>` to `VecDeque<(Request, Option<PromiseTx>)>`.
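A minimal usage sketch of the new interface, assuming the returned token can simply be `.await`ed for the result (the exact resolved value and error type follow the discussion below):

```rust
use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::main]
async fn main() {
    let mqttoptions = MqttOptions::new("demo", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // Poll the eventloop in the background so queued requests make progress.
    tokio::task::spawn(async move {
        loop {
            let _ = eventloop.poll().await;
        }
    });

    // With this change, `publish` also returns a token for the queued request.
    let token = client
        .publish("hello/world", QoS::AtLeastOnce, false, vec![1, 2, 3])
        .await
        .unwrap();

    // The token resolves once the broker acknowledges the publish
    // (or once the packet is written out, for all other request kinds).
    match token.await {
        Ok(pkid) => println!("Acknowledged, pkid = {pkid:?}"),
        Err(e) => println!("Not acknowledged: {e:?}"),
    }
}
```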

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

@de-sh de-sh changed the title feat: acknowledge notification feat: notify when publish/subscribe/unsubscribe packets are acked Sep 29, 2024
@coveralls commented Sep 29, 2024

Pull Request Test Coverage Report for Build 13582365646

Details

  • 322 of 696 (46.26%) changed or added relevant lines in 7 files are covered.
  • 34 unchanged lines in 4 files lost coverage.
  • Overall coverage increased (+1.7%) to 37.769%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| rumqttc/src/eventloop.rs | 1 | 2 | 50.0% |
| rumqttc/src/v5/eventloop.rs | 0 | 1 | 0.0% |
| rumqttc/src/tokens.rs | 21 | 33 | 63.64% |
| rumqttc/src/state.rs | 147 | 177 | 83.05% |
| rumqttc/src/v5/state.rs | 120 | 166 | 72.29% |
| rumqttc/src/client.rs | 28 | 149 | 18.79% |
| rumqttc/src/v5/client.rs | 5 | 168 | 2.98% |

| Files with Coverage Reduction | New Missed Lines | % |
|---|---|---|
| rumqttc/src/state.rs | 1 | 84.42% |
| rumqttc/src/v5/state.rs | 3 | 67.23% |
| rumqttc/src/client.rs | 15 | 32.47% |
| rumqttc/src/v5/client.rs | 15 | 12.64% |

| Totals | Coverage Status |
|---|---|
| Change from base Build 13581587485 | 1.7% |
| Covered Lines | 6363 |
| Relevant Lines | 16847 |

💛 - Coveralls

@de-sh de-sh marked this pull request as ready for review September 29, 2024 20:08
@xiaocq2001 (Contributor) commented Sep 30, 2024

Glad to see the progress here. Some of the things to discuss:

  1. Is there any example of how the feature is used?
    I tried the following code to test it:
    println!("--- Publishing messages and wait for ack ---");
    let mut set = JoinSet::new();

    let ack_promise = client
        .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    while let Some(res) = set.join_next().await {
        println!("Acknoledged = {:?}", res?);
    }

The output shows "RecvError"

```
--- Publishing messages and wait for ack ---
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 0, Payload Size = 1
Event = Outgoing(Publish(0))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 2
Event = Outgoing(Publish(4))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 3
Event = Outgoing(Publish(5))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Event = Incoming(Publish(Publish { dup: false, qos: AtMostOnce, retain: false, topic: b"hello/world", pkid: 0, payload: b"\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
Event = Incoming(PubRec(PubRec { pkid: 5, reason: Success, properties: None }))
Event = Outgoing(PubRel(5))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01\x01", properties: None }))
Event = Outgoing(PubAck(1))
Event = Incoming(Publish(Publish { dup: false, qos: ExactlyOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01\x01", properties: None }))
Event = Outgoing(PubRec(2))
Event = Incoming(PubComp(PubComp { pkid: 5, reason: Success, properties: None }))
Event = Incoming(PubRel(PubRel { pkid: 2, reason: Success, properties: None }))
Event = Outgoing(PubComp(2))
```

In outgoing_publish, the tx is not saved to ack_waiter; it's dropped!
Maybe you can add (with the QoS 0 notification from point 2 below)

```rust
        if publish.qos != QoS::AtMostOnce {
            self.ack_waiter[pkid as usize] = tx;
        } else {
            if let Some(tx) = tx {
                tx.resolve();
            }
        }
```

after

```rust
        let event = Event::Outgoing(Outgoing::Publish(pkid));
        self.events.push_back(event);
```
  2. It seems that with QoS 0 there is no notification; is it worth having a notification when a QoS 0 packet is sent (in outgoing_publish)?

@de-sh (Contributor, Author) commented Sep 30, 2024

Thanks for the review. Two more things I have been thinking about with respect to the interface:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker; both subscribe and unsubscribe acks carry reason codes in their responses. Should this just return the acknowledgement packets received from the broker, or repackage them into a more presentable type?

```diff
@@ -75,11 +75,11 @@ pub struct EventLoop {
     /// Current state of the connection
     pub state: MqttState,
     /// Request stream
-    requests_rx: Receiver<Request>,
+    requests_rx: Receiver<(Request, Option<PromiseTx>)>,
```
@de-sh (Contributor, Author) suggested:
```diff
-requests_rx: Receiver<(Request, Option<PromiseTx>)>,
+requests_rx: Receiver<PendingRequest>,
```

@de-sh (Contributor, Author) replied:

Resolved in #917

```diff
     /// Pending packets from last session
-    pub pending: VecDeque<Request>,
+    pub pending: VecDeque<(Request, Option<PromiseTx>)>,
```
@de-sh (Contributor, Author) commented:

This is a breaking change; note it in the release notes.

@de-sh mentioned this pull request Oct 1, 2024
@xiaocq2001 (Contributor) commented:

> Thanks for the review. Two more things I have been thinking about with respect to the interface:
>
> 1. Returning pkid instead of ().
> 2. Errors with more context about why a request was refused by the broker; both subscribe and unsubscribe acks carry reason codes in their responses. Should this just return the acknowledgement packets received from the broker, or repackage them into a more presentable type?

For 1, that's great. It also helps with 2: before we decide to put more into the error cases, the pkid helps extract the error packet from the event loop events.
For 2, I vote for the acknowledgement packets, since they carry not only the reason code but also the reason string and possibly other user-defined properties.
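To make the options concrete, a hypothetical sketch (type and field names are illustrative, not the PR's actual API): `Ok` carries the pkid, and the error case preserves the broker's acknowledgement packet so the reason code, reason string, and user properties remain available to the caller.

```rust
// Illustrative only: names are hypothetical and do not reflect the PR's API.
#[derive(Debug)]
struct AckPacket {
    pkid: u16,
    reason_code: u8,
    reason_string: Option<String>,
}

#[derive(Debug)]
enum TokenError {
    /// The broker refused the request; the full ack packet is preserved.
    Refused(AckPacket),
    /// The request never completed (e.g. the eventloop side was dropped).
    Disconnected,
}

fn report(result: Result<u16, TokenError>) {
    match result {
        Ok(pkid) => println!("acknowledged, pkid = {pkid}"),
        Err(TokenError::Refused(ack)) => {
            println!("refused: code {}, reason {:?}", ack.reason_code, ack.reason_string)
        }
        Err(TokenError::Disconnected) => println!("request was dropped before resolution"),
    }
}
```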

@de-sh de-sh changed the title feat: notify when publish/subscribe/unsubscribe packets are acked feat: return token when request is made, resolve token when request handling completes Nov 15, 2024
@izolyomi commented:
Thank you for the efforts, I'm also interested in using this once it's merged.

@Leandros commented Dec 2, 2024

Any way that one can help to get this merged?

Thank you!

@swanandx (Member) commented Dec 3, 2024

just a heads up: this will be merged and released with the upcoming rumqttc version by/in January!

thank you so much for your patience and contribution 💯

@izolyomi commented:
Do you maybe have any updates like a current schedule or planned deadline for a merge and release of this feature? Thank you for the efforts.

@tekjar (Contributor) commented Jan 31, 2025

Hi @izolyomi. We are unfortunately delayed on this. We'll try to focus on these issues in February and cut a new release.

@cartertinney commented:
I checked this out and it works quite well, aside from a handful of issues:

  1. The reasons for rejection by the broker that are reported via the Token cannot be matched on or interacted with in any meaningful way. It would be nice if I could write conditional logic to respond to particular rejections in different ways. Perhaps I missed something?

  2. If a subscribe or unsubscribe is done when the client thinks it is connected, but it is not and has just not timed out on the ping/keep alive yet, the Token will hang indefinitely. I assume this is because, per the MQTT spec, unacknowledged SUB and UNSUB packets are discarded upon connection loss/reconnect, but the discard does not seem to get reported to the Token, so token.await hangs forever.

  3. When using QoS2 and manual ack for a received message (perhaps this also applies with auto ack), I'm seeing Token.await return after the PUBREC is sent, without waiting for the rest of the PUBREL/PUBCOMP handshake.

  4. When using QoS1 or QoS2 and manual ack for a received message, if there is a disconnect between receiving the message and sending the manual ack with .ack(), the PUBACK will queue up and be redelivered upon reconnect. This is an MQTT spec violation, as it can lead to acking the wrong message, duplicate acks in edge cases, or even broker disconnects. I assume this also applies to automatic acking, but it's very hard to reproduce this condition without manual ack.

I see that the release of this feature has been delayed - is there some issue with the implementation that I'm missing? Aside from the above issues, everything worked fantastically, and it would be extremely helpful to get this integrated into my codebase ASAP. Are there any updates?

@de-sh (Contributor, Author) commented Feb 21, 2025

> I checked this out and it works quite well, aside from a handful of issues:

Thanks for pointing these out @cartertinney, will figure out how to solve this over the weekend!

@de-sh mentioned this pull request Feb 22, 2025
@cartertinney commented Feb 25, 2025

#946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.

To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.

@swanandx (Member) commented:
> #946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.
>
> To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.

for 2, if the client times out on keep alive, the connection is closed and so the token should resolve to a Disconnected error! Are you saying this isn't working?

for 3, we tested and it was working fine.

note that when using manual acks with QoS 2 you just need to ack the incoming publish (the rest of the QoS 2 flow is handled internally, i.e. the PubRel/PubComp handshake is done internally), and for outgoing publishes, the token is resolved only after getting PubComp!
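For reference, a minimal sketch of that manual-ack flow against the v4 client, assuming the existing `set_manual_acks` option and `client.ack` (return types may differ slightly under this PR):

```rust
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("manual-ack-demo", "localhost", 1883);
    mqttoptions.set_manual_acks(true);

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    client.subscribe("hello/world", QoS::ExactlyOnce).await.unwrap();

    loop {
        match eventloop.poll().await {
            // For an incoming QoS 2 publish, only this ack is manual; the
            // PubRel/PubComp exchange is completed by the eventloop itself.
            Ok(Event::Incoming(Incoming::Publish(publish))) => {
                client.ack(&publish).await.unwrap();
            }
            Ok(event) => println!("Event = {event:?}"),
            Err(e) => {
                println!("Error = {e:?}");
                break;
            }
        }
    }
}
```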

[ shall we have manual acks for PubRec &/ PubRel as well? cc: @tekjar @de-sh ]

can you please provide a minimal poc so that we can reproduce the issue to fix it?

thanks!

@cartertinney commented Feb 26, 2025
> #946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4.
>
> To expand on issue 2, further testing exposed that this same hanging issue affecting .subscribe() and .unsubscribe() also appears when using manual acks with the .ack() method on QoS2 messages, which makes sense and follows from the original observations, but I just thought I should clarify.
>
> for 2, if the client times out on keep alive, the connection is closed and so the token should resolve to a Disconnected error! Are you saying this isn't working?
>
> for 3, we tested and it was working fine.
>
> note that when using manual acks with QoS 2 you just need to ack the incoming publish (the rest of the QoS 2 flow is handled internally, i.e. the PubRel/PubComp handshake is done internally), and for outgoing publishes, the token is resolved only after getting PubComp!
>
> [ shall we have manual acks for PubRec &/ PubRel as well? cc: @tekjar @de-sh ]
>
> can you please provide a minimal poc so that we can reproduce the issue to fix it?
>
> thanks!

Yes, like I said, issues 1 and 3 appear to be solved. I'm happy with the way QoS2 handles the PUBREC/REL/COMP handshake; it is much improved from prior iterations. I personally don't think manual PUBCOMP has much value from a user perspective; I like the current solution.

As for 2, I can run the test again, perhaps I missed something, but you are correct - I'm not seeing the Token resolve to a Disconnected error, I'm seeing the same hanging behavior as before on Tokens created by SUB and UNSUB operations. Is it possible there's something I'm missing?

@swanandx (Member) commented:
> I'm not seeing the Token resolve to a Disconnected error, I'm seeing the same hanging behavior as before on Tokens created by SUB and UNSUB operations

my observations:
TokenError::Disconnected is returned when the sender (say tx) is dropped. At a given time, tx can be:

  • in the channel between the client and the eventloop
    this is when we have called the method (publish/subscribe/etc.) and the eventloop is yet to process that request.

  • in the state of the eventloop
    once the Request is processed and we are waiting for the Ack.

The issue is that the tx is never dropped if we don't have a network connection!

Minimal PoC
```rust
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mqttoptions = MqttOptions::new("poc", "never-connect-here", 1883);

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // spawn eventloop poll in separate task
    tokio::task::spawn(async move {
        loop {
            let event = eventloop.poll().await;
            match &event {
                Ok(v) => {
                    println!("Event = {v:?}");
                }
                Err(e) => {
                    println!("Error = {e:?}");
                    tokio::time::sleep(Duration::from_secs(60)).await;
                }
            }
        }
    });

    let token = client
        .subscribe("hello/world", QoS::AtMostOnce)
        .await
        .unwrap();

    // as we never connect to network, we never process any requests in eventloop
    // that are pending in channel. Hence sender is never dropped and token is
    // never resolved!
    match token.await {
        Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
        Err(e) => println!("Subscription failed: {e:?}"),
    };
}
```

For keep alive timeouts / disconnects due to other errors:

for packets that were already sent, i.e. for handles held in state, we should clear them; pushed commit c7ec9b4 to fix this!

But for the ones still in the channel (yet to reach the eventloop), we don't intend to drop the requests, which leads to the hanging behaviour shown in the PoC above.
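In the meantime, a caller can bound the wait themselves, e.g. with `tokio::time::timeout` (a sketch of a caller-side guard, not part of this PR):

```rust
use std::{future::Future, time::Duration};
use tokio::time::timeout;

// Sketch of a caller-side guard: bound how long we wait for a token to
// resolve, so a request stuck in the channel (never reaching the eventloop)
// cannot hang the caller forever.
async fn await_with_deadline<F, T>(token: F, limit: Duration) -> Option<T>
where
    F: Future<Output = T>,
{
    match timeout(limit, token).await {
        Ok(result) => Some(result), // token resolved in time
        Err(_elapsed) => None,      // deadline hit before resolution
    }
}
```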

please comment on the above behaviour with suggestions!

thanks!

@swanandx (Member) commented Feb 28, 2025

> When using QoS1 or QoS2 and manual ack for a received message, if there is a disconnect between receiving the message and sending the manual ack with .ack(), the PUBACK will queue up and be redelivered upon reconnect. This is an MQTT spec violation, as it can lead to acking the wrong message, duplicate acks in edge cases, or even broker disconnects. I assume this also applies to automatic acking, but it's very hard to reproduce this condition without manual ack.

we do not retransmit the pubacks though:

```rust
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
```

can you verify again in case I'm missing something, or maybe it's something related to QoS 2? @cartertinney

@cartertinney commented Mar 4, 2025

@de-sh @swanandx

Trying out the latest version, I now see the expected behavior for SUB and UNSUB in the case of connection drops: I now get a TokenError indicating the "sender side of channel was dropped". This is the right behavior, although the error text is a little confusing (the connection was lost; the underlying channel or EventLoop wasn't dropped from memory and still exists, and if the connection were regained things would again begin working normally). That quirk of semantics aside, this is a huge improvement for correctness, and I am thrilled with this change.

Now, regarding the acknowledgement process:

I understand that the ACK requests that queue up while disconnected are in the channel (yet to reach the EventLoop/ MQTT session), but the issue is that allowing them to queue up like this violates the MQTT specification. Acking a message from a previous connection upon reconnect is dangerous because we can't guarantee we are acknowledging the packet we think we are. Consider the case:

A publish is sent to the client from the broker. After receiving the publish, but before sending the PUBACK, the client loses connection. The PUBACK is queued up in the channel. Eventually, the client reconnects. The broker redelivers the original publish, now with the DUP flag set to true. The queued up PUBACK the client had in the channel is also delivered upon reconnect. This PUBACK was intended to be sent for the original copy of the message on a prior connection, but now acks the redelivery from the broker's perspective. The broker sees no outstanding PKID that needs to be acked, but the client thinks it still needs to ack the duplicate (treated as a new message per spec), resulting in a second ACK being sent for the same PKID. At this point:

  1. The broker may choose to end the session with the client for sending an unexpected ACK (in my experience this is most common on QoS2 with unsolicited PUBREC/PUBREL errors in the broker/client depending on the timing) because of protocol violations that threaten the integrity of message delivery guarantees.
  2. If it does not kick the client off, and the broker had reassigned the PKID to a new message (because an acked PKID is available for reassignment) and delivers it to the client before the client sends the second ACK for the duplicate message, the client could incorrectly ack a completely different message, resulting in QoS guarantees falling apart.

This is admittedly a fairly unlikely scenario when using QoS1 and automatic acking, but when using a high amount of message throughput, or when using QoS2, or when using manual ack (and ESPECIALLY when using QoS2 with manual ack and high throughput), this scenario actually becomes relatively easy to trigger.

The only correct behavior is that you must wait for the broker to redeliver the publish, and then ack the duplicate.

I realize, from an implementation standpoint, that this is difficult due to the way you use a single channel for all requests, but ultimately to be correct I think you probably would need to handle PUBACK/PUBREC/PUBCOMP differently from the rest.
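To make that suggested handling concrete, a rough sketch (hypothetical helper, not this PR's implementation): on reconnect, drop buffered inbound-flow acks instead of replaying them, and ack only after the broker redelivers the publish with DUP set.

```rust
use rumqttc::Request;
use std::collections::VecDeque;

// Hypothetical reconnect hook: discard acks that belong to the previous
// connection so that only the broker's redelivery (DUP = true) gets acked,
// while other pending requests remain eligible for retransmission.
fn drop_stale_acks(pending: &mut VecDeque<Request>) {
    pending.retain(|request| {
        !matches!(
            request,
            Request::PubAck(_) | Request::PubRec(_) | Request::PubComp(_)
        )
    });
}
```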

I hope this explanation is clear - it's a relatively easy scenario to reproduce, but I'm happy to provide a sample application that will reliably create this issue if necessary.

@swanandx (Member) commented Mar 5, 2025

fair points, but this case is already handled (at least for PubAck) here, as mentioned previously:

```rust
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
```

iirc, we need to retransmit PubRels as well, right? so here, as you pointed out, ignoring PubRecs and PubComps would be much better behaviour, right?
