Skip to content

Commit

Permalink
Write in parallel, separate from reading.
Browse files Browse the repository at this point in the history
This ensures that TLS writes can be done without blocking TLS reads.
In addition, use partial TLS writes if possible.
  • Loading branch information
zlogic committed Dec 17, 2024
1 parent d39cf44 commit 77011fa
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 31 deletions.
5 changes: 2 additions & 3 deletions src/fortivpn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,8 @@ impl FortiVPNTunnel {
}
}

pub async fn write_data(&mut self, data: &mut [u8]) -> Result<usize, FortiError> {
self.socket.write_all(data).await?;
Ok(data.len())
pub async fn write_data(&mut self, data: &[u8]) -> Result<usize, FortiError> {
Ok(self.socket.write(data).await?)
}

pub async fn flush(&mut self) -> Result<(), FortiError> {
Expand Down
2 changes: 1 addition & 1 deletion src/fortivpn/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl FortiService {
pub async fn process_events(&mut self) -> Result<(), VpnServiceError> {
if let ConnectionState::Connected(state) = &mut self.state {
if !state.send_range.is_empty() {
let remaining_data = &mut state.send_buffer[state.send_range.clone()];
let remaining_data = &state.send_buffer[state.send_range.clone()];
match state.tunnel.write_data(remaining_data).await {
Ok(sent_bytes) => {
state.send_range.start += sent_bytes;
Expand Down
64 changes: 37 additions & 27 deletions src/ikev2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,14 @@ impl Server {
// Wait until something is ready.
let mut udp_read_buf = ReadBuf::new(&mut udp_read_buffer);
poll_seed = poll_seed.wrapping_add(1);
let mut vpn_event = None;
let mut sent_udp = None;
let mut udp_source = None;
let mut command_message = None;
{
let (command_message, udp_source, vpn_event) = {
let mut vpn_event = None;
let mut udp_source = None;
let mut command_message = None;
let ignore_vpn = shutdown && !vpn_is_connected;
let mut receive_command = pin!(command_receiver.recv());
let mut receive_vpn_event = pin!(vpn_service.wait_event(&mut vpn_read_buffer));
let sockets = &mut sessions.sockets;
let must_send_esp = sockets.is_pending_send();
let sockets = &sessions.sockets;
future::poll_fn(|cx| {
if vpn_event.is_none() {
vpn_event = if !ignore_vpn {
Expand All @@ -186,8 +184,8 @@ impl Server {
None
};
}
if sent_udp.is_none() && must_send_esp {
sent_udp = match sockets.poll_pending_send(cx) {
if udp_source.is_none() {
udp_source = match sockets.poll_recv(cx, poll_seed, &mut udp_read_buf) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
};
Expand All @@ -198,25 +196,14 @@ impl Server {
Poll::Pending => None,
}
}
if udp_source.is_none() {
udp_source = match sockets.poll_recv(cx, poll_seed, &mut udp_read_buf) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
};
}
if must_send_esp && sent_udp.is_none() {
Poll::Pending
} else if vpn_event.is_some()
|| sent_udp.is_some()
|| command_message.is_some()
|| udp_source.is_some()
{
if vpn_event.is_some() || udp_source.is_some() || command_message.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
(command_message, udp_source, vpn_event)
};
// Process all ready events.
if let Some(message) = command_message {
Expand Down Expand Up @@ -277,10 +264,37 @@ impl Server {
}
None => {}
}
let (vpn_event, sent_udp) = {
let mut process_vpn_events = pin!(vpn_service.process_events());
let mut sent_udp = None;
let mut vpn_event = None;
let sockets = &mut sessions.sockets;
future::poll_fn(|cx| {
if vpn_event.is_none() {
vpn_event = match process_vpn_events.as_mut().poll(cx) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
};
}
if sent_udp.is_none() {
sent_udp = match sockets.poll_pending_send(cx) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
};
}
if vpn_event.is_some() && sent_udp.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
(vpn_event, sent_udp)
};
if let Some(Err(err)) = sent_udp {
warn!("Failed to send UDP ESP message: {}", err);
}
if let Err(err) = vpn_service.process_events().await {
if let Some(Err(err)) = vpn_event {
warn!("Failed to process VPN lifecycle events: {}", err);
}
if vpn_is_connected && !vpn_service.is_connected() {
Expand Down Expand Up @@ -433,10 +447,6 @@ impl Sockets {
}
Ok(())
}

fn is_pending_send(&self) -> bool {
self.pending_send.is_some()
}
}

struct UdpDatagramOrigin {
Expand Down

0 comments on commit 77011fa

Please sign in to comment.