diff --git a/neqo-http3/src/send_message.rs b/neqo-http3/src/send_message.rs index 15965c44f6..026524a71f 100644 --- a/neqo-http3/src/send_message.rs +++ b/neqo-http3/src/send_message.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc}; +use std::{cell::RefCell, cmp::min, fmt::Debug, num::NonZeroUsize, rc::Rc}; use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType}; use neqo_qpack::encoder::QPackEncoder; @@ -17,6 +17,7 @@ use crate::{ SendStream, SendStreamEvents, Stream, }; +const MIN_DATA_FRAME_SIZE: usize = 3; // Minimal DATA frame size: 2 (header) + 1 (payload) const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2 const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header) const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3 @@ -177,7 +178,14 @@ impl SendStream for SendMessage { let available = conn .stream_avail_send_space(self.stream_id()) .map_err(|e| Error::map_stream_send_errors(&e.into()))?; - if available <= 2 { + if available < MIN_DATA_FRAME_SIZE { + // Setting this once, instead of every time the available send space + // is exhausted, would suffice. That said, function call should be + // cheap, thus not worth optimizing. + conn.stream_set_writable_event_low_watermark( + self.stream_id(), + NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(), + )?; return Ok(0); } let to_send = if available <= MAX_DATA_HEADER_SIZE_2_LIMIT { diff --git a/neqo-http3/src/server_events.rs b/neqo-http3/src/server_events.rs index 214a48c757..119d9f9f39 100644 --- a/neqo-http3/src/server_events.rs +++ b/neqo-http3/src/server_events.rs @@ -84,6 +84,19 @@ impl StreamHandler { .send_data(self.stream_id(), buf, &mut self.conn.borrow_mut()) } + /// Bytes sendable on stream at the QUIC layer. + /// + /// Note that this does not yet account for HTTP3 frame headers. + /// + /// # Errors + /// + /// It may return `InvalidStreamId` if a stream does not exist anymore. + pub fn available(&mut self) -> Res { + let stream_id = self.stream_id(); + let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?; + Ok(n) + } + /// Close sending side. /// /// # Errors diff --git a/neqo-http3/tests/httpconn.rs b/neqo-http3/tests/httpconn.rs index c0c62de9c9..8b9e7b42e8 100644 --- a/neqo-http3/tests/httpconn.rs +++ b/neqo-http3/tests/httpconn.rs @@ -246,6 +246,83 @@ fn test_103_response() { process_client_events(&mut hconn_c); } +/// Test [`neqo_http3::SendMessage::send_data`] to set +/// [`neqo_transport::SendStream::set_writable_event_low_watermark`]. +#[allow(clippy::cast_possible_truncation)] +#[test] +fn test_data_writable_events_low_watermark() -> Result<(), Box> { + const STREAM_LIMIT: u64 = 5000; + const DATA_FRAME_HEADER_SIZE: usize = 3; + + // Create a client and a server. + let mut hconn_c = http3_client_with_params(Http3Parameters::default().connection_parameters( + ConnectionParameters::default().max_stream_data(StreamType::BiDi, false, STREAM_LIMIT), + )); + let mut hconn_s = default_http3_server(); + mem::drop(connect_peers(&mut hconn_c, &mut hconn_s)); + + // Client sends GET to server. + let stream_id = hconn_c.fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default(), + )?; + hconn_c.stream_close_send(stream_id)?; + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Server receives GET and responds with headers. + let mut request = receive_request(&mut hconn_s).unwrap(); + request.send_headers(&[Header::new(":status", "200")])?; + + // Sending these headers clears the server's send stream buffer and thus + // emits a DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + let data_writable = |e| { + matches!( + e, + Http3ServerEvent::DataWritable { + stream + } if stream.stream_id() == stream_id + ) + }; + assert!(hconn_s.events().any(data_writable)); + + // Have server fill entire send buffer minus 1 byte. + let all_but_one = request.available()? - DATA_FRAME_HEADER_SIZE - 1; + let buf = vec![1; all_but_one]; + let sent = request.send_data(&buf)?; + assert_eq!(sent, all_but_one); + assert_eq!(request.available()?, 1); + + // Sending the buffered data clears the send stream buffer and thus emits a + // DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + assert!(hconn_s.events().any(data_writable)); + + // Sending more fails, given that each data frame needs to be preceeded by a + // header, i.e. needs more than 1 byte of send space to send 1 byte payload. + assert_eq!(request.available()?, 1); + assert_eq!(request.send_data(&buf)?, 0); + + // Have the client read all the pending data. + let mut recv_buf = vec![0_u8; all_but_one]; + let (recvd, _) = hconn_c.read_data(now(), stream_id, &mut recv_buf)?; + assert_eq!(sent, recvd); + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Expect the server's available send space to be back to the stream limit. + assert_eq!(request.available()?, STREAM_LIMIT as usize); + + // Expect the server to emit a DataWritable event, even though it always had + // at least 1 byte available to send, i.e. it never exhausted the entire + // available send space. + assert!(hconn_s.events().any(data_writable)); + + Ok(()) +} + #[test] fn test_data_writable_events() { const STREAM_LIMIT: u64 = 5000; diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index f955381414..732cd31cf4 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -12,6 +12,7 @@ use std::{ fmt::{self, Debug}, iter, mem, net::{IpAddr, SocketAddr}, + num::NonZeroUsize, ops::RangeInclusive, rc::{Rc, Weak}, time::{Duration, Instant}, @@ -3184,6 +3185,34 @@ impl Connection { Ok(self.streams.get_send_stream(stream_id)?.avail()) } + /// Set low watermark for [`ConnectionEvent::SendStreamWritable`] event. + /// + /// Stream emits a [`crate::ConnectionEvent::SendStreamWritable`] event + /// when: + /// - the available sendable bytes increased to or above the watermark + /// - and was previously below the watermark. + /// + /// Default value is `1`. In other words + /// [`crate::ConnectionEvent::SendStreamWritable`] is emitted whenever the + /// available sendable bytes was previously at `0` and now increased to `1` + /// or more. + /// + /// Use this when your protocol needs at least `watermark` amount of available + /// sendable bytes to make progress. + /// + /// # Errors + /// When the stream ID is invalid. + pub fn stream_set_writable_event_low_watermark( + &mut self, + stream_id: StreamId, + watermark: NonZeroUsize, + ) -> Res<()> { + self.streams + .get_send_stream_mut(stream_id)? + .set_writable_event_low_watermark(watermark); + Ok(()) + } + /// Close the stream. Enqueued data will be sent. /// # Errors /// When the stream ID is invalid. diff --git a/neqo-transport/src/fc.rs b/neqo-transport/src/fc.rs index 5ddfce6463..d619fd8e82 100644 --- a/neqo-transport/src/fc.rs +++ b/neqo-transport/src/fc.rs @@ -64,15 +64,16 @@ where } } - /// Update the maximum. Returns `true` if the change was an increase. - pub fn update(&mut self, limit: u64) -> bool { + /// Update the maximum. Returns `Some` with the updated available flow + /// control if the change was an increase and `None` otherwise. + pub fn update(&mut self, limit: u64) -> Option { debug_assert!(limit < u64::MAX); if limit > self.limit { self.limit = limit; self.blocked_frame = false; - true + Some(self.available()) } else { - false + None } } diff --git a/neqo-transport/src/send_stream.rs b/neqo-transport/src/send_stream.rs index 98476e9d18..e443edc033 100644 --- a/neqo-transport/src/send_stream.rs +++ b/neqo-transport/src/send_stream.rs @@ -12,6 +12,7 @@ use std::{ collections::{btree_map::Entry, BTreeMap, VecDeque}, hash::{Hash, Hasher}, mem, + num::NonZeroUsize, ops::Add, rc::Rc, }; @@ -710,6 +711,7 @@ pub struct SendStream { sendorder: Option, bytes_sent: u64, fair: bool, + writable_event_low_watermark: NonZeroUsize, } impl Hash for SendStream { @@ -726,6 +728,7 @@ impl PartialEq for SendStream { impl Eq for SendStream {} impl SendStream { + #[allow(clippy::missing_panics_doc)] // not possible pub fn new( stream_id: StreamId, max_stream_data: u64, @@ -745,6 +748,7 @@ impl SendStream { sendorder: None, bytes_sent: 0, fair: false, + writable_event_low_watermark: 1.try_into().unwrap(), }; if ss.avail() > 0 { ss.conn_events.send_stream_writable(stream_id); @@ -1128,10 +1132,10 @@ impl SendStream { SendStreamState::Send { ref mut send_buf, .. } => { + let previous_limit = send_buf.avail(); send_buf.mark_as_acked(offset, len); - if self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); - } + let current_limit = send_buf.avail(); + self.maybe_emit_writable_event(previous_limit, current_limit); } SendStreamState::DataSent { ref mut send_buf, @@ -1203,14 +1207,21 @@ impl SendStream { } } + /// Set low watermark for [`crate::ConnectionEvent::SendStreamWritable`] + /// event. + /// + /// See [`crate::Connection::stream_set_writable_event_low_watermark`]. + pub fn set_writable_event_low_watermark(&mut self, watermark: NonZeroUsize) { + self.writable_event_low_watermark = watermark; + } + pub fn set_max_stream_data(&mut self, limit: u64) { if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } = &mut self.state { - let stream_was_blocked = fc.available() == 0; - fc.update(limit); - if stream_was_blocked && self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); + let previous_limit = fc.available(); + if let Some(current_limit) = fc.update(limit) { + self.maybe_emit_writable_event(previous_limit, current_limit); } } } @@ -1369,6 +1380,27 @@ impl SendStream { pub(crate) fn state(&mut self) -> &mut SendStreamState { &mut self.state } + + pub(crate) fn maybe_emit_writable_event( + &mut self, + previous_limit: usize, + current_limit: usize, + ) { + let low_watermark = self.writable_event_low_watermark.get(); + + // Skip if: + // - stream was not constrained by limit before, + // - or stream is still constrained by limit, + // - or stream is constrained by different limit. + if low_watermark < previous_limit + || current_limit < low_watermark + || self.avail() < low_watermark + { + return; + } + + self.conn_events.send_stream_writable(self.stream_id); + } } impl ::std::fmt::Display for SendStream { @@ -1756,7 +1788,7 @@ pub struct SendStreamRecoveryToken { #[cfg(test)] mod tests { - use std::{cell::RefCell, collections::VecDeque, rc::Rc}; + use std::{cell::RefCell, collections::VecDeque, num::NonZeroUsize, rc::Rc}; use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder}; @@ -2450,7 +2482,7 @@ mod tests { // Increasing conn max (conn:4, stream:4) will unblock but not emit // event b/c that happens in Connection::emit_frame() (tested in // connection.rs) - assert!(conn_fc.borrow_mut().update(4)); + assert!(conn_fc.borrow_mut().update(4).is_some()); assert_eq!(conn_events.events().count(), 0); assert_eq!(s.avail(), 2); assert_eq!(s.send(b"hello").unwrap(), 2); @@ -2476,6 +2508,53 @@ mod tests { assert_eq!(s.send(b"hello").unwrap(), 0); } + #[test] + fn send_stream_writable_event_gen_with_watermark() { + let conn_fc = connection_fc(0); + let mut conn_events = ConnectionEvents::default(); + + let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone()); + // Set watermark at 3. + s.set_writable_event_low_watermark(NonZeroUsize::new(3).unwrap()); + + // Stream is initially blocked (conn:0, stream:0, watermark: 3) and will + // not accept data. + assert_eq!(s.avail(), 0); + assert_eq!(s.send(b"hi!").unwrap(), 0); + + // Increasing the connection limit (conn:10, stream:0, watermark: 3) will not generate + // event or allow sending anything. Stream is constrained by stream limit. + assert!(conn_fc.borrow_mut().update(10).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing the connection limit further (conn:11, stream:0, watermark: 3) will not + // generate event or allow sending anything. Stream wasn't constrained by connection + // limit before. + assert!(conn_fc.borrow_mut().update(11).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing to (conn:11, stream:2, watermark: 3) will allow 2 bytes + // but not generate a SendStreamWritable event as it is still below the + // configured watermark. + s.set_max_stream_data(2); + assert_eq!(conn_events.events().count(), 0); + assert_eq!(s.avail(), 2); + + // Increasing to (conn:11, stream:3, watermark: 3) will generate an + // event as available sendable bytes are >= watermark. + s.set_max_stream_data(3); + let evts = conn_events.events().collect::>(); + assert_eq!(evts.len(), 1); + assert!(matches!( + evts[0], + ConnectionEvent::SendStreamWritable { .. } + )); + + assert_eq!(s.send(b"hi!").unwrap(), 3); + } + #[test] fn send_stream_writable_event_new_stream() { let conn_fc = connection_fc(2); diff --git a/neqo-transport/src/streams.rs b/neqo-transport/src/streams.rs index d8662afa3b..b95b33c294 100644 --- a/neqo-transport/src/streams.rs +++ b/neqo-transport/src/streams.rs @@ -476,17 +476,13 @@ impl Streams { } pub fn handle_max_data(&mut self, maximum_data: u64) { - let conn_was_blocked = self.sender_fc.borrow().available() == 0; - let conn_credit_increased = self.sender_fc.borrow_mut().update(maximum_data); - - if conn_was_blocked && conn_credit_increased { - for (id, ss) in &mut self.send { - if ss.avail() > 0 { - // These may not actually all be writable if one - // uses up all the conn credit. Not our fault. - self.events.send_stream_writable(*id); - } - } + let previous_limit = self.sender_fc.borrow().available(); + let Some(current_limit) = self.sender_fc.borrow_mut().update(maximum_data) else { + return; + }; + + for (_id, ss) in &mut self.send { + ss.maybe_emit_writable_event(previous_limit, current_limit); } } @@ -531,7 +527,10 @@ impl Streams { } pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) { - if self.local_stream_limits[stream_type].update(maximum_streams) { + let increased = self.local_stream_limits[stream_type] + .update(maximum_streams) + .is_some(); + if increased { self.events.send_stream_creatable(stream_type); } }