Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(SendMessage): use SendStream::set_writable_event_low_watermark #1838

Merged
merged 22 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc};
use std::{cell::RefCell, cmp::min, fmt::Debug, num::NonZeroUsize, rc::Rc};

use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType};
use neqo_qpack::encoder::QPackEncoder;
Expand All @@ -17,6 +17,7 @@ use crate::{
SendStream, SendStreamEvents, Stream,
};

const MIN_DATA_FRAME_SIZE: usize = 3; // Minimal DATA frame size: 2 (header) + 1 (payload)
const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2
const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header)
const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3
Expand Down Expand Up @@ -177,7 +178,14 @@ impl SendStream for SendMessage {
let available = conn
.stream_avail_send_space(self.stream_id())
.map_err(|e| Error::map_stream_send_errors(&e.into()))?;
if available <= 2 {
if available < MIN_DATA_FRAME_SIZE {
// Setting this once, instead of every time the available send space
// is exhausted, would suffice. That said, function call should be
// cheap, thus not worth optimizing.
conn.stream_set_writable_event_low_watermark(
self.stream_id(),
NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(),
)?;
return Ok(0);
}
let to_send = if available <= MAX_DATA_HEADER_SIZE_2_LIMIT {
Expand Down
13 changes: 13 additions & 0 deletions neqo-http3/src/server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ impl StreamHandler {
.send_data(self.stream_id(), buf, &mut self.conn.borrow_mut())
}

/// Bytes sendable on stream at the QUIC layer.
///
/// Note that this does not yet account for HTTP3 frame headers.
///
/// # Errors
///
/// It may return `InvalidStreamId` if a stream does not exist anymore.
pub fn available(&mut self) -> Res<usize> {
let stream_id = self.stream_id();
let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?;
Ok(n)
}

larseggert marked this conversation as resolved.
Show resolved Hide resolved
/// Close sending side.
///
/// # Errors
Expand Down
77 changes: 77 additions & 0 deletions neqo-http3/tests/httpconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,83 @@ fn test_103_response() {
process_client_events(&mut hconn_c);
}

/// Test [`neqo_http3::SendMessage::send_data`] to set
/// [`neqo_transport::SendStream::set_writable_event_low_watermark`].
#[allow(clippy::cast_possible_truncation)]
#[test]
fn test_data_writable_events_low_watermark() -> Result<(), Box<dyn std::error::Error>> {
const STREAM_LIMIT: u64 = 5000;
const DATA_FRAME_HEADER_SIZE: usize = 3;

// Create a client and a server.
let mut hconn_c = http3_client_with_params(Http3Parameters::default().connection_parameters(
ConnectionParameters::default().max_stream_data(StreamType::BiDi, false, STREAM_LIMIT),
));
let mut hconn_s = default_http3_server();
mem::drop(connect_peers(&mut hconn_c, &mut hconn_s));

// Client sends GET to server.
let stream_id = hconn_c.fetch(
now(),
"GET",
&("https", "something.com", "/"),
&[],
Priority::default(),
)?;
hconn_c.stream_close_send(stream_id)?;
exchange_packets(&mut hconn_c, &mut hconn_s, None);

// Server receives GET and responds with headers.
let mut request = receive_request(&mut hconn_s).unwrap();
request.send_headers(&[Header::new(":status", "200")])?;

// Sending these headers clears the server's send stream buffer and thus
// emits a DataWritable event.
exchange_packets(&mut hconn_c, &mut hconn_s, None);
let data_writable = |e| {
matches!(
e,
Http3ServerEvent::DataWritable {
stream
} if stream.stream_id() == stream_id
)
};
assert!(hconn_s.events().any(data_writable));

// Have server fill entire send buffer minus 1 byte.
let all_but_one = request.available()? - DATA_FRAME_HEADER_SIZE - 1;
let buf = vec![1; all_but_one];
let sent = request.send_data(&buf)?;
assert_eq!(sent, all_but_one);
assert_eq!(request.available()?, 1);

// Sending the buffered data clears the send stream buffer and thus emits a
// DataWritable event.
exchange_packets(&mut hconn_c, &mut hconn_s, None);
assert!(hconn_s.events().any(data_writable));

// Sending more fails, given that each data frame needs to be preceeded by a
// header, i.e. needs more than 1 byte of send space to send 1 byte payload.
assert_eq!(request.available()?, 1);
assert_eq!(request.send_data(&buf)?, 0);

// Have the client read all the pending data.
let mut recv_buf = vec![0_u8; all_but_one];
let (recvd, _) = hconn_c.read_data(now(), stream_id, &mut recv_buf)?;
assert_eq!(sent, recvd);
exchange_packets(&mut hconn_c, &mut hconn_s, None);

// Expect the server's available send space to be back to the stream limit.
assert_eq!(request.available()?, STREAM_LIMIT as usize);

// Expect the server to emit a DataWritable event, even though it always had
// at least 1 byte available to send, i.e. it never exhausted the entire
// available send space.
assert!(hconn_s.events().any(data_writable));

Ok(())
}

#[test]
fn test_data_writable_events() {
const STREAM_LIMIT: u64 = 5000;
Expand Down
29 changes: 29 additions & 0 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
fmt::{self, Debug},
iter, mem,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
ops::RangeInclusive,
rc::{Rc, Weak},
time::{Duration, Instant},
Expand Down Expand Up @@ -3184,6 +3185,34 @@ impl Connection {
Ok(self.streams.get_send_stream(stream_id)?.avail())
}

/// Set low watermark for [`ConnectionEvent::SendStreamWritable`] event.
///
/// Stream emits a [`crate::ConnectionEvent::SendStreamWritable`] event
/// when:
/// - the available sendable bytes increased to or above the watermark
/// - and was previously below the watermark.
///
/// Default value is `1`. In other words
/// [`crate::ConnectionEvent::SendStreamWritable`] is emitted whenever the
/// available sendable bytes was previously at `0` and now increased to `1`
/// or more.
///
/// Use this when your protocol needs at least `watermark` amount of available
/// sendable bytes to make progress.
///
/// # Errors
/// When the stream ID is invalid.
pub fn stream_set_writable_event_low_watermark(
&mut self,
stream_id: StreamId,
watermark: NonZeroUsize,
) -> Res<()> {
self.streams
.get_send_stream_mut(stream_id)?
.set_writable_event_low_watermark(watermark);
Ok(())
}

/// Close the stream. Enqueued data will be sent.
/// # Errors
/// When the stream ID is invalid.
Expand Down
9 changes: 5 additions & 4 deletions neqo-transport/src/fc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ where
}
}

/// Update the maximum. Returns `true` if the change was an increase.
pub fn update(&mut self, limit: u64) -> bool {
/// Update the maximum. Returns `Some` with the updated available flow
/// control if the change was an increase and `None` otherwise.
pub fn update(&mut self, limit: u64) -> Option<usize> {
debug_assert!(limit < u64::MAX);
if limit > self.limit {
self.limit = limit;
self.blocked_frame = false;
true
Some(self.available())
} else {
false
None
}
}

Expand Down
97 changes: 88 additions & 9 deletions neqo-transport/src/send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
collections::{btree_map::Entry, BTreeMap, VecDeque},
hash::{Hash, Hasher},
mem,
num::NonZeroUsize,
ops::Add,
rc::Rc,
};
Expand Down Expand Up @@ -710,6 +711,7 @@ pub struct SendStream {
sendorder: Option<SendOrder>,
bytes_sent: u64,
fair: bool,
writable_event_low_watermark: NonZeroUsize,
}

impl Hash for SendStream {
Expand All @@ -726,6 +728,7 @@ impl PartialEq for SendStream {
impl Eq for SendStream {}

impl SendStream {
#[allow(clippy::missing_panics_doc)] // not possible
pub fn new(
stream_id: StreamId,
max_stream_data: u64,
Expand All @@ -745,6 +748,7 @@ impl SendStream {
sendorder: None,
bytes_sent: 0,
fair: false,
writable_event_low_watermark: 1.try_into().unwrap(),
};
if ss.avail() > 0 {
ss.conn_events.send_stream_writable(stream_id);
Expand Down Expand Up @@ -1128,10 +1132,10 @@ impl SendStream {
SendStreamState::Send {
ref mut send_buf, ..
} => {
let previous_limit = send_buf.avail();
send_buf.mark_as_acked(offset, len);
if self.avail() > 0 {
self.conn_events.send_stream_writable(self.stream_id);
}
let current_limit = send_buf.avail();
martinthomson marked this conversation as resolved.
Show resolved Hide resolved
self.maybe_emit_writable_event(previous_limit, current_limit);
}
SendStreamState::DataSent {
ref mut send_buf,
Expand Down Expand Up @@ -1203,14 +1207,21 @@ impl SendStream {
}
}

/// Set low watermark for [`crate::ConnectionEvent::SendStreamWritable`]
/// event.
///
/// See [`crate::Connection::stream_set_writable_event_low_watermark`].
pub fn set_writable_event_low_watermark(&mut self, watermark: NonZeroUsize) {
self.writable_event_low_watermark = watermark;
}

pub fn set_max_stream_data(&mut self, limit: u64) {
if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
&mut self.state
{
let stream_was_blocked = fc.available() == 0;
fc.update(limit);
if stream_was_blocked && self.avail() > 0 {
self.conn_events.send_stream_writable(self.stream_id);
let previous_limit = fc.available();
if let Some(current_limit) = fc.update(limit) {
self.maybe_emit_writable_event(previous_limit, current_limit);
}
}
}
Expand Down Expand Up @@ -1369,6 +1380,27 @@ impl SendStream {
pub(crate) fn state(&mut self) -> &mut SendStreamState {
&mut self.state
}

pub(crate) fn maybe_emit_writable_event(
&mut self,
previous_limit: usize,
current_limit: usize,
) {
let low_watermark = self.writable_event_low_watermark.get();

// Skip if:
// - stream was not constrained by limit before,
// - or stream is still constrained by limit,
// - or stream is constrained by different limit.
if low_watermark < previous_limit
|| current_limit < low_watermark
|| self.avail() < low_watermark
{
return;
}

self.conn_events.send_stream_writable(self.stream_id);
}
}

impl ::std::fmt::Display for SendStream {
Expand Down Expand Up @@ -1756,7 +1788,7 @@ pub struct SendStreamRecoveryToken {

#[cfg(test)]
mod tests {
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use std::{cell::RefCell, collections::VecDeque, num::NonZeroUsize, rc::Rc};

use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder};

Expand Down Expand Up @@ -2450,7 +2482,7 @@ mod tests {
// Increasing conn max (conn:4, stream:4) will unblock but not emit
// event b/c that happens in Connection::emit_frame() (tested in
// connection.rs)
assert!(conn_fc.borrow_mut().update(4));
assert!(conn_fc.borrow_mut().update(4).is_some());
assert_eq!(conn_events.events().count(), 0);
assert_eq!(s.avail(), 2);
assert_eq!(s.send(b"hello").unwrap(), 2);
Expand All @@ -2476,6 +2508,53 @@ mod tests {
assert_eq!(s.send(b"hello").unwrap(), 0);
}

#[test]
fn send_stream_writable_event_gen_with_watermark() {
let conn_fc = connection_fc(0);
let mut conn_events = ConnectionEvents::default();

let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone());
// Set watermark at 3.
s.set_writable_event_low_watermark(NonZeroUsize::new(3).unwrap());

// Stream is initially blocked (conn:0, stream:0, watermark: 3) and will
// not accept data.
assert_eq!(s.avail(), 0);
assert_eq!(s.send(b"hi!").unwrap(), 0);

// Increasing the connection limit (conn:10, stream:0, watermark: 3) will not generate
// event or allow sending anything. Stream is constrained by stream limit.
assert!(conn_fc.borrow_mut().update(10).is_some());
assert_eq!(s.avail(), 0);
assert_eq!(conn_events.events().count(), 0);

// Increasing the connection limit further (conn:11, stream:0, watermark: 3) will not
// generate event or allow sending anything. Stream wasn't constrained by connection
// limit before.
assert!(conn_fc.borrow_mut().update(11).is_some());
assert_eq!(s.avail(), 0);
assert_eq!(conn_events.events().count(), 0);

// Increasing to (conn:11, stream:2, watermark: 3) will allow 2 bytes
// but not generate a SendStreamWritable event as it is still below the
// configured watermark.
s.set_max_stream_data(2);
assert_eq!(conn_events.events().count(), 0);
assert_eq!(s.avail(), 2);

// Increasing to (conn:11, stream:3, watermark: 3) will generate an
// event as available sendable bytes are >= watermark.
s.set_max_stream_data(3);
let evts = conn_events.events().collect::<Vec<_>>();
assert_eq!(evts.len(), 1);
assert!(matches!(
evts[0],
ConnectionEvent::SendStreamWritable { .. }
));

assert_eq!(s.send(b"hi!").unwrap(), 3);
}

#[test]
fn send_stream_writable_event_new_stream() {
let conn_fc = connection_fc(2);
Expand Down
Loading
Loading