Skip to content

Commit

Permalink
Fixing RemoteClient XORB upload (#12)
Browse files Browse the repository at this point in the history
* Fixing upload

* fixing merge
  • Loading branch information
rajatarya authored Sep 14, 2024
1 parent 539a4fc commit d819d0b
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{Cursor, Write};
use std::io::{BufWriter, Cursor, Write};

use anyhow::anyhow;
use bytes::Buf;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl Client for RemoteClient {
hash: *hash,
};

let was_uploaded = self.client.upload(&key, data, chunk_boundaries).await?;
let was_uploaded = self.client.upload(&key, &data, chunk_boundaries).await?;

if !was_uploaded {
debug!("{key:?} not inserted into CAS.");
Expand Down Expand Up @@ -157,22 +157,27 @@ impl CASAPIClient {
Ok(Some(length))
}

pub async fn upload<T: Into<reqwest::Body>>(
pub async fn upload(
&self,
key: &Key,
contents: T,
contents: &[u8],
chunk_boundaries: Vec<u64>,
) -> Result<bool> {
let chunk_boundaries_query = chunk_boundaries
.iter()
.map(|num| num.to_string())
.collect::<Vec<String>>()
.join(",");
let url = Url::parse(&format!("{0}/{1}/xorb/{key}?{chunk_boundaries_query}", self.scheme, self.endpoint))?;
let url = Url::parse(&format!("{0}/{1}/xorb/{key}", self.scheme, self.endpoint))?;

let writer = Cursor::new(Vec::new());
let mut buf = BufWriter::new(writer);

let (_,_) = CasObject::serialize(
&mut buf,
&key.hash,
contents,
&chunk_boundaries.into_iter().map(|x| x as u32).collect()
)?;

debug!("Upload: POST to {url:?} for {key:?}");

let response = self.client.post(url).body(contents.into()).send().await?;
let response = self.client.post(url).body(buf.buffer().to_vec()).send().await?;
let response_body = response.bytes().await?;
let response_parsed: UploadXorbResponse = serde_json::from_reader(response_body.reader())?;

Expand All @@ -189,6 +194,7 @@ impl CASAPIClient {
}

async fn reconstruct<W: Write>(&self, reconstruction_response: QueryReconstructionResponse, writer: &mut W) -> Result<usize> {

let info = reconstruction_response.reconstruction;
let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length);
let futs = info.into_iter().map(|term| {
Expand Down

0 comments on commit d819d0b

Please sign in to comment.