From 16f3dc3c1b1b1a9494fc18594929e20da8f28952 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Tue, 13 Aug 2024 14:18:23 -0700 Subject: [PATCH] pass in notifier to update_database func --- src/cli/main.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/cli/main.rs b/src/cli/main.rs index 99fe7d2..45b8f83 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -247,16 +247,9 @@ async fn update_database( mut db: LocalBrokerDb, collectors: Vec, days: Option, + notifier: &Option, send_heartbeat: bool, ) { - let notifier = match NatsNotifier::new(None).await { - Ok(n) => Some(n), - Err(e) => { - error!("want to set up notifier but failed: {}", e); - None - } - }; - let now = Utc::now(); let latest_ts_map: HashMap = db @@ -311,7 +304,7 @@ async fn update_database( Ok(items) => { let inserted = db.insert_items(&items, true).await.unwrap(); if !inserted.is_empty() { - if let Some(n) = ¬ifier { + if let Some(n) = notifier { if let Err(e) = n.send(&inserted).await { error!("{}", e); } @@ -412,8 +405,15 @@ fn main() { let rt = get_tokio_runtime(); let collectors = load_collectors().unwrap(); - rt.block_on(async { + let notifier = match NatsNotifier::new(None).await { + Ok(n) => Some(n), + Err(_e) => { + info!("no nats notifier configured, skip pushing notification"); + None + } + }; + let db = LocalBrokerDb::new(path.as_str()).await.unwrap(); let mut interval = tokio::time::interval(std::time::Duration::from_secs(update_interval)); @@ -421,7 +421,8 @@ fn main() { loop { interval.tick().await; // updating from the latest data available - update_database(db.clone(), collectors.clone(), None, true).await; + update_database(db.clone(), collectors.clone(), None, ¬ifier, true) + .await; info!("wait for {} seconds before next update", update_interval); } }); @@ -535,7 +536,14 @@ fn main() { rt.block_on(async { let db = LocalBrokerDb::new(&db_path).await.unwrap(); - update_database(db, collectors, days, false).await; + let notifier = match NatsNotifier::new(None).await { + Ok(n) => Some(n), + Err(_e) => { + info!("no nats notifier configured, skip pushing notification"); + None + } + }; + update_database(db, collectors, days, ¬ifier, false).await; }); } Commands::Search { query, json, url } => { @@ -644,6 +652,7 @@ fn main() { return; } }; + if let Err(e) = notifier.start_subscription(subject).await { error!("{}", e); return;