Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): run a separate task for utilization metric to ensure it is regularly updated #22070

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions changelog.d/22070_utilization_metric_periodic_emit.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `utilization` metric is now properly published periodically, even when no events are flowing through the components.

authors: esensar
99 changes: 81 additions & 18 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::wrap,
utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage},
SourceSender,
};

Expand Down Expand Up @@ -84,6 +85,7 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
Expand All @@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

Expand All @@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
Expand Down Expand Up @@ -497,7 +501,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx)
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
Expand All @@ -506,6 +510,7 @@ impl<'a> Builder<'a> {
}

async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
let utilization_sender = self.utilization_emitter.get_sender();
for (key, sink) in self
.config
.sinks()
Expand Down Expand Up @@ -585,6 +590,10 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

self.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
let utilization_sender = utilization_sender.clone();
let component_key = key.clone();
let sink = async move {
debug!("Sink starting.");

Expand All @@ -600,7 +609,7 @@ impl<'a> Builder<'a> {
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(rx);
let mut rx = wrap(utilization_sender, component_key.clone(), rx);

let events_received = register!(EventsReceived);
sink.run(
Expand Down Expand Up @@ -682,6 +691,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
Expand Down Expand Up @@ -760,18 +770,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
Expand All @@ -780,10 +794,19 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(
t,
input_rx,
utilization_emitter.get_sender(),
node.key.clone(),
node.input_details.data_type(),
outputs,
);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
Expand Down Expand Up @@ -823,15 +846,17 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer: crate::utilization::Timer,
last_report: Instant,
key: ComponentKey,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
key: ComponentKey,
input_type: DataType,
outputs: TransformOutputs,
) -> Self {
Expand All @@ -840,17 +865,22 @@ impl Runner {
input_rx: Some(input_rx),
input_type,
outputs,
timer: crate::utilization::Timer::new(),
last_report: Instant::now(),
key,
timer_tx,
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
if self
.timer_tx
.send(UtilizationTimerMessage::StopWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform.");
}

self.events_received.emit(CountByteSize(
Expand All @@ -860,7 +890,16 @@ impl Runner {
}

/// Forwards the buffered transform outputs downstream.
///
/// Immediately before blocking on the (possibly slow) downstream send, this
/// notifies the utilization emitter that the component has entered a wait
/// period, so idle time spent here is attributed correctly in the
/// `utilization` metric. A failed notification is logged at debug level and
/// otherwise ignored — metrics reporting must never block event flow.
async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
    let start_wait = UtilizationTimerMessage::StartWait(self.key.clone(), Instant::now());
    if self.timer_tx.send(start_wait).is_err() {
        debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
    }
    self.outputs.send(outputs_buf).await
}

Expand All @@ -877,7 +916,16 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
Expand All @@ -903,7 +951,16 @@ impl Runner {
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
loop {
tokio::select! {
biased;
Expand Down Expand Up @@ -964,10 +1021,16 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx.into_stream());
utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(
utilization_emitter.get_sender(),
key.clone(),
input_rx.into_stream(),
);

let events_received = register!(EventsReceived);
let filtered = input_rx
Expand Down
21 changes: 17 additions & 4 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::{
};

use super::{
builder,
builder::TopologyPieces,
builder::{self, TopologyPieces},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
task::{Task, TaskOutput},
BuiltBuffer, TaskHandle,
};
use crate::{
Expand All @@ -28,9 +27,9 @@ use tokio::{
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

Expand All @@ -49,6 +48,7 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
}

impl RunningTopology {
Expand All @@ -67,6 +67,7 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
}
}

Expand Down Expand Up @@ -1042,6 +1043,7 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces.utilization_emitter.take().unwrap();
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
Expand All @@ -1053,6 +1055,17 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

running_topology.utilization_task =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Still trying to parse the details here)
Do we join this handle at any point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we do. It seems I forgot to add that. Should it be joined in `stop`? I can see that other tasks are joined there.

Copy link
Member

@pront pront Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds right.

// TODO: how to name this custom task?
Some(tokio::spawn(Task::new("".into(), "", async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A possible name is `utilization_heartbeat`. But here I have a more basic question: do we expect this to repeat every 5 seconds (the hardcoded value)? In my test it seemed to be much more frequent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is expected every 5 seconds. I'm not sure what went wrong there; in my testing it was repeated every 5 seconds (even though utilization was printed from the sink every second, it was only updated by this component every 5).

utilization_emitter
.run_utilization(ShutdownSignal::noop())
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
})));

Some((running_topology, abort_rx))
}
}
Expand Down
Loading
Loading