Skip to content

Commit

Permalink
fix(socket sink): gracefully shutdown on reload when stream is termin…
Browse files Browse the repository at this point in the history
…ated
  • Loading branch information
neuronull committed Oct 8, 2024
1 parent 49bb1b5 commit a449052
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21455-socket-sink-graceful-reload.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
All TCP based socket sinks now gracefully handle config reloads under load. Previously, when a configuration reload occurred and data was flowing through the topology, the vector process crashed due to the TCP sink attempting to access the stream when it had been terminated.

authors: neuronull
48 changes: 24 additions & 24 deletions src/sinks/util/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, task::noop_waker_ref, SinkExt, StreamExt};
use futures_util::{future::ready, stream};
use snafu::{ResultExt, Snafu};
use tokio::{
io::{AsyncRead, ReadBuf},
Expand All @@ -29,6 +28,7 @@ use crate::{
ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
},
sink_ext::VecSinkExt,

Check failure on line 31 in src/sinks/util/tcp.rs

View workflow job for this annotation

GitHub Actions / Checks

unresolved import `crate::sink_ext`
sinks::{
util::{
retries::ExponentialBackoff,
Expand Down Expand Up @@ -281,34 +281,34 @@ where
// We need [Peekable](https://docs.rs/futures/0.3.6/futures/stream/struct.Peekable.html) for initiating
// connection only when we have something to send.
let mut encoder = self.encoder.clone();
let mut input = input.map(|mut event| {
let byte_size = event.size_of();
let json_byte_size = event.estimated_json_encoded_size_of();
let finalizers = event.metadata_mut().take_finalizers();
self.transformer.transform(&mut event);
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_ok() {
let item = bytes.freeze();
EncodedEvent {
item,
finalizers,
byte_size,
json_byte_size,
let mut input = input
.map(|mut event| {
let byte_size = event.size_of();
let json_byte_size = event.estimated_json_encoded_size_of();
let finalizers = event.metadata_mut().take_finalizers();
self.transformer.transform(&mut event);
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_ok() {
let item = bytes.freeze();
EncodedEvent {
item,
finalizers,
byte_size,
json_byte_size,
}
} else {
EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
}
} else {
EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
}
});
})
.peekable();

while let Some(item) = input.next().await {
while Pin::new(&mut input).peek().await.is_some() {
let mut sink = self.connect().await;
let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));

let mut mapped_input = stream::once(ready(item)).chain(&mut input).map(Ok);

let result = match sink.send_all(&mut mapped_input).await {
let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {

Check failure on line 311 in src/sinks/util/tcp.rs

View workflow job for this annotation

GitHub Actions / Checks

no method named `send_all_peekable` found for struct `BytesSink` in the current scope
Ok(()) => sink.close().await,
Err(error) => Err(error),
};
Expand Down

0 comments on commit a449052

Please sign in to comment.