diff --git a/changelog.d/vector_tap_duration_flag.enhancement.md b/changelog.d/vector_tap_duration_flag.enhancement.md new file mode 100644 index 0000000000000..626f72ef2b089 --- /dev/null +++ b/changelog.d/vector_tap_duration_flag.enhancement.md @@ -0,0 +1,5 @@ +The `vector tap` command now has an optional `duration_ms` flag that allows you to specify the duration of the +tap. By default, the tap will run indefinitely, but if a duration is specified (in milliseconds) the tap will +automatically stop after that duration has elapsed. + +authors: ArunPiduguDD diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index d402641f0840a..88ce67f76eef5 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -1,6 +1,7 @@ -use std::{borrow::Cow, collections::BTreeMap, time::Duration}; - use colored::{ColoredString, Colorize}; +use std::time::Instant; +use std::{borrow::Cow, collections::BTreeMap, time::Duration}; +use tokio::time::timeout; use tokio_stream::StreamExt; use url::Url; use vector_lib::api_client::{ @@ -98,34 +99,52 @@ async fn run( ); }; + let start_time = Instant::now(); + let stream_duration = opts + .duration_ms + .map(Duration::from_millis) + .unwrap_or(Duration::MAX); + // Loop over the returned results, printing out tap events. #[allow(clippy::print_stdout)] #[allow(clippy::print_stderr)] loop { - let message = stream.next().await; - if let Some(Some(res)) = message { - if let Some(d) = res.data { - for tap_event in d.output_events_by_component_id_patterns.iter() { - match tap_event { - OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Log(ev) => { - println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); - }, - OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Metric(ev) => { - println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); - }, - OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Trace(ev) => { - println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); - }, - OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::EventNotification(ev) => { - if !opts.quiet { - eprintln!("{}", ev.message); - } - }, + let time_elapsed = start_time.elapsed(); + if time_elapsed >= stream_duration { + return exitcode::OK; + } + + let message = timeout(stream_duration - time_elapsed, stream.next()).await; + match message { + Ok(Some(Some(res))) => { + if let Some(d) = res.data { + for tap_event in d.output_events_by_component_id_patterns.iter() { + match tap_event { + OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Log(ev) => { + println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); + }, + OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Metric(ev) => { + println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); + }, + OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::Trace(ev) => { + println!("{}", formatter.format(ev.component_id.as_ref(), ev.component_kind.as_ref(), ev.component_type.as_ref(), ev.string.as_ref())); + }, + OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns::EventNotification(ev) => { + if !opts.quiet { + eprintln!("{}", ev.message); + } + }, + } } } } - } else { - return exitcode::TEMPFAIL; + Err(_) => + // If the stream times out, that indicates the duration specified by the user + // has elapsed. We should exit gracefully. + { + return exitcode::OK + } + Ok(_) => return exitcode::TEMPFAIL, } } } diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 0b93f5a8704c3..e365427abe0d7 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -52,6 +52,10 @@ pub struct Opts { /// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops. #[arg(short, long)] no_reconnect: bool, + + /// Specifies a duration (in milliseconds) to sample logs (e.g. specifying 10000 will sample logs for 10 seconds then exit) + #[arg(short = 'd', long)] + duration_ms: Option, } impl Opts { diff --git a/website/cue/reference/cli.cue b/website/cue/reference/cli.cue index 569b7572cb182..0a53739aa0363 100644 --- a/website/cue/reference/cli.cue +++ b/website/cue/reference/cli.cue @@ -307,6 +307,10 @@ cli: { _short: "n" description: "Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops." } + "duration_ms": { + _short: "d" + description: "Specifies a duration (in milliseconds) to sample logs (e.g. passing in 10000 will sample logs for 10 seconds then exit)." + } } options: {