Fragmentation Prevention #147

Merged · 7 commits · Jan 30, 2025
Changes from 5 commits
82 changes: 78 additions & 4 deletions data/src/clean.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::mem::take;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
@@ -26,7 +26,10 @@ use utils::progress::ProgressUpdater;
use utils::ThreadPool;

use crate::chunking::{chunk_target_default, ChunkYieldType};
use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES;
use crate::constants::{
MIN_N_CHUNKS_PER_RANGE_HIGH, MIN_N_CHUNKS_PER_RANGE_LOW, MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES,
NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR,
};
use crate::data_processing::CASDataAggregator;
use crate::errors::DataProcessingError::*;
use crate::errors::Result;
@@ -57,6 +60,38 @@ struct DedupFileTrackingInfo {
current_cas_file_info_indices: Vec<usize>,
current_cas_block_hashes: HashMap<MerkleHash, usize>,
cas_data: CASDataAggregator,
/// This tracks the number of chunks in each of the last N ranges
rolling_last_nranges: VecDeque<usize>,
/// This tracks the total number of chunks in the last N ranges
rolling_nranges_chunks: usize,
/// Chooses between MIN_N_CHUNKS_PER_RANGE_HIGH and MIN_N_CHUNKS_PER_RANGE_LOW.
/// Used to provide some hysteresis on the defrag decision
defrag_at_low_threshold: bool,
}

impl DedupFileTrackingInfo {
fn increment_last_range_in_fragmentation_estimate(&mut self, nchunks: usize) {
if let Some(back) = self.rolling_last_nranges.back_mut() {
*back += nchunks;
self.rolling_nranges_chunks += nchunks;
}
}
fn add_range_to_fragmentation_estimate(&mut self, nchunks: usize) {
self.rolling_last_nranges.push_back(nchunks);
self.rolling_nranges_chunks += nchunks;
if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap();
}
}
/// Returns the average number of chunks per range
/// None if there is not enough data for an estimate
fn rolling_chunks_per_range(&self) -> Option<f32> {
if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
None
} else {
Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32)
}
}
}

#[derive(Debug)]
@@ -462,8 +497,41 @@ impl Cleaner {

while cur_idx < chunks.len() {
let mut n_bytes = 0;
let mut dedupe_query = deduped_blocks[cur_idx].take();

// Check the fragmentation state; if the stream is already quite fragmented,
// we skip dedupe.
let mut forced_nodedupe = false;
if let Some((n_deduped, _)) = dedupe_query {
if let Some(chunks_per_range) = tracking_info.rolling_chunks_per_range() {
let target_cpr = if tracking_info.defrag_at_low_threshold {
MIN_N_CHUNKS_PER_RANGE_LOW
} else {
MIN_N_CHUNKS_PER_RANGE_HIGH
};
if chunks_per_range < target_cpr {
// The chunks-per-range estimate is poor, so by default we should not dedupe.
// However, here we can look ahead a little and check the size of the
// next dedupe window; if it is too small to improve the chunks per
// range, we skip it.
if (n_deduped as f32) < chunks_per_range {
dedupe_query = None;
forced_nodedupe = true;
// Once we start skipping dedupe, raise the target CPR
// to the high threshold.
tracking_info.defrag_at_low_threshold = false;
}
} else {
// Once we start deduping again, lower the target CPR
// to the low threshold so that more small fragments
// are allowed.
tracking_info.defrag_at_low_threshold = true;
}
}
}

if let Some((n_deduped, fse)) = deduped_blocks[cur_idx].take() {
if let Some((n_deduped, fse)) = dedupe_query {
// We found one or more chunk hashes present in a cas block somewhere.

// Update all the metrics.
@@ -482,9 +550,12 @@
let last_entry = tracking_info.file_info.last_mut().unwrap();
last_entry.unpacked_segment_bytes += n_bytes as u32;
last_entry.chunk_index_end = fse.chunk_index_end;
// update the fragmentation estimation window
tracking_info.increment_last_range_in_fragmentation_estimate(n_deduped);
} else {
// This block is new
tracking_info.file_info.push(fse);
tracking_info.add_range_to_fragmentation_estimate(n_deduped);
}

cur_idx += n_deduped;
@@ -496,7 +567,8 @@
// This is new data.
let add_new_data;

if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash) {
if tracking_info.current_cas_block_hashes.contains_key(&chunk.hash) && !forced_nodedupe {
let idx = tracking_info.current_cas_block_hashes.get(&chunk.hash).unwrap();
let idx = *idx;
// This chunk will get the CAS hash updated when the local CAS block
// is full and registered.
@@ -521,6 +593,7 @@
last_entry.unpacked_segment_bytes += n_bytes as u32;
last_entry.chunk_index_end += 1;
add_new_data = true;
tracking_info.increment_last_range_in_fragmentation_estimate(1);
} else {
// This block is unrelated to the previous one.
// This chunk will get the CAS hash updated when the local CAS block
@@ -535,6 +608,7 @@
chunk_len,
chunk_len + 1,
));
tracking_info.add_range_to_fragmentation_estimate(1);
add_new_data = true;
}

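To make the new control flow easier to follow outside the diff, here is a small self-contained sketch of the mechanism this PR adds: a rolling window over the last N dedupe ranges estimates the average chunks per range, and a two-level threshold with hysteresis decides whether a small dedupe match is worth the extra fragmentation. The `FragmentationEstimator` type, its `should_dedupe` helper, and the initial threshold setting are illustrative stand-ins rather than code from the PR; only the constants mirror `data/src/constants.rs`.

```rust
use std::collections::VecDeque;

// Values mirroring data/src/constants.rs in this PR.
const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128;
const MIN_N_CHUNKS_PER_RANGE_LOW: f32 = 4.0;
const MIN_N_CHUNKS_PER_RANGE_HIGH: f32 = 8.0;

// Simplified stand-in for the fragmentation-tracking fields added to DedupFileTrackingInfo.
struct FragmentationEstimator {
    rolling_last_nranges: VecDeque<usize>,
    rolling_nranges_chunks: usize,
    defrag_at_low_threshold: bool,
}

impl FragmentationEstimator {
    fn new() -> Self {
        Self {
            rolling_last_nranges: VecDeque::new(),
            rolling_nranges_chunks: 0,
            // Initial value is an assumption; the diff does not show the struct's Default.
            defrag_at_low_threshold: false,
        }
    }

    // Record a newly started range of `nchunks` chunks, evicting the oldest
    // range once the window exceeds its fixed length.
    fn add_range(&mut self, nchunks: usize) {
        self.rolling_last_nranges.push_back(nchunks);
        self.rolling_nranges_chunks += nchunks;
        if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
            self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap();
        }
    }

    // Average chunks per range over the window; None until the window is full.
    fn chunks_per_range(&self) -> Option<f32> {
        if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
            None
        } else {
            Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32)
        }
    }

    // Decide whether a dedupe match spanning `n_deduped` chunks should be taken,
    // applying the same hysteresis rule as the cleaner loop in the diff.
    fn should_dedupe(&mut self, n_deduped: usize) -> bool {
        let Some(cpr) = self.chunks_per_range() else {
            return true; // not enough history yet; always dedupe
        };
        let target = if self.defrag_at_low_threshold {
            MIN_N_CHUNKS_PER_RANGE_LOW
        } else {
            MIN_N_CHUNKS_PER_RANGE_HIGH
        };
        if cpr < target {
            if (n_deduped as f32) < cpr {
                // Fragmented, and this match is too small to help: skip dedupe and
                // require the higher threshold before deduping freely again.
                self.defrag_at_low_threshold = false;
                return false;
            }
        } else {
            // Healthy chunks-per-range: relax back to the low threshold.
            self.defrag_at_low_threshold = true;
        }
        true
    }
}

fn main() {
    let mut est = FragmentationEstimator::new();
    // Simulate a badly fragmented stream: 128 ranges of 2 chunks each.
    for _ in 0..NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
        est.add_range(2);
    }
    println!("cpr = {:?}", est.chunks_per_range()); // Some(2.0)
    println!("take a 1-chunk match?  {}", est.should_dedupe(1)); // false: skipped
    println!("take a 16-chunk match? {}", est.should_dedupe(16)); // true: large enough to keep
}
```

The hysteresis is what keeps the decision from flip-flopping: skipping dedupe raises the target to MIN_N_CHUNKS_PER_RANGE_HIGH, and only once the estimate recovers above the current target does it drop back to MIN_N_CHUNKS_PER_RANGE_LOW.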
8 changes: 8 additions & 0 deletions data/src/constants.rs
@@ -42,3 +42,11 @@ pub const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Maximum number of entries in the file construction cache
/// which stores File Hash -> reconstruction instructions
pub const FILE_RECONSTRUCTION_CACHE_SIZE: usize = 65536;

/// Number of ranges to use when estimating fragmentation
pub const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128;

/// Minimum number of chunks per range. Used to control fragmentation
/// This targets an average of 1MB per range
pub const MIN_N_CHUNKS_PER_RANGE_LOW: f32 = 4.0;
pub const MIN_N_CHUNKS_PER_RANGE_HIGH: f32 = 8.0;
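As a quick sanity check on the window arithmetic these constants parameterize, a test along the following lines (written in the same module as the simplified estimator sketched above, not against the PR's `DedupFileTrackingInfo`) would verify that the running chunk total always matches the sum of the retained ranges, and that an estimate only appears once the window holds the full NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR ranges.

```rust
#[test]
fn rolling_window_tracks_sum_and_fills_before_estimating() {
    let mut est = FragmentationEstimator::new();
    for i in 0..(NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR + 50) {
        est.add_range(1 + (i % 5));
        // Invariant: the running total equals the sum of the retained ranges.
        assert_eq!(
            est.rolling_nranges_chunks,
            est.rolling_last_nranges.iter().sum::<usize>()
        );
        // No estimate until the window is full.
        assert_eq!(
            est.chunks_per_range().is_some(),
            est.rolling_last_nranges.len() == NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR
        );
    }
}
```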