From eb5a578ab55ddb6c6114442331fd658b550ee762 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Mon, 29 Apr 2024 14:47:17 -0700 Subject: [PATCH] add listener and cli subcommand "live" --- src/cli/main.rs | 54 ++++++++++++++----- src/notifier/mod.rs | 36 +------------ src/notifier/nats.rs | 126 +++++++++++++++++++++++++++++-------------- 3 files changed, 129 insertions(+), 87 deletions(-) diff --git a/src/cli/main.rs b/src/cli/main.rs index 0d9a78b..0c78044 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -5,7 +5,7 @@ mod bootstrap; use crate::api::{start_api_service, BrokerSearchQuery}; use crate::backup::backup_database; use crate::bootstrap::download_file; -use bgpkit_broker::notifier::Notifier; +use bgpkit_broker::notifier::NatsNotifier; use bgpkit_broker::{ crawl_collector, load_collectors, BgpkitBroker, Collector, LocalBrokerDb, DEFAULT_PAGE_SIZE, }; @@ -180,6 +180,18 @@ enum Commands { #[clap(short, long)] json: bool, }, + + /// Streaming live from a broker NATS server + Live { + /// URL to NATS server, e.g. nats://localhost:4222. + /// If not specified, will try to read from BGPKIT_BROKER_NATS_URL env variable. + #[clap(short, long)] + url: Option, + + /// Subject to subscribe to, default to public.broker.> + #[clap(short, long)] + subject: Option, + }, } fn min_update_interval_check(s: &str) -> Result { @@ -208,7 +220,7 @@ async fn update_database( db: LocalBrokerDb, collectors: Vec, days: Option, - notifier: Notifier, + notifier: Option, ) { let now = Utc::now(); let latest_date; @@ -259,8 +271,10 @@ async fn update_database( Ok(items) => { let inserted = db.insert_items(&items, true).await.unwrap(); if !inserted.is_empty() { - if let Err(e) = notifier.notify_items(&inserted).await { - error!("{}", e); + if let Some(n) = ¬ifier { + if let Err(e) = n.send(&inserted).await { + error!("{}", e); + } } } total_inserted_count += inserted.len(); @@ -360,14 +374,9 @@ fn main() { loop { interval.tick().await; + let notifier = NatsNotifier::new(None).await.ok(); // updating from the latest data available - update_database( - db.clone(), - collectors.clone(), - None, - Notifier::new().await, - ) - .await; + update_database(db.clone(), collectors.clone(), None, notifier).await; info!("wait for {} seconds before next update", update_interval); } }); @@ -481,7 +490,7 @@ fn main() { rt.block_on(async { let db = LocalBrokerDb::new(&db_path).await.unwrap(); - update_database(db, collectors, days, Notifier::new().await).await; + update_database(db, collectors, days, NatsNotifier::new(None).await.ok()).await; }); } Commands::Search { query, json, url } => { @@ -572,5 +581,26 @@ fn main() { println!("{}", Table::new(items).with(Style::markdown())); } } + Commands::Live { url, subject } => { + dotenvy::dotenv().ok(); + enable_logging(); + let rt = get_tokio_runtime(); + rt.block_on(async { + let mut notifier = match NatsNotifier::new(url).await { + Ok(n) => n, + Err(e) => { + error!("{}", e); + return; + } + }; + if let Err(e) = notifier.start_subscription(subject).await { + error!("{}", e); + return; + } + while let Some(item) = notifier.next().await { + println!("{}", item); + } + }); + } } } diff --git a/src/notifier/mod.rs b/src/notifier/mod.rs index f8adce8..745f978 100644 --- a/src/notifier/mod.rs +++ b/src/notifier/mod.rs @@ -1,37 +1,5 @@ -use crate::notifier::nats::NatsNotifier; -use crate::{BrokerError, BrokerItem}; -use tracing::info; - #[cfg(feature = "nats")] mod nats; -pub struct Notifier { - #[cfg(feature = "nats")] - nats: Option, -} - -impl Notifier { - pub async fn new() -> Self { - Self { - #[cfg(feature = "nats")] - nats: { - match NatsNotifier::new().await { - Ok(n) => Some(n), - Err(e) => { - info!("NATS notifier not available: {}", e); - None - } - } - }, - } - } - - pub async fn notify_items(&self, items: &Vec) -> Result<(), BrokerError> { - #[cfg(feature = "nats")] - if let Some(nats) = &self.nats { - nats.notify_items(items).await?; - } - - Ok(()) - } -} +#[cfg(feature = "nats")] +pub use nats::NatsNotifier; diff --git a/src/notifier/nats.rs b/src/notifier/nats.rs index 6c800fe..5a09a4e 100644 --- a/src/notifier/nats.rs +++ b/src/notifier/nats.rs @@ -1,56 +1,56 @@ use crate::{BrokerError, BrokerItem}; +use async_nats::Subscriber; +use futures::StreamExt; +use tracing::info; pub struct NatsNotifier { client: async_nats::Client, + root_subject: String, + subscriber: Option, } -fn item_to_subject(item: &BrokerItem) -> String { +fn item_to_subject(root_subject: &str, item: &BrokerItem) -> String { let project = match item.collector_id.starts_with("rrc") { true => "riperis", false => "route-views", }; + let subject = root_subject.strip_suffix('.').unwrap_or(root_subject); + format!( - "public.broker.{}.{}.{}", - project, item.collector_id, item.data_type + "{}.{}.{}.{}", + subject, project, item.collector_id, item.data_type ) } impl NatsNotifier { /// Creates a new NATS notifier. - pub async fn new() -> Result { + pub async fn new(url: Option) -> Result { dotenvy::dotenv().ok(); - let url = match dotenvy::var("BGPKIT_BROKER_NATS_URL") { - Ok(url) => url, - Err(_) => { - return Err(BrokerError::NotifierError( - "BGPKIT_BROKER_NATS_URL env variable not set".to_string(), - )) - } - }; - let user = match dotenvy::var("BGPKIT_BROKER_NATS_USER") { - Ok(user) => user, - Err(_) => { - return Err(BrokerError::NotifierError( - "BGPKIT_BROKER_NATS_USER env variable not set".to_string(), - )) - } - }; - let pass = match dotenvy::var("BGPKIT_BROKER_NATS_PASS") { - Ok(pass) => pass, - Err(_) => { - return Err(BrokerError::NotifierError( - "BGPKIT_BROKER_NATS_PASS env variable not set".to_string(), - )) - } + let url = match url { + None => match dotenvy::var("BGPKIT_BROKER_NATS_URL") { + Ok(url) => url, + Err(_) => { + return Err(BrokerError::NotifierError( + "BGPKIT_BROKER_NATS_URL env variable not set".to_string(), + )) + } + }, + Some(u) => u, }; - let client = match async_nats::ConnectOptions::with_user_and_password(user, pass) - .connect(url) - .await - { - Ok(c) => c, + let root_subject = dotenvy::var("BGPKIT_BROKER_NATS_ROOT_SUBJECT") + .unwrap_or_else(|_| "public.broker".to_string()); + + let client = match async_nats::connect(url).await { + Ok(c) => { + info!( + "successfully connected to NATS server with root subject: {}", + root_subject + ); + c + } Err(e) => { return Err(BrokerError::BrokerError(format!( "NATS connection error: {}", @@ -59,7 +59,11 @@ impl NatsNotifier { } }; - Ok(Self { client }) + Ok(Self { + client, + root_subject, + subscriber: None, + }) } /// Publishes broker items to NATS server. @@ -71,10 +75,10 @@ impl NatsNotifier { /// # Errors /// /// Returns an `async_nats::Error` if there was an error during the publishing process. - pub async fn notify_items(&self, items: &Vec) -> Result<(), BrokerError> { + pub async fn send(&self, items: &[BrokerItem]) -> Result<(), BrokerError> { for item in items { let item_str = serde_json::to_string(item)?; - let subject = item_to_subject(item); + let subject = item_to_subject(self.root_subject.as_str(), item); if let Err(e) = self.client.publish(subject, item_str.into()).await { return Err(BrokerError::NotifierError(format!( "NATS publish error: {}", @@ -90,17 +94,42 @@ impl NatsNotifier { }; Ok(()) } + + pub async fn start_subscription(&mut self, subject: Option) -> Result<(), BrokerError> { + let sub = match subject { + Some(s) => s, + None => format!("{}.>", self.root_subject), + }; + + match self.client.subscribe(sub.clone()).await { + Ok(subscriber) => { + info!("subscribed to NATS subject: {}", sub); + self.subscriber = Some(subscriber); + Ok(()) + } + Err(e) => Err(BrokerError::BrokerError(format!( + "NATS subscription error: {}", + e + ))), + } + } + + pub async fn next(&mut self) -> Option { + match self.subscriber.as_mut() { + None => None, + Some(s) => s.next().await.map(|msg| { + let msg_text = std::str::from_utf8(msg.payload.as_ref()).unwrap(); + serde_json::from_str(msg_text).unwrap() + }), + } + } } #[cfg(test)] mod tests { use super::*; - #[tokio::test] - async fn test_connection() { - let notifier = NatsNotifier::new().await.unwrap(); - dbg!(notifier.client.connection_state()); - + async fn send_test_item(notifier: &NatsNotifier) { let item = BrokerItem { ts_start: Default::default(), ts_end: Default::default(), @@ -110,6 +139,21 @@ mod tests { rough_size: 100, exact_size: 101, }; - dbg!(notifier.notify_items(&vec![item]).await).ok(); + notifier.send(&[item]).await.unwrap(); + } + + #[tokio::test] + async fn test_connection() { + let notifier = NatsNotifier::new(None).await.unwrap(); + dbg!(notifier.client.connection_state()); + send_test_item(¬ifier).await; + } + + #[tokio::test] + async fn test_subscribe() { + let mut notifier = NatsNotifier::new(None).await.unwrap(); + notifier.start_subscription(&None).await.unwrap(); + let item: BrokerItem = notifier.next().await.unwrap(); + dbg!(&item); } }