Skip to content

Commit

Permalink
Add streaming example
Browse files Browse the repository at this point in the history
  • Loading branch information
nrempel committed Nov 5, 2024
1 parent 445a032 commit 5cba9f2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ path = "src/prefix.rs"
name = "custom_query"
path = "src/custom_query.rs"

[[bin]]
name = "stream"
path = "src/stream.rs"

[dependencies]
bigtable_rs = { path = "../bigtable_rs" }
tokio = { version = "1.34.0", features = ["rt-multi-thread"] }
env_logger = "0.11.1"
futures-util = "0.3.31"
log = "0.4.20"
warp = "0.3.6"
serde = { version = "1.0", features = ["derive"] }
Expand Down
82 changes: 82 additions & 0 deletions examples/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use bigtable_rs::bigtable;
use bigtable_rs::google::bigtable::v2::row_filter::{Chain, Filter};
use bigtable_rs::google::bigtable::v2::row_range::{EndKey, StartKey};
use bigtable_rs::google::bigtable::v2::{ReadRowsRequest, RowFilter, RowRange, RowSet};
use env_logger;
use futures_util::TryStreamExt;
use std::error::Error;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let project_id = "project-1";
let instance_name = "instance-1";
let table_name = "table-1";
let channel_size = 4;
let timeout = Duration::from_secs(10);

let key_start: String = "key1".to_owned();
let key_end: String = "key4".to_owned();

// make a bigtable client
let connection = bigtable::BigTableConnection::new(
project_id,
instance_name,
true,
channel_size,
Some(timeout),
)
.await?;
let mut bigtable = connection.client();

// prepare a ReadRowsRequest
let request = ReadRowsRequest {
app_profile_id: "default".to_owned(),
table_name: bigtable.get_full_table_name(table_name),
rows_limit: 10,
rows: Some(RowSet {
row_keys: vec![], // use this field to put keys for reading specific rows
row_ranges: vec![RowRange {
start_key: Some(StartKey::StartKeyClosed(key_start.into_bytes())),
end_key: Some(EndKey::EndKeyOpen(key_end.into_bytes())),
}],
}),
filter: Some(RowFilter {
filter: Some(Filter::Chain(Chain {
filters: vec![
RowFilter {
filter: Some(Filter::FamilyNameRegexFilter("cf1".to_owned())),
},
RowFilter {
filter: Some(Filter::ColumnQualifierRegexFilter("c1".as_bytes().to_vec())),
},
RowFilter {
filter: Some(Filter::CellsPerColumnLimitFilter(2)),
},
],
})),
}),
..ReadRowsRequest::default()
};

// calling bigtable API to get streaming results
let mut stream = bigtable.stream_rows(request).await?;

// process the stream of results
while let Some((key, data)) = stream.try_next().await? {
println!("------------\n{}", String::from_utf8(key.clone()).unwrap());
data.into_iter().for_each(|row_cell| {
println!(
" [{}:{}] \"{}\" @ {}",
row_cell.family_name,
String::from_utf8(row_cell.qualifier).unwrap(),
String::from_utf8(row_cell.value).unwrap(),
row_cell.timestamp_micros
)
});
}

Ok(())
}

0 comments on commit 5cba9f2

Please sign in to comment.