Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add wait for remote settings being set before sending requests #641

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -534,6 +534,11 @@ where
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
self.inner.is_extended_connect_protocol_enabled()
}

/// Returns negotiated max send streams
pub fn max_send_streams(&self) -> usize {
self.inner.max_send_streams()
}
}

impl<B> fmt::Debug for SendRequest<B>
26 changes: 26 additions & 0 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

use std::task::{Context, Waker};
use std::usize;

#[derive(Debug)]
@@ -25,6 +26,11 @@ pub(super) struct Counts {

/// Current number of pending locally reset streams
num_reset_streams: usize,

/// If remote settings were applied
remote_settings_applied: bool,

remote_settings_applied_task: Option<Waker>,
}

impl Counts {
@@ -38,6 +44,8 @@ impl Counts {
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
remote_settings_applied: false,
remote_settings_applied_task: None,
}
}

@@ -108,6 +116,8 @@ impl Counts {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
}
self.remote_settings_applied = true;
self.notify_remote_settings_applied()
}

/// Run a block of code that could potentially transition a stream's state.
@@ -173,6 +183,16 @@ impl Counts {
self.max_send_streams
}

/// Returns if remote settings were applied
pub(crate) fn remote_settings_applied(&self) -> bool {
self.remote_settings_applied
}

/// Sets waker task for remote settings being set
pub(crate) fn wait_remote_settings_applied(&mut self, cx: &Context) {
self.remote_settings_applied_task = Some(cx.waker().clone());
}

/// Returns the maximum number of streams that can be initiated by the
/// remote peer.
pub(crate) fn max_recv_streams(&self) -> usize {
@@ -197,6 +217,12 @@ impl Counts {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
}

fn notify_remote_settings_applied(&mut self) {
if let Some(task) = self.remote_settings_applied_task.take() {
task.wake();
}
}
}

impl Drop for Counts {
4 changes: 4 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
@@ -922,6 +922,10 @@ where

me.actions.ensure_no_conn_error()?;
me.actions.send.ensure_next_stream_id()?;
if !me.counts.remote_settings_applied() {
me.counts.wait_remote_settings_applied(cx);
return Poll::Pending;
}

if let Some(pending) = pending {
let mut stream = me.store.resolve(pending.key);