-
Notifications
You must be signed in to change notification settings - Fork 268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: return token when request is made, resolve token when request handling completes #916
base: main
Are you sure you want to change the base?
Conversation
publish
/subscribe
/unsubscribe
packets are acked
Pull Request Test Coverage Report for Build 13582365646Details
💛 - Coveralls |
Glad to see the progress here. Some of the things to discuss:
The output shows "RecvError"
In
after
|
Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:
|
rumqttc/src/eventloop.rs
Outdated
@@ -75,11 +75,11 @@ pub struct EventLoop { | |||
/// Current state of the connection | |||
pub state: MqttState, | |||
/// Request stream | |||
requests_rx: Receiver<Request>, | |||
requests_rx: Receiver<(Request, Option<PromiseTx>)>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requests_rx: Receiver<(Request, Option<PromiseTx>)>, | |
requests_rx: Receiver<PendingRequest>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved in #917
rumqttc/src/eventloop.rs
Outdated
/// Pending packets from last session | ||
pub pending: VecDeque<Request>, | ||
pub pending: VecDeque<(Request, Option<PromiseTx>)>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change, note on release
For 1, that's great. That also helps on 2, before we decide to put more things in error cases, the pkid helps to extract the error packet from event loop events. |
publish
/subscribe
/unsubscribe
packets are acked
Thank you for the efforts, I'm also interested in using this once it's merged. |
Any way that one can help to get this merged? Thank you! |
just an heads up: this will be merged and released with rumqttc upcoming version by / in January! thank you so much for your patience and contribution 💯 |
Do you maybe have any updates like a current schedule or planned deadline for a merge and release of this feature? Thank you for the efforts. |
Hi @izolyomi . We are unfortunately delayed on this. We'll try to focus on these issues in Feb and cut a new release |
I checked this out and it works quite well, aside from a handful of issues:
I see that the release of this feature has been delayed - is there some issue with the implementation that I'm missing? Aside from the above issues I found, everything worked fantastic, and it would be extremely helpful to get this integrated into my codebase ASAP. Are there any updates? |
Thanks for pointing these out @cartertinney, will figure out how to solve this over the weekend! |
#946 appears to have solved issues 1 and 3 that I raised above. Thanks much, I continue to look forward to fixes for 2 and 4. To expand on issue 2, further testing exposed that this same hanging issue affecting |
for 2, if client is timed out on keep alive, the connection is closed and so token should resolve to for 3, we tested and it was working fine. note that when using manual acks with qos 2 you just need to ack the incoming publish ( rest of the qos 2 flow is handled internally, i.e. pubrel/pubcomp hadshake is done internally ) and for outgoing publishes, token is resolved only after getting pubcomp! [ shall we have manual acks for PubRec &/ PubRel as well? cc: @tekjar @de-sh ] can you please provide a minimal poc so that we can reproduce the issue to fix it? thanks! |
Yes, like I said, issues 1 and 3 appear to be solved. I'm happy with the way QoS2 handles the PUBREC/REL/COMP handshake, it is much improved from prior iterations - I personally don't think manual PUBCOMP has much value from a user perspective, I like the current solution. As for 2, I can run the test again, perhaps I missed something, but you are correct - I'm not seeing the Token resolve to a Disconnected error, I'm seeing the same hanging behavior as before on Tokens created by SUB and UNSUB operations. Is it possible there's something I'm missing? |
my observations:
Issue is that the Minimal PoCuse rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let mqttoptions = MqttOptions::new("poc", "never-connect-here", 1883);
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
// spawn eventloop poll in separate task
tokio::task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
}
});
let token = client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap();
// as we never connect to network, we never process any requests in eventloop
// that are pending in channel. Hence sender is never dropped and token is
// never resolved!
match token.await {
Ok(pkid) => println!("Acknowledged Sub({pkid:?})"),
Err(e) => println!("Subscription failed: {e:?}"),
};
} For keep alive timeouts / disconnects due to other errors: packets that are sent, i.e. for handles in state, we should clear them em, pushed the commit c7ec9b4 for fixing this! But for the ones that are in channel ( yet to reach the eventloop ). we don't intend to drop the requests which are in channel, which leads to that hanging behaviour as in above poc and mentioned above. please comment on the above behaviour for suggestions! thanks! |
we do not retransmit the pubacks though: rumqtt/rumqttc/src/eventloop.rs Line 137 in cb89fcf
can you verify again if i'm missing something or maybe something related to qos2? @cartertinney |
Trying out the latest version, I now see the expected behavior for SUB and UNSUB in the case of connection drops, I now get a Now, regarding the acknowledgement process: I understand that the ACK requests that queue up while disconnected are in the channel (yet to reach the EventLoop/ MQTT session), but the issue is that allowing them to queue up like this violates the MQTT specification. Acking a message from a previous connection upon reconnect is dangerous because we can't guarantee we are acknowledging the packet we think we are. Consider the case: A publish is sent to the client from the broker. After receiving the publish, but before sending the PUBACK, the client loses connection. The PUBACK is queued up in the channel. Eventually, the client reconnects. The broker redelivers the original publish, now with the DUP flag set to true. The queued up PUBACK the client had in the channel is also delivered upon reconnect. This PUBACK was intended to be sent for the original copy of the message on a prior connection, but now acks the redelivery from the broker's perspective. The broker sees no outstanding PKID that needs to be acked, but the client thinks it still needs to ack the duplicate (treated as a new message per spec), resulting in a second ACK being sent for the same PKID. At this point:
This is admittedly a fairly unlikely scenario when using QoS1 and automatic acking, but when using a high amount of message throughput, or when using QoS2, or when using manual ack (and ESPECIALLY when using QoS2 with manual ack and high throughput), this scenario actually becomes relatively easy to trigger. The only correct behavior is that you must wait for the broker to redeliver the publish, and then ack the duplicate. I realize, from an implementation standpoint, that this is difficult due to the way you use a single channel for all requests, but ultimately to be correct I think you probably would need to handle PUBACK/PUBREC/PUBCOMP differently from the rest. I hope this explanation is clear - it's a relatively easy scenario to reproduce, but I'm happy to provide a sample application that will reliably create this issue if necessary. |
fair points, but this case is already handled ( atleast for PubAck ) here as mentioned previously: rumqtt/rumqttc/src/eventloop.rs Line 137 in cb89fcf
iirc, we need to retransmit PubRels as well right? so here as you pointed out ignoring |
Solves #805
Type of change
New feature, that notifies the requester when their requests are acknowledged by the broker(in the case of QoS 1/2 Publishes and Subscribe/Unsubscribe) or when they are written to TCP buffer(for all other requests).
BREAKING:
pending
definition changes fromVecDeque<Request>
toVecDeque<(Request, Option<PromiseTx>)>
.Checklist:
cargo fmt
CHANGELOG.md
if it's relevant to the users of the library. If it's not relevant mention why.