fix(metrics): run a separate task for utilization metric to ensure it is regularly updated #22070

Open · wants to merge 8 commits into base: master
3 changes: 3 additions & 0 deletions changelog.d/22070_utilization_metric_periodic_emit.fix.md
@@ -0,0 +1,3 @@
The `utilization` metric is now published at regular intervals, even when no events are flowing through the components.

authors: esensar Quad9DNS
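
For context, a minimal sketch of the pattern this PR introduces (illustrative only, not the PR's code): components send start-wait/stop-wait timestamps over a channel, and a single background task folds them into a utilization value on a fixed 5-second tick, so the metric keeps updating even while a component sits idle. The `TimerMessage` enum, the `String` component key, and the `println!` stand-in for the real `gauge!` call are all assumptions made for this sketch; the PR's actual types are `UtilizationEmitter` and `UtilizationTimerMessage`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::interval;

// Illustrative stand-in for the PR's `UtilizationTimerMessage`.
enum TimerMessage {
    StartWait(String, Instant), // component began waiting for input
    StopWait(String, Instant),  // component received input and resumed work
}

#[derive(Default)]
struct ComponentTimer {
    total_wait: Duration,
    wait_started: Option<Instant>,
}

// One task owns every per-component timer and emits on a fixed tick,
// independent of whether any events are flowing.
async fn run_emitter(mut rx: UnboundedReceiver<TimerMessage>) {
    let mut timers: HashMap<String, ComponentTimer> = HashMap::new();
    let mut ticker = interval(Duration::from_secs(5));
    let mut window_start = Instant::now();
    loop {
        tokio::select! {
            Some(msg) = rx.recv() => match msg {
                TimerMessage::StartWait(key, at) => {
                    timers.entry(key).or_default().wait_started = Some(at);
                }
                TimerMessage::StopWait(key, at) => {
                    let timer = timers.entry(key).or_default();
                    if let Some(started) = timer.wait_started.take() {
                        timer.total_wait += at.saturating_duration_since(started);
                    }
                }
            },
            _ = ticker.tick() => {
                // The tick fires even when no events flow; this is what keeps
                // the metric fresh.
                let now = Instant::now();
                let window = now.saturating_duration_since(window_start);
                for (key, timer) in timers.iter_mut() {
                    let mut waited = timer.total_wait;
                    if let Some(started) = timer.wait_started.take() {
                        // Count an in-progress wait up to "now", then restart it.
                        waited += now.saturating_duration_since(started);
                        timer.wait_started = Some(now);
                    }
                    timer.total_wait = Duration::ZERO;
                    if !window.is_zero() {
                        let utilization = 1.0 - waited.as_secs_f64() / window.as_secs_f64();
                        // The real code sets the `utilization` gauge here.
                        println!("utilization component={key} value={utilization:.3}");
                    }
                }
                window_start = now;
            }
        }
    }
}
```

Each component's input stream is wrapped so that blocking awaits are bracketed by start-wait/stop-wait sends; a sketch of such an adapter follows the builder.rs diff below.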
98 changes: 80 additions & 18 deletions src/topology/builder.rs
@@ -8,6 +8,7 @@ use std::{

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
@@ -51,7 +52,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::wrap,
utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage},
SourceSender,
};

@@ -84,6 +85,7 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
@@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

@@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
@@ -497,7 +501,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx)
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
@@ -585,6 +589,10 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

self.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
let utilization_sender = self.utilization_emitter.get_sender();
let component_key = key.clone();
let sink = async move {
debug!("Sink starting.");

@@ -600,7 +608,7 @@
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(rx);
let mut rx = wrap(utilization_sender, component_key.clone(), rx);

let events_received = register!(EventsReceived);
sink.run(
@@ -682,6 +690,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
@@ -760,18 +769,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
@@ -780,10 +793,19 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(
t,
input_rx,
utilization_emitter.get_sender(),
node.key.clone(),
node.input_details.data_type(),
outputs,
);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
@@ -823,15 +845,17 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer: crate::utilization::Timer,
last_report: Instant,
key: ComponentKey,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UnboundedSender<UtilizationTimerMessage>,
key: ComponentKey,
input_type: DataType,
outputs: TransformOutputs,
) -> Self {
@@ -840,17 +864,22 @@ impl Runner {
input_rx: Some(input_rx),
input_type,
outputs,
timer: crate::utilization::Timer::new(),
last_report: Instant::now(),
key,
timer_tx,
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
if self
.timer_tx
.send(UtilizationTimerMessage::StopWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform.");
}

self.events_received.emit(CountByteSize(
@@ -860,7 +889,16 @@ impl Runner {
}

async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
self.outputs.send(outputs_buf).await
}

@@ -877,7 +915,16 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
@@ -903,7 +950,16 @@ impl Runner {
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer.start_wait();
if self
.timer_tx
.send(UtilizationTimerMessage::StartWait(
self.key.clone(),
Instant::now(),
))
.is_err()
{
debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
}
loop {
tokio::select! {
biased;
@@ -964,10 +1020,16 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx.into_stream());
utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(
utilization_emitter.get_sender(),
key.clone(),
input_rx.into_stream(),
);

let events_received = register!(EventsReceived);
let filtered = input_rx
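
The `wrap` calls above decorate a component's input stream so that time spent blocked waiting for input gets reported to the emitter task. Below is a hedged sketch of what such an adapter can look like; the PR's real `wrap` lives in `src/utilization.rs` and differs in detail, and `TimerMessage` and the `String` key are the same illustrative stand-ins used earlier.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::Stream;
use tokio::sync::mpsc::UnboundedSender;

enum TimerMessage {
    StartWait(String, Instant),
    StopWait(String, Instant),
}

// Wraps an inner stream and reports when its consumer starts and stops
// waiting for input, so the emitter can attribute idle time per component.
struct Utilization<S> {
    inner: S,
    key: String,
    tx: UnboundedSender<TimerMessage>,
    waiting: bool,
}

impl<S: Stream + Unpin> Stream for Utilization<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.as_mut().get_mut();
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Pending => {
                // The first Pending after an item marks the start of a wait.
                if !this.waiting {
                    this.waiting = true;
                    let _ = this
                        .tx
                        .send(TimerMessage::StartWait(this.key.clone(), Instant::now()));
                }
                Poll::Pending
            }
            ready => {
                if this.waiting {
                    this.waiting = false;
                    let _ = this
                        .tx
                        .send(TimerMessage::StopWait(this.key.clone(), Instant::now()));
                }
                ready
            }
        }
    }
}
```

Sends are allowed to fail silently here (`let _ =`) for the same reason the PR only logs at debug level: a send error just means the emitter task has already shut down.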
41 changes: 36 additions & 5 deletions src/topology/running.rs
@@ -7,11 +7,10 @@ use std::{
};

use super::{
builder,
builder::TopologyPieces,
builder::{self, TopologyPieces},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
task::{Task, TaskOutput},
BuiltBuffer, TaskHandle,
};
use crate::{
@@ -23,14 +22,15 @@ use crate::{
spawn_named,
};
use futures::{future, Future, FutureExt};
use stream_cancel::Trigger;
use tokio::{
sync::{mpsc, watch},
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

@@ -49,6 +49,8 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
}

impl RunningTopology {
@@ -67,6 +69,8 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
utilization_task_shutdown_trigger: None,
}
}

@@ -116,15 +120,21 @@ impl RunningTopology {
// pump in self.tasks, and the other for source in self.source_tasks.
let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

let map_closure = |_result| ();

// We need to give some time to the sources to gracefully shutdown, so
// we will merge them with other tasks.
for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
let task = task.map(|_result| ()).shared();
let task = task.map(map_closure).shared();

wait_handles.push(task.clone());
check_handles.entry(key).or_default().push(task);
}

if let Some(utilization_task) = self.utilization_task {
wait_handles.push(utilization_task.map(map_closure).shared());
}

// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
@@ -201,6 +211,9 @@

// Now kick off the shutdown process by shutting down the sources.
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
if let Some(trigger) = self.utilization_task_shutdown_trigger {
trigger.cancel();
}

futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
@@ -1042,6 +1055,7 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces.utilization_emitter.take().unwrap();
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
@@ -1053,6 +1067,23 @@
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
"utilization_heartbeat".into(),
"",
async move {
utilization_emitter
.run_utilization(utilization_shutdown_signal)
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
},
)));

Some((running_topology, abort_rx))
}
}
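
Finally, running.rs spawns the emitter as its own named task and cancels it explicitly when shutdown begins, rather than waiting for component streams to end. A simplified sketch of that wiring, using a plain tokio `watch` channel in place of Vector's `Trigger`/`ShutdownSignal` pair (all names here are illustrative):

```rust
use std::time::Duration;

use tokio::sync::watch;

// Stand-in for `UtilizationEmitter::run_utilization`: loop until the
// shutdown signal fires, doing the periodic gauge work on each tick.
async fn run_utilization(mut shutdown: watch::Receiver<bool>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = shutdown.changed() => break, // cancelled during shutdown
            _ = ticker.tick() => {
                // periodic utilization emission happens here
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (trigger, signal) = watch::channel(false);
    let task = tokio::spawn(run_utilization(signal));

    // ... topology runs; later, shutdown begins ...
    let _ = trigger.send(true); // analogous to `trigger.cancel()` above
    let _ = task.await;         // analogous to pushing the task into wait_handles
}
```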