From 36b352fbf7a8475f94613a965c21732960c09dc2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 18 Oct 2024 11:08:04 +0200 Subject: [PATCH] feat(bin): don't allocate in UDP recv path (#2189) Pass a long lived receive buffer to `neqo_udp::recv_inner`, receiving an iterator of `Datagram<&[u8]>`s pointing into that buffer, thus no longer allocating in UDP receive path. --- neqo-bin/src/client/http09.rs | 9 ++--- neqo-bin/src/client/http3.rs | 9 ++--- neqo-bin/src/client/mod.rs | 62 ++++++++++++++++++++--------------- neqo-bin/src/server/http09.rs | 2 +- neqo-bin/src/server/http3.rs | 2 +- neqo-bin/src/server/mod.rs | 15 ++++++--- neqo-bin/src/udp.rs | 15 +++++---- 7 files changed, 66 insertions(+), 48 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index d4a1829892..728088a3f8 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -181,10 +181,11 @@ impl super::Client for Connection { self.process_output(now) } - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>, - { + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ) { self.process_multiple_input(dgrams, now); } diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index b8745a1fd6..e667355d9b 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -132,10 +132,11 @@ impl super::Client for Http3Client { self.process_output(now) } - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>, - { + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ) { self.process_multiple_input(dgrams, now); } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index beac31dda3..9b06195ec7 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -374,9 +374,11 @@ enum CloseState { /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { fn process_output(&mut self, now: Instant) -> Output; - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>; + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ); fn has_events(&self) -> bool; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where @@ -392,9 +394,28 @@ struct Runner<'a, H: Handler> { handler: H, timeout: Option>>, args: &'a Args, + recv_buf: Vec, } impl<'a, H: Handler> Runner<'a, H> { + fn new( + local_addr: SocketAddr, + socket: &'a mut crate::udp::Socket, + client: H::Client, + handler: H, + args: &'a Args, + ) -> Self { + Self { + local_addr, + socket, + client, + handler, + args, + timeout: None, + recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], + } + } + async fn run(mut self) -> Res> { loop { let handler_done = self.handler.handle(&mut self.client)?; @@ -457,12 +478,13 @@ impl<'a, H: Handler> Runner<'a, H> { async fn process_multiple_input(&mut self) -> Res<()> { loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { + let Some(dgrams) = self.socket.recv(&self.local_addr, &mut self.recv_buf)? else { + break; + }; + if dgrams.len() == 0 { break; } - self.client - .process_multiple_input(dgrams.iter().map(Datagram::borrow), Instant::now()); + self.client.process_multiple_input(dgrams, Instant::now()); self.process_output().await?; } @@ -573,32 +595,18 @@ pub async fn client(mut args: Args) -> Res<()> { let handler = http09::Handler::new(to_request, &args); - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? + Runner::new(real_local, &mut socket, client, handler, &args) + .run() + .await? } else { let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); let handler = http3::Handler::new(to_request, &args); - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? + Runner::new(real_local, &mut socket, client, handler, &args) + .run() + .await? }; } } diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index ff7214f3a8..1815140b01 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -185,7 +185,7 @@ impl HttpServer { } impl super::HttpServer for HttpServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + fn process(&mut self, dgram: Option, now: Instant) -> Output { self.server.process(dgram, now) } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs index 1cb9daf6d2..3506387a62 100644 --- a/neqo-bin/src/server/http3.rs +++ b/neqo-bin/src/server/http3.rs @@ -79,7 +79,7 @@ impl Display for HttpServer { } impl super::HttpServer for HttpServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output { + fn process(&mut self, dgram: Option, now: Instant) -> neqo_http3::Output { self.server.process(dgram, now) } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index abf614f1f8..4dce9d243f 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -194,7 +194,7 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { #[allow(clippy::module_name_repetitions)] pub trait HttpServer: Display { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process(&mut self, dgram: Option, now: Instant) -> Output; fn process_events(&mut self, now: Instant); fn has_events(&self) -> bool; } @@ -205,6 +205,7 @@ pub struct ServerRunner { server: Box, timeout: Option>>, sockets: Vec<(SocketAddr, crate::udp::Socket)>, + recv_buf: Vec, } impl ServerRunner { @@ -219,6 +220,7 @@ impl ServerRunner { server, timeout: None, sockets, + recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], } } @@ -236,7 +238,7 @@ impl ServerRunner { .unwrap_or(first_socket) } - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + async fn process(&mut self, mut dgram: Option) -> Result<(), io::Error> { loop { match self.server.process(dgram.take(), (self.now)()) { Output::Datagram(dgram) => { @@ -289,12 +291,15 @@ impl ServerRunner { match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgrams = socket.recv(host)?; - if dgrams.is_empty() { + let Some(dgrams) = socket.recv(host, &mut self.recv_buf)? else { + break; + }; + if dgrams.len() == 0 { break; } + let dgrams: Vec = dgrams.map(|d| d.to_owned()).collect(); for dgram in dgrams { - self.process(Some(&dgram)).await?; + self.process(Some(dgram)).await?; } }, Ready::Timeout => { diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index c418f5ee3c..0ea6edc449 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -7,7 +7,7 @@ use std::{io, net::SocketAddr}; use neqo_common::Datagram; -use neqo_transport::RECV_BUFFER_SIZE; +use neqo_udp::DatagramIter; /// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox. /// @@ -56,16 +56,19 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with /// the provided local address. - pub fn recv(&self, local_address: &SocketAddr) -> Result, io::Error> { - let mut recv_buf = vec![0; RECV_BUFFER_SIZE]; + pub fn recv<'a>( + &self, + local_address: &SocketAddr, + recv_buf: &'a mut [u8], + ) -> Result>, io::Error> { self.inner .try_io(tokio::io::Interest::READABLE, || { - neqo_udp::recv_inner(local_address, &self.state, &self.inner, &mut recv_buf) + neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf) }) - .map(|dgrams| dgrams.map(|d| d.to_owned()).collect()) + .map(Some) .or_else(|e| { if e.kind() == io::ErrorKind::WouldBlock { - Ok(vec![]) + Ok(None) } else { Err(e) }