diff --git a/rust-toolchain.toml b/rust-toolchain.toml index be2896e..542be07 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] - channel = "1.80.1" + channel = "1.83.0" components = ["rustfmt", "clippy"] profile = "default" diff --git a/src/cmd/configfile.rs b/src/cmd/configfile.rs index d174981..d97211c 100644 --- a/src/cmd/configfile.rs +++ b/src/cmd/configfile.rs @@ -222,6 +222,22 @@ pub fn run(config: &Configuration) { {{/each}} ] {{/each}} + + +# Callback commands. +# +# These are commands that are triggered by certain events (e.g. MQTT connected +# or error). These commands are intended to e.g. trigger a LED of a gateway. +# Commands are configured as an array, where the first item is the path to the +# command, and the (optional) remaining elements are the arguments. An empty +# array disables the callback. +[callbacks] + + # On MQTT connected. + on_mqtt_connected=[] + + # On MQTT connection error. + on_mqtt_connection_error=[] "#; let reg = Handlebars::new(); diff --git a/src/commands.rs b/src/commands.rs index 87679e3..14fdeb9 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -3,7 +3,7 @@ use std::process::Stdio; use anyhow::Result; use chirpstack_api::gw; -use log::info; +use log::{error, info}; use once_cell::sync::OnceCell; use tokio::io::AsyncWriteExt; use tokio::process::Command; @@ -74,9 +74,53 @@ pub async fn exec(pl: &gw::GatewayCommandExecRequest) -> Result 1 { + cmd.args(&cmd_args[1..]); + } + + if let Err(e) = cmd.output().await { + error!( + "Execute callback error, callback: {:?}, error: {}", + cmd_args, e + ); + } + } + }); +} + #[cfg(test)] mod test { use super::*; + use std::{env, fs}; + + #[tokio::test] + async fn test_exec_callback() { + let temp_file = env::temp_dir().join("test.txt"); + fs::write(&temp_file, vec![]).unwrap(); + assert!(fs::exists(&temp_file).unwrap()); + + exec_callback(&[ + "rm".into(), + temp_file.clone().into_os_string().into_string().unwrap(), + ]) + .await; + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + assert_eq!(false, fs::exists(&temp_file).unwrap()); + } #[tokio::test] async fn test_commands() { diff --git a/src/config.rs b/src/config.rs index 44b585d..0b6d2f9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,7 @@ pub struct Configuration { pub backend: Backend, pub metadata: Metadata, pub commands: HashMap>, + pub callbacks: Callbacks, } impl Configuration { @@ -168,3 +169,10 @@ pub struct Metadata { pub r#static: HashMap, pub commands: HashMap>, } + +#[derive(Serialize, Deserialize, Default)] +#[serde(default)] +pub struct Callbacks { + pub on_mqtt_connected: Vec, + pub on_mqtt_connection_error: Vec, +} diff --git a/src/mqtt.rs b/src/mqtt.rs index 7eb34a1..2dbeb34 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -208,6 +208,9 @@ pub async fn setup(conf: &Configuration) -> Result<()> { // Eventloop tokio::spawn({ + let on_mqtt_connected = conf.callbacks.on_mqtt_connected.clone(); + let on_mqtt_connection_error = conf.callbacks.on_mqtt_connection_error.clone(); + async move { info!("Starting MQTT event loop"); @@ -228,6 +231,8 @@ pub async fn setup(conf: &Configuration) -> Result<()> { } Event::Incoming(Incoming::ConnAck(v)) => { if v.code == ConnectReturnCode::Success { + commands::exec_callback(&on_mqtt_connected).await; + if let Err(e) = connect_tx.try_send(()) { error!("Send to subscribe channel error, error: {}", e); } @@ -240,6 +245,8 @@ pub async fn setup(conf: &Configuration) -> Result<()> { } } Err(e) => { + commands::exec_callback(&on_mqtt_connection_error).await; + error!("MQTT error, error: {}", e); sleep(Duration::from_secs(1)).await }