Skip to content

Commit

Permalink
(feat reload components) Handle Vec<ComponentKey> instead of single one
Browse files Browse the repository at this point in the history
  • Loading branch information
gllb committed Mar 3, 2025
1 parent 8a7ad04 commit c941b7f
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 19 deletions.
6 changes: 6 additions & 0 deletions lib/vector-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ impl ComponentKey {
}
}

impl AsRef<ComponentKey> for ComponentKey {
fn as_ref(&self) -> &ComponentKey {
&self
}
}

impl From<String> for ComponentKey {
fn from(id: String) -> Self {
Self { id }
Expand Down
8 changes: 4 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async fn handle_signal(
allow_empty_config: bool,
) -> Option<SignalTo> {
match signal {
Ok(SignalTo::ReloadComponent(component_key)) => {
Ok(SignalTo::ReloadComponents(component_keys)) => {
let mut topology_controller = topology_controller.lock().await;

// Reload paths
Expand All @@ -355,7 +355,7 @@ async fn handle_signal(
reload_config_from_result(
topology_controller,
new_config,
Some(&component_key))
Some(component_keys.iter().map(AsRef::as_ref).collect()))
.await
}
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
Expand Down Expand Up @@ -392,10 +392,10 @@ async fn handle_signal(
async fn reload_config_from_result(
mut topology_controller: MutexGuard<'_, TopologyController>,
config: Result<Config, Vec<String>>,
component_to_reload: Option<&ComponentKey>
components_to_reload: Option<Vec<&ComponentKey>>
) -> Option<SignalTo> {
match config {
Ok(new_config) => match topology_controller.reload(new_config, component_to_reload).await {
Ok(new_config) => match topology_controller.reload(new_config, components_to_reload).await {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
},
Expand Down
10 changes: 4 additions & 6 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ pub fn spawn_thread<'a>(

info!("Configuration file changed.");
if component_keys.len() > 0 {
for component_key in component_keys {
info!("Component {} configuration changed.", component_key);
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponent(component_key)).map_err(|error| {
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
});
}
info!("Component {:?} configuration changed.", component_keys);
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
});
} else {
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
.map_err(|error| {
Expand Down
4 changes: 2 additions & 2 deletions src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub type SignalRx = broadcast::Receiver<SignalTo>;
/// Control messages used by Vector to drive topology and shutdown events.
#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
pub enum SignalTo {
/// Signal to reload a given component.
ReloadComponent(ComponentKey),
/// Signal to reload given components.
ReloadComponents(Vec<ComponentKey>),
/// Signal to reload config from a string.
ReloadFromConfigBuilder(ConfigBuilder),
/// Signal to reload config from the filesystem.
Expand Down
4 changes: 2 additions & 2 deletions src/topology/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub enum ReloadOutcome {
impl TopologyController {
pub async fn reload(
&mut self, mut new_config: config::Config,
component_to_reload: Option<&config::ComponentKey>
components_to_reload: Option<Vec<&config::ComponentKey>>
) -> ReloadOutcome {
new_config
.healthchecks
Expand Down Expand Up @@ -106,7 +106,7 @@ impl TopologyController {

match self
.topology
.reload_config_and_respawn(new_config, self.extra_context.clone(), component_to_reload)
.reload_config_and_respawn(new_config, self.extra_context.clone(), components_to_reload)
.await
{
Ok(true) => {
Expand Down
10 changes: 5 additions & 5 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl RunningTopology {
&mut self,
new_config: Config,
extra_context: ExtraContext,
component_to_reload: Option<&ComponentKey>,
components_to_reload: Option<Vec<&ComponentKey>>,
) -> Result<bool, ()> {
info!("Reloading running topology with new configuration.");

Expand All @@ -245,7 +245,7 @@ impl RunningTopology {
//
// We also shutdown any component that is simply being removed entirely.
let diff = ConfigDiff::new(&self.config, &new_config);
let buffers = self.shutdown_diff(&diff, &new_config, component_to_reload).await;
let buffers = self.shutdown_diff(&diff, &new_config, components_to_reload).await;

// Gives windows some time to make available any port
// released by shutdown components.
Expand Down Expand Up @@ -350,7 +350,7 @@ impl RunningTopology {
&mut self,
diff: &ConfigDiff,
new_config: &Config,
component_to_reload: Option<&ComponentKey>,
components_to_reload: Option<Vec<&ComponentKey>>,
) -> HashMap<ComponentKey, BuiltBuffer> {
// First, we shutdown any changed/removed sources. This ensures that we can allow downstream
// components to terminate naturally by virtue of the flow of events stopping.
Expand Down Expand Up @@ -546,8 +546,8 @@ impl RunningTopology {
}))
.collect::<Vec<_>>();

match component_to_reload {
Some(component) => sinks_to_change.push(component),
match components_to_reload {
Some(mut components) => sinks_to_change.append(&mut components),
_ => (),
}

Expand Down

0 comments on commit c941b7f

Please sign in to comment.