From 197721c624705aa9c62f49fdfccd0afe5b7cff18 Mon Sep 17 00:00:00 2001 From: Rajat Arya Date: Fri, 13 Sep 2024 19:13:02 -0700 Subject: [PATCH 1/2] Fixing upload --- cas_client/src/remote_client.rs | 28 +++++++++++++++++----------- cas_client/src/util.rs | 12 ++++-------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cas_client/src/remote_client.rs b/cas_client/src/remote_client.rs index 70c3176a..9e79669b 100644 --- a/cas_client/src/remote_client.rs +++ b/cas_client/src/remote_client.rs @@ -1,4 +1,4 @@ -use std::io::{Cursor, Write}; +use std::io::{BufWriter, Cursor, Write}; use anyhow::anyhow; use bytes::Buf; @@ -46,7 +46,7 @@ impl Client for RemoteClient { hash: *hash, }; - let was_uploaded = self.client.upload(&key, data, chunk_boundaries).await?; + let was_uploaded = self.client.upload(&key, &data, chunk_boundaries).await?; if !was_uploaded { debug!("{key:?} not inserted into CAS."); @@ -157,22 +157,27 @@ impl CASAPIClient { Ok(Some(length)) } - pub async fn upload>( + pub async fn upload( &self, key: &Key, - contents: T, + contents: &[u8], chunk_boundaries: Vec, ) -> Result { - let chunk_boundaries_query = chunk_boundaries - .iter() - .map(|num| num.to_string()) - .collect::>() - .join(","); - let url = Url::parse(&format!("{0}/{1}/xorb/{key}?{chunk_boundaries_query}", self.scheme, self.endpoint))?; + let url = Url::parse(&format!("{0}/{1}/xorb/{key}", self.scheme, self.endpoint))?; + + let writer = Cursor::new(Vec::new()); + let mut buf = BufWriter::new(writer); + + let (_,_) = CasObject::serialize( + &mut buf, + &key.hash, + contents, + &chunk_boundaries.into_iter().map(|x| x as u32).collect() + )?; debug!("Upload: POST to {url:?} for {key:?}"); - let response = self.client.post(url).body(contents.into()).send().await?; + let response = self.client.post(url).body(buf.buffer().to_vec()).send().await?; let response_body = response.bytes().await?; let response_parsed: UploadXorbResponse = serde_json::from_reader(response_body.reader())?; @@ -189,6 +194,7 @@ impl CASAPIClient { } async fn reconstruct(&self, reconstruction_response: QueryReconstructionResponse, writer: &mut W) -> Result { + let info = reconstruction_response.reconstruction; let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length); let futs = info.into_iter().map(|term| { diff --git a/cas_client/src/util.rs b/cas_client/src/util.rs index d70047a4..fc12f9b1 100644 --- a/cas_client/src/util.rs +++ b/cas_client/src/util.rs @@ -1,27 +1,23 @@ #[cfg(test)] pub(crate) mod grpc_mock { - use std::sync::atomic::{AtomicU16, Ordering}; + use std::sync::atomic::AtomicU16; use std::sync::Arc; - use std::time::Duration; use cas::infra::infra_utils_server::InfraUtils; - use oneshot::{channel, Receiver}; + use oneshot::Receiver; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; - use tokio::time::sleep; - use tonic::transport::{Error, Server}; + use tonic::transport::Error; use tonic::{Request, Response, Status}; - use crate::cas_connection_pool::CasConnectionConfig; - use cas::cas::cas_server::{Cas, CasServer}; + use cas::cas::cas_server::Cas; use cas::cas::{ GetRangeRequest, GetRangeResponse, GetRequest, GetResponse, HeadRequest, HeadResponse, PutCompleteRequest, PutCompleteResponse, PutRequest, PutResponse, }; use cas::common::{Empty, InitiateRequest, InitiateResponse}; use cas::infra::EndpointLoadResponse; - use retry_strategy::RetryStrategy; const TEST_PORT_START: u16 = 64400; From 45afab672e9da1d2d8da7fed20c28e821afce51c Mon Sep 17 00:00:00 2001 From: Rajat Arya Date: Fri, 13 Sep 2024 19:17:09 -0700 Subject: [PATCH 2/2] fixing merge --- cas_client/src/util.rs | 466 +++++++++++++++++++++-------------------- 1 file changed, 235 insertions(+), 231 deletions(-) diff --git a/cas_client/src/util.rs b/cas_client/src/util.rs index fc12f9b1..94db4f46 100644 --- a/cas_client/src/util.rs +++ b/cas_client/src/util.rs @@ -1,231 +1,235 @@ -#[cfg(test)] -pub(crate) mod grpc_mock { - use std::sync::atomic::AtomicU16; - use std::sync::Arc; - - use cas::infra::infra_utils_server::InfraUtils; - use oneshot::Receiver; - use tokio::sync::oneshot; - use tokio::sync::oneshot::Sender; - use tokio::task::JoinHandle; - use tonic::transport::Error; - use tonic::{Request, Response, Status}; - - use cas::cas::cas_server::Cas; - use cas::cas::{ - GetRangeRequest, GetRangeResponse, GetRequest, GetResponse, HeadRequest, HeadResponse, - PutCompleteRequest, PutCompleteResponse, PutRequest, PutResponse, - }; - use cas::common::{Empty, InitiateRequest, InitiateResponse}; - use cas::infra::EndpointLoadResponse; - - const TEST_PORT_START: u16 = 64400; - - lazy_static::lazy_static! { - static ref CURRENT_PORT: AtomicU16 = AtomicU16::new(TEST_PORT_START); - } - - trait_set::trait_set! { - pub trait PutFn = Fn(Request) -> Result, Status> + 'static; - pub trait InitiateFn = Fn(Request) -> Result, Status> + 'static; - pub trait PutCompleteFn = Fn(Request) -> Result, Status> + 'static; - pub trait GetFn = Fn(Request) -> Result, Status> + 'static; - pub trait GetRangeFn = Fn(Request) -> Result, Status> + 'static; - pub trait HeadFn = Fn(Request) -> Result, Status> + 'static; - } - - /// "Mocks" the grpc service for CAS. This is implemented by allowing the test writer - /// to define the functionality needed for the server and then calling `#start()` to - /// run the server on some port. A GrpcClient will be returned to test with as well - /// as a shutdown hook that can be called to shutdown the mock service. - #[derive(Default)] - pub struct MockService { - put_fn: Option>, - initiate_fn: Option>, - put_complete_fn: Option>, - get_fn: Option>, - get_range_fn: Option>, - head_fn: Option>, - } - - impl MockService { - #[allow(dead_code)] - pub fn with_initiate(self, f: F) -> Self { - Self { - initiate_fn: Some(Arc::new(f)), - ..self - } - } - #[allow(dead_code)] - pub fn with_put_complete(self, f: F) -> Self { - Self { - put_complete_fn: Some(Arc::new(f)), - ..self - } - } - - pub fn with_put(self, f: F) -> Self { - Self { - put_fn: Some(Arc::new(f)), - ..self - } - } - - #[allow(dead_code)] - pub fn with_get(self, f: F) -> Self { - Self { - get_fn: Some(Arc::new(f)), - ..self - } - } - - #[allow(dead_code)] - pub fn with_get_range(self, f: F) -> Self { - Self { - get_range_fn: Some(Arc::new(f)), - ..self - } - } - - #[allow(dead_code)] - pub fn with_head(self, f: F) -> Self { - Self { - head_fn: Some(Arc::new(f)), - ..self - } - } - - /* - pub async fn start(self) -> (ShutdownHook, GrpcClient) { - self.start_with_retry_strategy(RetryStrategy::new(2, 1)) - .await - } - - pub async fn start_with_retry_strategy( - self, - strategy: RetryStrategy, - ) -> (ShutdownHook, GrpcClient) { - // Get next port - let port = CURRENT_PORT.fetch_add(1, Ordering::SeqCst); - let addr = format!("127.0.0.1:{}", port).parse().unwrap(); - - // Start up the server - let (tx, rx) = channel::<()>(); - let handle = tokio::spawn( - Server::builder() - .add_service(CasServer::new(self)) - .serve_with_shutdown(addr, shutdown(rx)), - ); - let shutdown_hook = ShutdownHook::new(tx, handle); - - // Wait for server to start up - sleep(Duration::from_millis(10)).await; - - // Create dedicated client for server - let endpoint = format!("127.0.0.1:{}", port); - let user_id = "xet_user".to_string(); - let auth = "xet_auth".to_string(); - let repo_paths = vec!["example".to_string()]; - let version = "0.1.0".to_string(); - let cas_client = get_client(CasConnectionConfig::new( - endpoint, user_id, auth, repo_paths, version, - )) - .await - .unwrap(); - let client = GrpcClient::new("127.0.0.1".to_string(), cas_client, strategy); - (shutdown_hook, client) - } - - */ - } - - // Unsafe hacks so that we can dynamically add in overrides to the mock functionality - // (Fn isn't sync/send). There's probably a better way to do this that isn't so blunt/fragile. - unsafe impl Send for MockService {} - unsafe impl Sync for MockService {} - - #[async_trait::async_trait] - impl InfraUtils for MockService { - async fn endpoint_load( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - async fn initiate( - &self, - request: Request, - ) -> Result, Status> { - self.initiate_fn.as_ref().unwrap()(request) - } - } - #[async_trait::async_trait] - impl Cas for MockService { - async fn initiate( - &self, - request: Request, - ) -> Result, Status> { - self.initiate_fn.as_ref().unwrap()(request) - } - - async fn put(&self, request: Request) -> Result, Status> { - self.put_fn.as_ref().unwrap()(request) - } - - async fn put_complete( - &self, - request: Request, - ) -> Result, Status> { - self.put_complete_fn.as_ref().unwrap()(request) - } - - async fn get(&self, request: Request) -> Result, Status> { - self.get_fn.as_ref().unwrap()(request) - } - - async fn get_range( - &self, - request: Request, - ) -> Result, Status> { - self.get_range_fn.as_ref().unwrap()(request) - } - - async fn head( - &self, - request: Request, - ) -> Result, Status> { - self.head_fn.as_ref().unwrap()(request) - } - } - - async fn shutdown(rx: Receiver<()>) { - let _ = rx.await; - } - - /// Encapsulates logic to shutdown a running tonic Server. This is done through - /// sending a message on a channel that the server is listening on for shutdown. - /// Once the message has been sent, the spawned task is awaited using its JoinHandle. - /// - /// TODO: implementing `Drop` with async is difficult and the naïve implementation - /// ends up blocking the test completion. There is likely some deadlock somewhere. - pub struct ShutdownHook { - tx: Option>, - join_handle: Option>>, - } - - impl ShutdownHook { - pub fn new(tx: Sender<()>, join_handle: JoinHandle>) -> Self { - Self { - tx: Some(tx), - join_handle: Some(join_handle), - } - } - - pub async fn async_drop(&mut self) { - let tx = self.tx.take(); - let handle = self.join_handle.take(); - let _ = tx.unwrap().send(()); - let _ = handle.unwrap().await; - } - } -} +// #[cfg(test)] +// pub(crate) mod grpc_mock { +// use std::sync::atomic::{AtomicU16, Ordering}; +// use std::sync::Arc; +// use std::time::Duration; + +// use cas::infra::infra_utils_server::InfraUtils; +// use oneshot::{channel, Receiver}; +// use tokio::sync::oneshot; +// use tokio::sync::oneshot::Sender; +// use tokio::task::JoinHandle; +// use tokio::time::sleep; +// use tonic::transport::{Error, Server}; +// use tonic::{Request, Response, Status}; + +// use crate::cas_connection_pool::CasConnectionConfig; +// use cas::cas::cas_server::{Cas, CasServer}; +// use cas::cas::{ +// GetRangeRequest, GetRangeResponse, GetRequest, GetResponse, HeadRequest, HeadResponse, +// PutCompleteRequest, PutCompleteResponse, PutRequest, PutResponse, +// }; +// use cas::common::{Empty, InitiateRequest, InitiateResponse}; +// use cas::infra::EndpointLoadResponse; +// use retry_strategy::RetryStrategy; + +// const TEST_PORT_START: u16 = 64400; + +// lazy_static::lazy_static! { +// static ref CURRENT_PORT: AtomicU16 = AtomicU16::new(TEST_PORT_START); +// } + +// trait_set::trait_set! { +// pub trait PutFn = Fn(Request) -> Result, Status> + 'static; +// pub trait InitiateFn = Fn(Request) -> Result, Status> + 'static; +// pub trait PutCompleteFn = Fn(Request) -> Result, Status> + 'static; +// pub trait GetFn = Fn(Request) -> Result, Status> + 'static; +// pub trait GetRangeFn = Fn(Request) -> Result, Status> + 'static; +// pub trait HeadFn = Fn(Request) -> Result, Status> + 'static; +// } + +// /// "Mocks" the grpc service for CAS. This is implemented by allowing the test writer +// /// to define the functionality needed for the server and then calling `#start()` to +// /// run the server on some port. A GrpcClient will be returned to test with as well +// /// as a shutdown hook that can be called to shutdown the mock service. +// #[derive(Default)] +// pub struct MockService { +// put_fn: Option>, +// initiate_fn: Option>, +// put_complete_fn: Option>, +// get_fn: Option>, +// get_range_fn: Option>, +// head_fn: Option>, +// } + +// impl MockService { +// #[allow(dead_code)] +// pub fn with_initiate(self, f: F) -> Self { +// Self { +// initiate_fn: Some(Arc::new(f)), +// ..self +// } +// } +// #[allow(dead_code)] +// pub fn with_put_complete(self, f: F) -> Self { +// Self { +// put_complete_fn: Some(Arc::new(f)), +// ..self +// } +// } + +// pub fn with_put(self, f: F) -> Self { +// Self { +// put_fn: Some(Arc::new(f)), +// ..self +// } +// } + +// #[allow(dead_code)] +// pub fn with_get(self, f: F) -> Self { +// Self { +// get_fn: Some(Arc::new(f)), +// ..self +// } +// } + +// #[allow(dead_code)] +// pub fn with_get_range(self, f: F) -> Self { +// Self { +// get_range_fn: Some(Arc::new(f)), +// ..self +// } +// } + +// #[allow(dead_code)] +// pub fn with_head(self, f: F) -> Self { +// Self { +// head_fn: Some(Arc::new(f)), +// ..self +// } +// } + +// /* +// pub async fn start(self) -> (ShutdownHook, GrpcClient) { +// self.start_with_retry_strategy(RetryStrategy::new(2, 1)) +// .await +// } + +// pub async fn start_with_retry_strategy( +// self, +// strategy: RetryStrategy, +// ) -> (ShutdownHook, GrpcClient) { +// // Get next port +// let port = CURRENT_PORT.fetch_add(1, Ordering::SeqCst); +// let addr = format!("127.0.0.1:{}", port).parse().unwrap(); + +// // Start up the server +// let (tx, rx) = channel::<()>(); +// let handle = tokio::spawn( +// Server::builder() +// .add_service(CasServer::new(self)) +// .serve_with_shutdown(addr, shutdown(rx)), +// ); +// let shutdown_hook = ShutdownHook::new(tx, handle); + +// // Wait for server to start up +// sleep(Duration::from_millis(10)).await; + +// // Create dedicated client for server +// let endpoint = format!("127.0.0.1:{}", port); +// let user_id = "xet_user".to_string(); +// let auth = "xet_auth".to_string(); +// let repo_paths = vec!["example".to_string()]; +// let version = "0.1.0".to_string(); +// let cas_client = get_client(CasConnectionConfig::new( +// endpoint, user_id, auth, repo_paths, version, +// )) +// .await +// .unwrap(); +// let client = GrpcClient::new("127.0.0.1".to_string(), cas_client, strategy); +// (shutdown_hook, client) +// } + +// */ +// } + +// // Unsafe hacks so that we can dynamically add in overrides to the mock functionality +// // (Fn isn't sync/send). There's probably a better way to do this that isn't so blunt/fragile. +// unsafe impl Send for MockService {} +// unsafe impl Sync for MockService {} + +// #[async_trait::async_trait] +// impl InfraUtils for MockService { +// async fn endpoint_load( +// &self, +// _request: Request, +// ) -> Result, Status> { +// unimplemented!() +// } +// async fn initiate( +// &self, +// request: Request, +// ) -> Result, Status> { +// self.initiate_fn.as_ref().unwrap()(request) +// } +// } +// #[async_trait::async_trait] +// impl Cas for MockService { +// async fn initiate( +// &self, +// request: Request, +// ) -> Result, Status> { +// self.initiate_fn.as_ref().unwrap()(request) +// } + +// async fn put(&self, request: Request) -> Result, Status> { +// self.put_fn.as_ref().unwrap()(request) +// } + +// async fn put_complete( +// &self, +// request: Request, +// ) -> Result, Status> { +// self.put_complete_fn.as_ref().unwrap()(request) +// } + +// async fn get(&self, request: Request) -> Result, Status> { +// self.get_fn.as_ref().unwrap()(request) +// } + +// async fn get_range( +// &self, +// request: Request, +// ) -> Result, Status> { +// self.get_range_fn.as_ref().unwrap()(request) +// } + +// async fn head( +// &self, +// request: Request, +// ) -> Result, Status> { +// self.head_fn.as_ref().unwrap()(request) +// } +// } + +// async fn shutdown(rx: Receiver<()>) { +// let _ = rx.await; +// } + +// /// Encapsulates logic to shutdown a running tonic Server. This is done through +// /// sending a message on a channel that the server is listening on for shutdown. +// /// Once the message has been sent, the spawned task is awaited using its JoinHandle. +// /// +// /// TODO: implementing `Drop` with async is difficult and the naïve implementation +// /// ends up blocking the test completion. There is likely some deadlock somewhere. +// pub struct ShutdownHook { +// tx: Option>, +// join_handle: Option>>, +// } + +// impl ShutdownHook { +// pub fn new(tx: Sender<()>, join_handle: JoinHandle>) -> Self { +// Self { +// tx: Some(tx), +// join_handle: Some(join_handle), +// } +// } + +// pub async fn async_drop(&mut self) { +// let tx = self.tx.take(); +// let handle = self.join_handle.take(); +// let _ = tx.unwrap().send(()); +// let _ = handle.unwrap().await; +// } +// } +// }