Skip to content

Commit

Permalink
feat: Add implementation of multple writes (transactions) and atomic …
Browse files Browse the repository at this point in the history
…reads
  • Loading branch information
MaximFischuk committed May 31, 2024
1 parent 7b6f1b9 commit be2eae8
Show file tree
Hide file tree
Showing 7 changed files with 870 additions and 4 deletions.
106 changes: 106 additions & 0 deletions examples/transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::{env, time::Duration};

use zookeeper_async::{WatchedEvent, Watcher, ZooKeeper};

struct LoggingWatcher;
impl Watcher for LoggingWatcher {
fn handle(&self, e: WatchedEvent) {
println!("{:?}", e)
}
}

#[tokio::main]
async fn main() {
let zk_urls = zk_server_urls();
println!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&zk_urls, Duration::from_secs(15), LoggingWatcher)
.await
.unwrap();

// Create transaction that creates a node and a child node
let results = zk
.transaction()
.create(
"/test",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.create(
"/test/child1",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
// Check that the node exists
.check("/test", None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Create transaction that sets data on a node
let results = zk
.transaction()
.create(
"/test2",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.set_data("/test2", vec![1, 2, 3], None)
.create(
"/test2/child1",
vec![],
zookeeper_async::Acl::open_unsafe().clone(),
zookeeper_async::CreateMode::Persistent,
)
.set_data("/test2/child1", vec![4, 5, 6], None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Read the data from the node
let results = zk
.read()
.get_data("/test2", false)
.get_data("/test2/child1", false)
.execute()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}

// Delete all nodes
let results = zk
.transaction()
.delete("/test/child1", None)
.delete("/test", None)
.delete("/test2/child1", None)
.delete("/test2", None)
.commit()
.await
.unwrap();

for result in results {
println!("{:?}", result);
}
}

fn zk_server_urls() -> String {
let key = "ZOOKEEPER_SERVERS";
match env::var(key) {
Ok(val) => val,
Err(_) => "localhost:2181".to_string(),
}
}
3 changes: 3 additions & 0 deletions src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use num_enum::*;
)]
#[repr(i32)]
pub enum ZkError {
/// Operation completed successfully.
/// This code is used to indicate success in transaction operations.
Ok = 0,
/// This code is never returned from the server. It should not be used other than to indicate a
/// range. Specifically error codes greater than this value are API errors (while values less
/// than this indicate a system error).
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod consts;
mod data;
mod io;
mod listeners;
mod multi_op;
mod paths;
mod proto;
pub mod recipes;
Expand All @@ -16,6 +17,7 @@ pub use self::zookeeper::{ZkResult, ZooKeeper};
pub use acl::*;
pub use consts::*;
pub use data::*;
pub use multi_op::*;
pub use watch::{Watch, WatchType, WatchedEvent, Watcher};
pub use zookeeper_ext::ZooKeeperExt;

Expand Down
162 changes: 162 additions & 0 deletions src/multi_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::time::Duration;

use crate::{
proto::{
CheckRequest, CreateRequest, CreateTTLRequest, DeleteRequest, GetDataRequest, Op,
SetDataRequest,
},
Acl, CreateMode, Stat, ZkResult, ZooKeeper,
};

#[derive(Debug)]
pub enum OperationResult {
Create(String),
Create2(String, Stat),
CreateTtl(String, Stat),
SetData(Stat),
Delete,
Check,
}

#[derive(Debug)]
pub enum ReadOperationResult {
GetData(Vec<u8>, Stat),
GetChildren(Vec<String>),
}

pub struct Transaction<'a> {
zookeeper: &'a ZooKeeper,
operations: Vec<Op>,
}

pub struct Read<'a> {
zookeeper: &'a ZooKeeper,
operations: Vec<Op>,
}

impl<'a> Transaction<'a> {
pub fn new(zookeeper: &'a ZooKeeper) -> Self {
Self {
zookeeper,
operations: Vec::new(),
}
}

/// See [ZooKeeper::create]
pub fn create(mut self, path: &str, data: Vec<u8>, acl: Vec<Acl>, mode: CreateMode) -> Self {
self.operations.push(Op::Create(CreateRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
}));
self
}

/// See [ZooKeeper::create2]
pub fn create2(mut self, path: &str, data: Vec<u8>, acl: Vec<Acl>, mode: CreateMode) -> Self {
self.operations.push(Op::Create2(CreateRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
}));
self
}

/// See [ZooKeeper::create_ttl]
pub fn create_ttl(
mut self,
path: &str,
data: Vec<u8>,
acl: Vec<Acl>,
mode: CreateMode,
ttl: Duration,
) -> Self {
self.operations.push(Op::CreateTtl(CreateTTLRequest {
path: path.to_string(),
data,
acl,
flags: mode as i32,
ttl: ttl.as_millis() as i64,
}));
self
}

/// See [ZooKeeper::set_data]
pub fn set_data(mut self, path: &str, data: Vec<u8>, version: Option<i32>) -> Self {
self.operations.push(Op::SetData(SetDataRequest {
path: path.to_string(),
data,
version: version.unwrap_or(-1),
}));
self
}

/// See [ZooKeeper::delete]
pub fn delete(mut self, path: &str, version: Option<i32>) -> Self {
self.operations.push(Op::Delete(DeleteRequest {
path: path.to_string(),
version: version.unwrap_or(-1),
}));
self
}

/// Check if the path exists and the version matches. If the version is not provided, it will
/// check if the path exists.
pub fn check(mut self, path: &str, version: Option<i32>) -> Self {
self.operations.push(Op::Check(CheckRequest {
path: path.to_string(),
version: version.unwrap_or(-1),
}));
self
}

/// Commit the transaction
///
/// # Errors
///
/// If any of the operations fail, the first error will be returned.
///
/// See [ZooKeeper] for more information on errors.
/// See [crate::ZkError] for list of possible errrors.
pub async fn commit(self) -> ZkResult<Vec<OperationResult>> {
self.zookeeper.multi(self.operations).await
}
}

impl<'a> Read<'a> {
pub fn new(zookeeper: &'a ZooKeeper) -> Self {
Self {
zookeeper,
operations: Vec::new(),
}
}
/// See [ZooKeeper::get_data]
pub fn get_data(mut self, path: &str, watch: bool) -> Self {
self.operations.push(Op::GetData(GetDataRequest {
path: path.to_string(),
watch,
}));
self
}

/// See [ZooKeeper::get_children]
pub fn get_children(mut self, path: &str, watch: bool) -> Self {
self.operations.push(Op::GetChildren(GetDataRequest {
path: path.to_string(),
watch,
}));
self
}

/// # Errors
///
/// If any of the operations fail, the first error will be returned.
///
/// See [ZooKeeper] for more information on errors.
/// See [crate::ZkError] for list of possible errrors.
pub async fn execute(self) -> ZkResult<Vec<ReadOperationResult>> {
self.zookeeper.multi_read(self.operations).await
}
}
Loading

0 comments on commit be2eae8

Please sign in to comment.