Skip to content

Commit

Permalink
add listener and cli subcommand "live"
Browse files Browse the repository at this point in the history
  • Loading branch information
digizeph committed Apr 29, 2024
1 parent bbd7216 commit eb5a578
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 87 deletions.
54 changes: 42 additions & 12 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod bootstrap;
use crate::api::{start_api_service, BrokerSearchQuery};
use crate::backup::backup_database;
use crate::bootstrap::download_file;
use bgpkit_broker::notifier::Notifier;
use bgpkit_broker::notifier::NatsNotifier;
use bgpkit_broker::{
crawl_collector, load_collectors, BgpkitBroker, Collector, LocalBrokerDb, DEFAULT_PAGE_SIZE,
};
Expand Down Expand Up @@ -180,6 +180,18 @@ enum Commands {
#[clap(short, long)]
json: bool,
},

/// Streaming live from a broker NATS server
Live {
/// URL to NATS server, e.g. nats://localhost:4222.
/// If not specified, will try to read from BGPKIT_BROKER_NATS_URL env variable.
#[clap(short, long)]
url: Option<String>,

/// Subject to subscribe to, default to public.broker.>
#[clap(short, long)]
subject: Option<String>,
},
}

fn min_update_interval_check(s: &str) -> Result<u64, String> {
Expand Down Expand Up @@ -208,7 +220,7 @@ async fn update_database(
db: LocalBrokerDb,
collectors: Vec<Collector>,
days: Option<u32>,
notifier: Notifier,
notifier: Option<NatsNotifier>,
) {
let now = Utc::now();
let latest_date;
Expand Down Expand Up @@ -259,8 +271,10 @@ async fn update_database(
Ok(items) => {
let inserted = db.insert_items(&items, true).await.unwrap();
if !inserted.is_empty() {
if let Err(e) = notifier.notify_items(&inserted).await {
error!("{}", e);
if let Some(n) = &notifier {
if let Err(e) = n.send(&inserted).await {
error!("{}", e);
}
}
}
total_inserted_count += inserted.len();
Expand Down Expand Up @@ -360,14 +374,9 @@ fn main() {

loop {
interval.tick().await;
let notifier = NatsNotifier::new(None).await.ok();
// updating from the latest data available
update_database(
db.clone(),
collectors.clone(),
None,
Notifier::new().await,
)
.await;
update_database(db.clone(), collectors.clone(), None, notifier).await;
info!("wait for {} seconds before next update", update_interval);
}
});
Expand Down Expand Up @@ -481,7 +490,7 @@ fn main() {

rt.block_on(async {
let db = LocalBrokerDb::new(&db_path).await.unwrap();
update_database(db, collectors, days, Notifier::new().await).await;
update_database(db, collectors, days, NatsNotifier::new(None).await.ok()).await;
});
}
Commands::Search { query, json, url } => {
Expand Down Expand Up @@ -572,5 +581,26 @@ fn main() {
println!("{}", Table::new(items).with(Style::markdown()));
}
}
Commands::Live { url, subject } => {
dotenvy::dotenv().ok();
enable_logging();
let rt = get_tokio_runtime();
rt.block_on(async {
let mut notifier = match NatsNotifier::new(url).await {
Ok(n) => n,
Err(e) => {
error!("{}", e);
return;
}
};
if let Err(e) = notifier.start_subscription(subject).await {
error!("{}", e);
return;
}
while let Some(item) = notifier.next().await {
println!("{}", item);
}
});
}
}
}
36 changes: 2 additions & 34 deletions src/notifier/mod.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,5 @@
use crate::notifier::nats::NatsNotifier;
use crate::{BrokerError, BrokerItem};
use tracing::info;

#[cfg(feature = "nats")]
mod nats;

pub struct Notifier {
#[cfg(feature = "nats")]
nats: Option<NatsNotifier>,
}

impl Notifier {
pub async fn new() -> Self {
Self {
#[cfg(feature = "nats")]
nats: {
match NatsNotifier::new().await {
Ok(n) => Some(n),
Err(e) => {
info!("NATS notifier not available: {}", e);
None
}
}
},
}
}

pub async fn notify_items(&self, items: &Vec<BrokerItem>) -> Result<(), BrokerError> {
#[cfg(feature = "nats")]
if let Some(nats) = &self.nats {
nats.notify_items(items).await?;
}

Ok(())
}
}
#[cfg(feature = "nats")]
pub use nats::NatsNotifier;
126 changes: 85 additions & 41 deletions src/notifier/nats.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
use crate::{BrokerError, BrokerItem};
use async_nats::Subscriber;
use futures::StreamExt;
use tracing::info;

pub struct NatsNotifier {
client: async_nats::Client,
root_subject: String,
subscriber: Option<Subscriber>,
}

fn item_to_subject(item: &BrokerItem) -> String {
fn item_to_subject(root_subject: &str, item: &BrokerItem) -> String {
let project = match item.collector_id.starts_with("rrc") {
true => "riperis",
false => "route-views",
};

let subject = root_subject.strip_suffix('.').unwrap_or(root_subject);

format!(
"public.broker.{}.{}.{}",
project, item.collector_id, item.data_type
"{}.{}.{}.{}",
subject, project, item.collector_id, item.data_type
)
}

impl NatsNotifier {
/// Creates a new NATS notifier.
pub async fn new() -> Result<Self, BrokerError> {
pub async fn new(url: Option<String>) -> Result<Self, BrokerError> {
dotenvy::dotenv().ok();

let url = match dotenvy::var("BGPKIT_BROKER_NATS_URL") {
Ok(url) => url,
Err(_) => {
return Err(BrokerError::NotifierError(
"BGPKIT_BROKER_NATS_URL env variable not set".to_string(),
))
}
};
let user = match dotenvy::var("BGPKIT_BROKER_NATS_USER") {
Ok(user) => user,
Err(_) => {
return Err(BrokerError::NotifierError(
"BGPKIT_BROKER_NATS_USER env variable not set".to_string(),
))
}
};
let pass = match dotenvy::var("BGPKIT_BROKER_NATS_PASS") {
Ok(pass) => pass,
Err(_) => {
return Err(BrokerError::NotifierError(
"BGPKIT_BROKER_NATS_PASS env variable not set".to_string(),
))
}
let url = match url {
None => match dotenvy::var("BGPKIT_BROKER_NATS_URL") {
Ok(url) => url,
Err(_) => {
return Err(BrokerError::NotifierError(
"BGPKIT_BROKER_NATS_URL env variable not set".to_string(),
))
}
},
Some(u) => u,
};

let client = match async_nats::ConnectOptions::with_user_and_password(user, pass)
.connect(url)
.await
{
Ok(c) => c,
let root_subject = dotenvy::var("BGPKIT_BROKER_NATS_ROOT_SUBJECT")
.unwrap_or_else(|_| "public.broker".to_string());

let client = match async_nats::connect(url).await {
Ok(c) => {
info!(
"successfully connected to NATS server with root subject: {}",
root_subject
);
c
}
Err(e) => {
return Err(BrokerError::BrokerError(format!(
"NATS connection error: {}",
Expand All @@ -59,7 +59,11 @@ impl NatsNotifier {
}
};

Ok(Self { client })
Ok(Self {
client,
root_subject,
subscriber: None,
})
}

/// Publishes broker items to NATS server.
Expand All @@ -71,10 +75,10 @@ impl NatsNotifier {
/// # Errors
///
/// Returns an `async_nats::Error` if there was an error during the publishing process.
pub async fn notify_items(&self, items: &Vec<BrokerItem>) -> Result<(), BrokerError> {
pub async fn send(&self, items: &[BrokerItem]) -> Result<(), BrokerError> {
for item in items {
let item_str = serde_json::to_string(item)?;
let subject = item_to_subject(item);
let subject = item_to_subject(self.root_subject.as_str(), item);
if let Err(e) = self.client.publish(subject, item_str.into()).await {
return Err(BrokerError::NotifierError(format!(
"NATS publish error: {}",
Expand All @@ -90,17 +94,42 @@ impl NatsNotifier {
};
Ok(())
}

pub async fn start_subscription(&mut self, subject: Option<String>) -> Result<(), BrokerError> {
let sub = match subject {
Some(s) => s,
None => format!("{}.>", self.root_subject),
};

match self.client.subscribe(sub.clone()).await {
Ok(subscriber) => {
info!("subscribed to NATS subject: {}", sub);
self.subscriber = Some(subscriber);
Ok(())
}
Err(e) => Err(BrokerError::BrokerError(format!(
"NATS subscription error: {}",
e
))),
}
}

pub async fn next(&mut self) -> Option<BrokerItem> {
match self.subscriber.as_mut() {
None => None,
Some(s) => s.next().await.map(|msg| {
let msg_text = std::str::from_utf8(msg.payload.as_ref()).unwrap();
serde_json::from_str(msg_text).unwrap()
}),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_connection() {
let notifier = NatsNotifier::new().await.unwrap();
dbg!(notifier.client.connection_state());

async fn send_test_item(notifier: &NatsNotifier) {
let item = BrokerItem {
ts_start: Default::default(),
ts_end: Default::default(),
Expand All @@ -110,6 +139,21 @@ mod tests {
rough_size: 100,
exact_size: 101,
};
dbg!(notifier.notify_items(&vec![item]).await).ok();
notifier.send(&[item]).await.unwrap();
}

#[tokio::test]
async fn test_connection() {
let notifier = NatsNotifier::new(None).await.unwrap();
dbg!(notifier.client.connection_state());
send_test_item(&notifier).await;
}

#[tokio::test]
async fn test_subscribe() {
let mut notifier = NatsNotifier::new(None).await.unwrap();
notifier.start_subscription(&None).await.unwrap();
let item: BrokerItem = notifier.next().await.unwrap();
dbg!(&item);
}
}

0 comments on commit eb5a578

Please sign in to comment.