Skip to content

Commit

Permalink
Avoid uploading empty xorbs.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytak committed Jan 31, 2025
1 parent 5cf29c1 commit 22c8ba8
Showing 1 changed file with 49 additions and 41 deletions.
90 changes: 49 additions & 41 deletions data/src/parallel_xorb_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,51 +89,59 @@ impl XorbUpload for ParallelXorbUploader {

// Only upload a new xorb if there is new data; a new file may be formed entirely from chunks that
// already exist, in which case there is no xorb data to upload.
let xorb_data_len = cas_data.data.len();

let cas_hash = cas_node_hash(&cas_data.chunks[..]);

// Rate limiting, the acquired permit is dropped after the task completes.
// The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
let permit = self
.rate_limiter
.clone()
.acquire_owned()
.await
.map_err(|e| UploadTaskError(e.to_string()))?;

let item = (cas_hash, cas_data.data, cas_data.chunks);
let shard_manager = self.shard_manager.clone();
let cas = self.cas.clone();
let cas_prefix = self.cas_prefix.clone();

let mut upload_tasks = self.upload_tasks.lock().await;
let upload_progress_updater = self.upload_progress_updater.clone();
upload_tasks.spawn_on(
async move {
let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
if ret.is_ok() {
if let Some(updater) = upload_progress_updater {
updater.update(xorb_data_len as u64);
if cas_data.data.is_empty() {
// Register any new files if present; ignore xorb uploading in this case.
for (fi, _chunk_hash_indices) in cas_data.pending_file_info {
debug_assert!(_chunk_hash_indices.is_empty());
self.shard_manager.add_file_reconstruction_info(fi).await?;
}
Ok(MerkleHash::default())
} else {
let xorb_data_len = cas_data.data.len();

let cas_hash = cas_node_hash(&cas_data.chunks[..]);

// Rate limiting, the acquired permit is dropped after the task completes.
// The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first.
let permit = self
.rate_limiter
.clone()
.acquire_owned()
.await
.map_err(|e| UploadTaskError(e.to_string()))?;

let item = (cas_hash, cas_data.data, cas_data.chunks);
let shard_manager = self.shard_manager.clone();
let cas = self.cas.clone();
let cas_prefix = self.cas_prefix.clone();

let mut upload_tasks = self.upload_tasks.lock().await;
let upload_progress_updater = self.upload_progress_updater.clone();
upload_tasks.spawn_on(
async move {
let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await;
if ret.is_ok() {
if let Some(updater) = upload_progress_updater {
updater.update(xorb_data_len as u64);
}
}
drop(permit);
ret
},
&self.threadpool.handle(),
);

// Now register any new files as needed.
for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
for i in chunk_hash_indices {
debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
fi.segments[i].cas_hash = cas_hash;
}
drop(permit);
ret
},
&self.threadpool.handle(),
);

// Now register any new files as needed.
for (mut fi, chunk_hash_indices) in cas_data.pending_file_info {
for i in chunk_hash_indices {
debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
fi.segments[i].cas_hash = cas_hash;
}

self.shard_manager.add_file_reconstruction_info(fi).await?;
self.shard_manager.add_file_reconstruction_info(fi).await?;
}
Ok(cas_hash)
}

Ok(cas_hash)
}

/// Flush makes sure all xorbs added to queue before this call are sent successfully
Expand Down

0 comments on commit 22c8ba8

Please sign in to comment.