Skip to content

Commit

Permalink
solve clippy issues
Browse files Browse the repository at this point in the history
  • Loading branch information
gllb committed Mar 6, 2025
1 parent d67394c commit 214cce4
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 38 deletions.
2 changes: 1 addition & 1 deletion lib/vector-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ComponentKey {

impl AsRef<ComponentKey> for ComponentKey {
fn as_ref(&self) -> &ComponentKey {
&self
self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct ComponentConfig {
}

impl ComponentConfig {
pub fn new(config_paths: Vec<PathBuf>, component_key: ComponentKey) -> Self {
pub const fn new(config_paths: Vec<PathBuf>, component_key: ComponentKey) -> Self {
Self {
config_paths,
component_key,
Expand Down
19 changes: 9 additions & 10 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ pub fn spawn_thread<'a>(
let mut component_config_paths: Vec<_> = component_configs
.clone()
.into_iter()
.map(|p| p.config_paths.clone())
.flatten()
.flat_map(|p| p.config_paths.clone())
.collect();

config_paths.append(&mut component_config_paths);
Expand Down Expand Up @@ -107,8 +106,7 @@ pub fn spawn_thread<'a>(
let component_keys: Vec<_> = component_configs
.clone()
.into_iter()
.map(|p| p.contains(&event.paths))
.flatten()
.flat_map(|p| p.contains(&event.paths))
.collect();

// We need to read paths to resolve any inode changes that may have happened.
Expand All @@ -121,7 +119,7 @@ pub fn spawn_thread<'a>(
debug!(message = "Reloaded paths.");

info!("Configuration file changed.");
if component_keys.len() > 0 {
if !component_keys.is_empty() {
info!("Component {:?} configuration changed.", component_keys);
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
Expand Down Expand Up @@ -187,6 +185,7 @@ mod tests {
use crate::{
signal::SignalRx,
test_util::{temp_dir, temp_file, trace_init},
config::ComponentKey,
};
use std::{fs::File, io::Write, time::Duration};
use tokio::sync::broadcast;
Expand All @@ -209,14 +208,14 @@ mod tests {
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;
let component_file_path = Vec::new(dir.join("tls.cert"), dir.join("tls.key"));
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
let component_config =
ComponentConfig::new(component_file_path, ComponentKey::from("http"));
std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[dir], component_file_path, delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[dir], vec![component_config], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -233,7 +232,7 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -254,7 +253,7 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -275,7 +274,7 @@ mod tests {
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand Down
5 changes: 2 additions & 3 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,8 @@ impl RunningTopology {
}))
.collect::<Vec<_>>();

match components_to_reload {
Some(mut components) => sinks_to_change.append(&mut components),
_ => (),
if let Some(mut components) = components_to_reload {
sinks_to_change.append(&mut components)
}

for key in &sinks_to_change {
Expand Down
2 changes: 1 addition & 1 deletion src/topology/test/doesnt_reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn topology_doesnt_reload_new_data_dir() {
new_config.global.data_dir = Some(Path::new("/qwerty").to_path_buf());

topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap();

Expand Down
24 changes: 12 additions & 12 deletions src/topology/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async fn topology_remove_one_source() {
config.add_sink("out1", &["in1"], sink1);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -350,7 +350,7 @@ async fn topology_remove_one_sink() {
config.add_sink("out1", &["in1"], basic_sink(10).1);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -403,7 +403,7 @@ async fn topology_remove_one_transform() {
config.add_sink("out1", &["t2"], sink2);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -452,7 +452,7 @@ async fn topology_swap_source() {
config.add_sink("out1", &["in2"], sink2);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -517,7 +517,7 @@ async fn topology_swap_transform() {
config.add_sink("out1", &["t1"], sink2);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -569,7 +569,7 @@ async fn topology_swap_sink() {
config.add_sink("out1", &["in1"], sink2);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -657,7 +657,7 @@ async fn topology_swap_transform_is_atomic() {
config.add_sink("out1", &["t1"], basic_sink(10).1);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -693,7 +693,7 @@ async fn topology_rebuild_connected() {
config.add_sink("out1", &["in1"], sink1);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -752,7 +752,7 @@ async fn topology_rebuild_connected_transform() {
config.add_sink("out1", &["t2"], sink2);

assert!(topology
.reload_config_and_respawn(config.build().unwrap(), Default::default())
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -807,7 +807,7 @@ async fn topology_optional_healthcheck_does_not_fail_reload() {
let (mut topology, _) = start_topology(config, false).await;
let config = basic_config_with_sink_failing_healthcheck();
assert!(topology
.reload_config_and_respawn(config, Default::default())
.reload_config_and_respawn(config, Default::default(), None)
.await
.unwrap());
}
Expand All @@ -820,7 +820,7 @@ async fn topology_healthcheck_not_run_on_unchanged_reload() {
let mut config = basic_config_with_sink_failing_healthcheck();
config.healthchecks.require_healthy = true;
assert!(topology
.reload_config_and_respawn(config, Default::default())
.reload_config_and_respawn(config, Default::default(), None)
.await
.unwrap());
}
Expand All @@ -846,7 +846,7 @@ async fn topology_healthcheck_run_for_changes_on_reload() {
let mut config = config.build().unwrap();
config.healthchecks.require_healthy = true;
assert!(!topology
.reload_config_and_respawn(config, Default::default())
.reload_config_and_respawn(config, Default::default(), None)
.await
.unwrap());
}
Expand Down
12 changes: 6 additions & 6 deletions src/topology/test/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn topology_reuse_old_port() {

let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand All @@ -90,7 +90,7 @@ async fn topology_rebuild_old() {

let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
assert!(!topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand All @@ -107,7 +107,7 @@ async fn topology_old() {

let (mut topology, _) = start_topology(old_config.clone().build().unwrap(), false).await;
assert!(topology
.reload_config_and_respawn(old_config.build().unwrap(), Default::default())
.reload_config_and_respawn(old_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn topology_readd_input() {
new_config.add_source("in2", internal_metrics_source());
new_config.add_sink("out", &["in1"], prom_exporter_sink(address_0, 1));
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand All @@ -268,7 +268,7 @@ async fn topology_readd_input() {
new_config.add_source("in2", internal_metrics_source());
new_config.add_sink("out", &["in1", "in2"], prom_exporter_sink(address_0, 1));
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());

Expand Down Expand Up @@ -299,7 +299,7 @@ async fn reload_sink_test(

// Now reload the topology with the "new" configuration, and make sure that a component is now listening on `new_address`.
assert!(topology
.reload_config_and_respawn(new_config, Default::default())
.reload_config_and_respawn(new_config, Default::default(), None)
.await
.unwrap());

Expand Down
8 changes: 4 additions & 4 deletions src/topology/test/transient_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn closed_source() {
topology.sources_finished().await;

assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand All @@ -52,7 +52,7 @@ async fn remove_sink() {

let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand All @@ -75,7 +75,7 @@ async fn remove_transform() {

let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}
Expand All @@ -99,7 +99,7 @@ async fn replace_transform() {

let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
assert!(topology
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
.await
.unwrap());
}

0 comments on commit 214cce4

Please sign in to comment.