From 59b7c9ad55dff61ecc34be71f438e4a7f9826eb8 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 9 Oct 2024 09:24:09 -0600 Subject: [PATCH] fix(socket sink): gracefully shutdown on reload when stream is terminated (#21455) * fix(socket sink): gracefully shutdown on reload when stream is terminated * fix configured out dep * fix scoping fr --- .../21455-socket-sink-graceful-reload.fix.md | 3 ++ src/lib.rs | 1 - src/sinks/util/tcp.rs | 48 +++++++++---------- 3 files changed, 27 insertions(+), 25 deletions(-) create mode 100644 changelog.d/21455-socket-sink-graceful-reload.fix.md diff --git a/changelog.d/21455-socket-sink-graceful-reload.fix.md b/changelog.d/21455-socket-sink-graceful-reload.fix.md new file mode 100644 index 0000000000000..a909a44a346a0 --- /dev/null +++ b/changelog.d/21455-socket-sink-graceful-reload.fix.md @@ -0,0 +1,3 @@ +All TCP based socket sinks now gracefully handle config reloads under load. Previously, when a configuration reload occurred and data was flowing through the topology, the vector process crashed due to the TCP sink attempting to access the stream when it had been terminated. + +authors: neuronull diff --git a/src/lib.rs b/src/lib.rs index b3ab97ac8e3ea..0c5061b464eb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,6 @@ pub mod serde; #[cfg(windows)] pub mod service; pub mod signal; -#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))] pub(crate) mod sink_ext; #[allow(unreachable_pub)] pub mod sinks; diff --git a/src/sinks/util/tcp.rs b/src/sinks/util/tcp.rs index 91c5ec6305d9d..933cb22961585 100644 --- a/src/sinks/util/tcp.rs +++ b/src/sinks/util/tcp.rs @@ -9,7 +9,6 @@ use std::{ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use futures::{stream::BoxStream, task::noop_waker_ref, SinkExt, StreamExt}; -use futures_util::{future::ready, stream}; use snafu::{ResultExt, Snafu}; use tokio::{ io::{AsyncRead, ReadBuf}, @@ -29,6 +28,7 @@ use crate::{ ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished, TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError, }, + sink_ext::VecSinkExt, sinks::{ util::{ retries::ExponentialBackoff, @@ -281,34 +281,34 @@ where // We need [Peekable](https://docs.rs/futures/0.3.6/futures/stream/struct.Peekable.html) for initiating // connection only when we have something to send. let mut encoder = self.encoder.clone(); - let mut input = input.map(|mut event| { - let byte_size = event.size_of(); - let json_byte_size = event.estimated_json_encoded_size_of(); - let finalizers = event.metadata_mut().take_finalizers(); - self.transformer.transform(&mut event); - let mut bytes = BytesMut::new(); - - // Errors are handled by `Encoder`. - if encoder.encode(event, &mut bytes).is_ok() { - let item = bytes.freeze(); - EncodedEvent { - item, - finalizers, - byte_size, - json_byte_size, + let mut input = input + .map(|mut event| { + let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); + let finalizers = event.metadata_mut().take_finalizers(); + self.transformer.transform(&mut event); + let mut bytes = BytesMut::new(); + + // Errors are handled by `Encoder`. + if encoder.encode(event, &mut bytes).is_ok() { + let item = bytes.freeze(); + EncodedEvent { + item, + finalizers, + byte_size, + json_byte_size, + } + } else { + EncodedEvent::new(Bytes::new(), 0, JsonSize::zero()) } - } else { - EncodedEvent::new(Bytes::new(), 0, JsonSize::zero()) - } - }); + }) + .peekable(); - while let Some(item) = input.next().await { + while Pin::new(&mut input).peek().await.is_some() { let mut sink = self.connect().await; let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); - let mut mapped_input = stream::once(ready(item)).chain(&mut input).map(Ok); - - let result = match sink.send_all(&mut mapped_input).await { + let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await { Ok(()) => sink.close().await, Err(error) => Err(error), };