Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Transactions and Atomic Reads in Zookeeper Async Rust Client #15

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions examples/transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::{env, time::Duration};

use zookeeper_async::{WatchedEvent, Watcher, ZooKeeper};

struct LoggingWatcher;
impl Watcher for LoggingWatcher {
fn handle(&self, e: WatchedEvent) {
println!("{:?}", e)
}
}

#[tokio::main]
async fn main() {
let zk_urls = zk_server_urls();
println!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&zk_urls, Duration::from_secs(15), LoggingWatcher)
.await
.unwrap();

// Create transaction that creates a node and a child node
let results = zk
.transaction()
.create(
"/test",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.create(
"/test/child1",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
// Check that the node exists
.check("/test", None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Create transaction that sets data on a node
let results = zk
.transaction()
.create(
"/test2",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.set_data("/test2", vec![1, 2, 3], None)
.create(
"/test2/child1",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.set_data("/test2/child1", vec![4, 5, 6], None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Read the data from the node
let results = zk
.read()
.get_data("/test2", false)
.get_data("/test2/child1", false)
.execute()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Delete all nodes
let results = zk
.transaction()
.delete("/test/child1", None)
.delete("/test", None)
.delete("/test2/child1", None)
.delete("/test2", None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}
}

fn zk_server_urls() -> String {
let key = "ZOOKEEPER_SERVERS";
match env::var(key) {
Ok(val) => val,
Err(_) => "localhost:2181".to_string(),
}
}
3 changes: 3 additions & 0 deletions src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use num_enum::*;
)]
#[repr(i32)]
pub enum ZkError {
/// Operation completed successfully.
/// This code is used to indicate success in transaction operations.
Ok = 0,
/// This code is never returned from the server. It should not be used other than to indicate a
/// range. Specifically error codes greater than this value are API errors (while values less
/// than this indicate a system error).
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod consts;
mod data;
mod io;
mod listeners;
mod multi_op;
mod paths;
mod proto;
pub mod recipes;
Expand All @@ -16,6 +17,7 @@ pub use self::zookeeper::{ZkResult, ZooKeeper};
pub use acl::*;
pub use consts::*;
pub use data::*;
pub use multi_op::*;
pub use watch::{Watch, WatchType, WatchedEvent, Watcher};
pub use zookeeper_ext::ZooKeeperExt;

Expand Down
162 changes: 162 additions & 0 deletions src/multi_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::time::Duration;

use crate::{
proto::{
CheckRequest, CreateRequest, CreateTTLRequest, DeleteRequest, GetDataRequest, Op,
SetDataRequest,
},
Acl, CreateMode, Stat, ZkResult, ZooKeeper,
};

#[derive(Debug)]
pub enum OperationResult {
Create(String),
Create2(String, Stat),
CreateTtl(String, Stat),
SetData(Stat),
Delete,
Check,
}

#[derive(Debug)]
pub enum ReadOperationResult {
GetData(Vec<u8>, Stat),
GetChildren(Vec<String>),
}

pub struct Transaction<'a> {
zookeeper: &'a ZooKeeper,
operations: Vec<Op>,
}

pub struct Read<'a> {
zookeeper: &'a ZooKeeper,
operations: Vec<Op>,
}

impl<'a> Transaction<'a> {
pub fn new(zookeeper: &'a ZooKeeper) -> Self {
Self {
zookeeper,
operations: Vec::new(),
}
}

/// See [ZooKeeper::create]
pub fn create(mut self, path: &str, data: Vec<u8>, acl: Vec<Acl>, mode: CreateMode) -> Self {
self.operations.push(Op::Create(CreateRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
}));
self
}

/// See [ZooKeeper::create2]
pub fn create2(mut self, path: &str, data: Vec<u8>, acl: Vec<Acl>, mode: CreateMode) -> Self {
self.operations.push(Op::Create2(CreateRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
}));
self
}

/// See [ZooKeeper::create_ttl]
pub fn create_ttl(
mut self,
path: &str,
data: Vec<u8>,
acl: Vec<Acl>,
mode: CreateMode,
ttl: Duration,
) -> Self {
self.operations.push(Op::CreateTtl(CreateTTLRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
ttl: ttl.as_millis() as i64,
}));
self
}

/// See [ZooKeeper::set_data]
pub fn set_data(mut self, path: &str, data: Vec<u8>, version: Option<i32>) -> Self {
self.operations.push(Op::SetData(SetDataRequest {
path: path.to_string(),
data,
version: version.unwrap_or(-1),
}));
self
}

/// See [ZooKeeper::delete]
pub fn delete(mut self, path: &str, version: Option<i32>) -> Self {
self.operations.push(Op::Delete(DeleteRequest {
path: path.to_string(),
version: version.unwrap_or(-1),
}));
self
}

/// Check if the path exists and the version matches. If the version is not provided, it will
/// check if the path exists.
pub fn check(mut self, path: &str, version: Option<i32>) -> Self {
self.operations.push(Op::Check(CheckRequest {
path: path.to_string(),
version: version.unwrap_or(-1),
}));
self
}

/// Commit the transaction
///
/// # Errors
///
/// If any of the operations fail, the first error will be returned.
///
/// See [ZooKeeper] for more information on errors.
/// See [crate::ZkError] for list of possible errrors.
pub async fn commit(self) -> ZkResult<Vec<OperationResult>> {
self.zookeeper.multi(self.operations).await
}
}

impl<'a> Read<'a> {
pub fn new(zookeeper: &'a ZooKeeper) -> Self {
Self {
zookeeper,
operations: Vec::new(),
}
}
/// See [ZooKeeper::get_data]
pub fn get_data(mut self, path: &str, watch: bool) -> Self {
self.operations.push(Op::GetData(GetDataRequest {
path: path.to_string(),
watch,
}));
self
}

/// See [ZooKeeper::get_children]
pub fn get_children(mut self, path: &str, watch: bool) -> Self {
self.operations.push(Op::GetChildren(GetDataRequest {
path: path.to_string(),
watch,
}));
self
}

/// # Errors
///
/// If any of the operations fail, the first error will be returned.
///
/// See [ZooKeeper] for more information on errors.
/// See [crate::ZkError] for list of possible errrors.
pub async fn execute(self) -> ZkResult<Vec<ReadOperationResult>> {
self.zookeeper.multi_read(self.operations).await
}
}
Loading
Loading