Skip to content

Commit

Permalink
Merge pull request #4 from bgpkit/fix-collector-filtering
Browse files Browse the repository at this point in the history
Fix collector filtering
  • Loading branch information
digizeph authored Jul 10, 2022
2 parents 096f2c2 + 6347580 commit 25b3c3c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 21 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bgpkit-broker"
version = "0.4.0"
version = "0.4.1"
edition = "2018"
authors = ["Mingwei Zhang <mingwei@bgpkit.com>"]
readme = "README.md"
Expand All @@ -21,4 +21,4 @@ chrono = { version = "0.4", features = ["serde"] }


[dev-dependencies]
env_logger = "0.9.0"
env_logger = "0.9.0"
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub enum BrokerError {
impl Display for BrokerError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BrokerError::NetworkError(e) => {write!(f, "NETWORK_ERROR: {}", e.to_string())}
BrokerError::NetworkError(e) => {write!(f, "NETWORK_ERROR: {}", e)}
BrokerError::BrokerError(e) => {write!(f, "BROKER_ERROR: {}", e)}
}
}
Expand Down
59 changes: 43 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ impl BgpkitBroker {

/// Construct new BgpkitBroker given a broker URL.
pub fn new(broker_url: &str) -> Self {
let url = broker_url.trim_end_matches("/").to_string();
let url = broker_url.trim_end_matches('/').to_string();
Self { broker_url: url , query_params: QueryParams{..Default::default()}}
}

/// Construct new BgpkitBroker given a broker URL.
pub fn new_with_params(broker_url: &str, query_params: QueryParams) -> Self {
let url = broker_url.trim_end_matches("/").to_string();
let url = broker_url.trim_end_matches('/').to_string();
Self { broker_url: url , query_params}
}

Expand All @@ -101,9 +101,9 @@ impl BgpkitBroker {
let url = format!("{}/search{}", &self.broker_url, params);
log::info!("sending broker query to {}", &url);
match run_query(url.as_str()) {
Ok(res) => return Ok(res),
Err(e) => return Err(e)
};
Ok(res) => Ok(res),
Err(e) => Err(e)
}
}

/// Send query to get **all** data times returned.
Expand All @@ -116,13 +116,22 @@ impl BgpkitBroker {
Ok(res) => res,
Err(e) => {return Err(e)}
};
if res_items.len()==0 {

let items_count = res_items.len() as i64;

if items_count ==0 {
// reaches the end
break;
}

items.extend(res_items);
let cur_page = p.page;
p = p.page(cur_page+1);

if items_count < params.page_size {
// reaches the end
break;
}
}
Ok(items)
}
Expand All @@ -141,19 +150,19 @@ fn run_query(url: &str) -> Result<Vec<BrokerItem>, BrokerError>{
{
Ok(res) => {
if let Some(e) = res.error {
return Err(BrokerError::BrokerError(e));
Err(BrokerError::BrokerError(e))
} else {
Ok(res.data)
}
},
Err(e) => {
// json decoding error. most likely the service returns an error message without
// `data` field.
return Err(BrokerError::BrokerError(e.to_string()))
Err(BrokerError::BrokerError(e.to_string()))
}
}
}
Err(e) => { return Err(BrokerError::from(e)) }
Err(e) => { Err(BrokerError::from(e)) }
}
}

Expand Down Expand Up @@ -209,7 +218,7 @@ impl Iterator for BrokerItemIterator {
Ok(i) => i,
Err(_) => return None
};
if items.len()==0 {
if items.is_empty() {
// first run, nothing returned
return None
} else {
Expand All @@ -220,22 +229,22 @@ impl Iterator for BrokerItemIterator {
}

if let Some(item) = self.cached_items.pop() {
return Some(item)
Some(item)
} else {
self.query_params.page += 1;
let url = format!("{}/search{}", &self.broker_url, &self.query_params);
let items = match run_query(url.as_str()) {
Ok(i) => i,
Err(_) => return None
};
if items.len()==0 {
if items.is_empty() {
// first run, nothing returned
return None
} else {
self.cached_items = items;
self.cached_items.reverse();
}
return Some(self.cached_items.pop().unwrap())
Some(self.cached_items.pop().unwrap())
}
}
}
Expand All @@ -260,7 +269,6 @@ impl IntoIterator for &BgpkitBroker {

#[cfg(test)]
mod tests {
use env_logger::Env;
use super::*;

#[test]
Expand Down Expand Up @@ -312,8 +320,6 @@ mod tests {

#[test]
fn test_iterator() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let broker = BgpkitBroker::new_with_params(
"https://api.broker.bgpkit.com/v2",
QueryParams{
Expand All @@ -334,4 +340,25 @@ mod tests {
});
assert_eq!(broker.into_iter().count(), 6);
}

#[test]
fn test_filters() {
let mut params = QueryParams {
ts_start: Some("1634693400".to_string()),
ts_end: Some("1634693400".to_string()),
..Default::default()
};
let broker = BgpkitBroker::new("https://api.broker.bgpkit.com/v2");
let items = broker.query_all(&params).unwrap();
assert_eq!(items.len(), 106);

params.collector_id = Some("rrc00".to_string());
let items = broker.query_all(&params).unwrap();
assert_eq!(items.len(), 2);

params.collector_id = None;
params.project = Some("riperis".to_string());
let items = broker.query_all(&params).unwrap();
assert_eq!(items.len(), 46);
}
}
4 changes: 2 additions & 2 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl std::fmt::Display for QueryParams {
params_vec.push(format!("ts_end={}", v));
}
if let Some(v) = &self.collector_id {
params_vec.push(format!("collector={}", v));
params_vec.push(format!("collector_id={}", v));
}
if let Some(v) = &self.project {
params_vec.push(format!("project={}", v));
Expand All @@ -96,7 +96,7 @@ impl std::fmt::Display for QueryParams {
params_vec.push(format!("page={}", self.page));
params_vec.push(format!("page_size={}", self.page_size));

if params_vec.len()>0 {
if !params_vec.is_empty() {
write!(f, "?{}", params_vec.join("&"))
} else {
write!(f, "")
Expand Down

0 comments on commit 25b3c3c

Please sign in to comment.