Canceling an async receive drops messages #65

Open
Andrepuel opened this issue Feb 19, 2020 · 2 comments

Comments

@Andrepuel

From the source code:

struct WorkQueue {
    waiting: VecDeque<oneshot::Sender<Result<NngMsg>>>,
    ready: VecDeque<Result<NngMsg>>,
}

impl WorkQueue {
    fn push_back(&mut self, message: Result<NngMsg>) {
        if let Some(sender) = self.waiting.pop_front() {
            sender
                .send(message)
                .unwrap_or_else(|err| debug!("Dropping message: {:?}", err));
        } else {
            self.ready.push_back(message);
        }
    }

    fn pop_front(&mut self) -> AsyncMsg {
        // If a value is ready, return it immediately. Otherwise register a
        // oneshot channel and return its receiving half.
        if let Some(item) = self.ready.pop_front() {
            Box::pin(future::ready(item))
        } else {
            let (sender, receiver) = oneshot::channel();
            self.waiting.push_back(sender);
            let receiver = receiver.map(result::flatten_result);
            Box::pin(receiver)
        }
    }
}

When you read from a socket, the pop_front function is called. If no message is ready, a oneshot channel is registered in the waiting queue. If the read is then cancelled (the AsyncMsg is dropped), the oneshot sender stays registered.
Eventually, when a message is actually received, push_back sends it to that dead channel. The send fails and the message is dropped.
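
The sequence above can be replayed with a small stand-alone model. This is only a sketch under assumptions: ModelQueue, Pending and lost_after_cancelled_read are hypothetical names, messages are plain Strings instead of Result<NngMsg>, and std::sync::mpsc channels stand in for the oneshot channels (both report a failed send once the receiving half has been dropped).

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// What a read gets back: a message that was already queued, or a receiver
/// to wait on (the stand-in for the AsyncMsg future).
enum Pending {
    Ready(String),
    Waiting(Receiver<String>),
}

impl Pending {
    /// Non-blocking check for a delivered message.
    fn try_take(self) -> Option<String> {
        match self {
            Pending::Ready(message) => Some(message),
            Pending::Waiting(receiver) => receiver.try_recv().ok(),
        }
    }
}

/// Hypothetical, simplified model of the WorkQueue from the issue.
struct ModelQueue {
    waiting: VecDeque<Sender<String>>,
    ready: VecDeque<String>,
}

impl ModelQueue {
    fn new() -> Self {
        ModelQueue { waiting: VecDeque::new(), ready: VecDeque::new() }
    }

    /// Same shape as the push_back above: a single send attempt.
    /// Returns false when the message was lost.
    fn push_back(&mut self, message: String) -> bool {
        if let Some(sender) = self.waiting.pop_front() {
            sender.send(message).is_ok()
        } else {
            self.ready.push_back(message);
            true
        }
    }

    /// Returns a ready message, or registers a waiter if none is queued.
    fn pop_front(&mut self) -> Pending {
        if let Some(item) = self.ready.pop_front() {
            Pending::Ready(item)
        } else {
            let (sender, receiver) = channel();
            self.waiting.push_back(sender);
            Pending::Waiting(receiver)
        }
    }
}

/// Replays the scenario: read, cancel the read, then a message arrives.
/// Returns (was the message delivered?, what a later read observes).
fn lost_after_cancelled_read() -> (bool, Option<String>) {
    let mut queue = ModelQueue::new();
    let cancelled = queue.pop_front(); // registers a waiter
    drop(cancelled); // the read is cancelled; the sender stays queued
    let delivered = queue.push_back("hello".to_string());
    let later = queue.pop_front().try_take();
    (delivered, later)
}
```

In this model the message goes to the stale sender, the send fails, and a later read finds nothing, which matches the behavior described above.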

A simple workaround would be to put the body of push_back inside a loop, retrying the send until a live channel accepts the message or, once no waiters remain, the message is placed into the ready queue.

Cancelling a read is useful when you use the select! macro.

@Andrepuel
Author

The following workaround worked for me:

impl WorkQueue {
    fn push_back(&mut self, message: Result<NngMsg>) {
        let mut message = Some(message);
        loop {
            if let Some(sender) = self.waiting.pop_front() {
                if let Err(unsent_message) = sender
                    .send(message.take().unwrap())
                {
                    message = Some(unsent_message);
                    continue;
                }
            } else {
                self.ready.push_back(message.take().unwrap());
            }

            break;
        }
    }

    fn pop_front(&mut self) -> AsyncMsg {
        // If a value is ready, return it immediately. Otherwise register a
        // oneshot channel and return its receiving half.
        if let Some(item) = self.ready.pop_front() {
            Box::pin(future::ready(item))
        } else {
            let (sender, receiver) = oneshot::channel();
            self.waiting.push_back(sender);
            let receiver = receiver.map(result::flatten_result);
            Box::pin(receiver)
        }
    }
}

@jeikabu
Owner

jeikabu commented Feb 20, 2020

Seems reasonable.

If you could add a test verifying the correct behavior I'd gladly accept a PR.
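
A test along these lines might look roughly like the sketch below. It is not written against the crate itself: RetryQueue, register_waiter and the two check functions are hypothetical names, messages are plain Strings, and std::sync::mpsc channels stand in for the oneshot channels. Note that the std channel returns the unsent value wrapped in SendError, whereas the workaround above binds the returned value directly.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Hypothetical model of the queue with the retry loop applied.
struct RetryQueue {
    waiting: VecDeque<Sender<String>>,
    ready: VecDeque<String>,
}

impl RetryQueue {
    fn new() -> Self {
        RetryQueue { waiting: VecDeque::new(), ready: VecDeque::new() }
    }

    /// Registers a waiter, as pop_front does when nothing is ready.
    fn register_waiter(&mut self) -> Receiver<String> {
        let (sender, receiver) = channel();
        self.waiting.push_back(sender);
        receiver
    }

    /// Skips waiters whose receiver was dropped; if none accepts the
    /// message, it is kept in the ready queue instead of being lost.
    fn push_back(&mut self, mut message: String) {
        while let Some(sender) = self.waiting.pop_front() {
            match sender.send(message) {
                Ok(()) => return,
                // The failed send hands the message back, so retry.
                Err(SendError(unsent)) => message = unsent,
            }
        }
        self.ready.push_back(message);
    }
}

/// A cancelled read followed by a message: the message should end up
/// in the ready queue.
fn message_survives_cancelled_read() -> Option<String> {
    let mut queue = RetryQueue::new();
    drop(queue.register_waiter()); // cancelled read
    queue.push_back("hello".to_string());
    queue.ready.pop_front()
}

/// A cancelled read queued ahead of a live one: the live waiter should
/// receive the message.
fn live_waiter_behind_cancelled_one() -> Option<String> {
    let mut queue = RetryQueue::new();
    drop(queue.register_waiter()); // cancelled read
    let live = queue.register_waiter(); // still waiting
    queue.push_back("hello".to_string());
    live.try_recv().ok()
}
```

Both checks are expected to yield Some("hello"). A real test for the crate would drive actual sockets and drop an in-flight receive future, which this model does not cover.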
