diff --git a/data/src/clean.rs b/data/src/clean.rs index 4ea8c58..d1d870d 100644 --- a/data/src/clean.rs +++ b/data/src/clean.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::mem::take; use std::ops::DerefMut; use std::path::{Path, PathBuf}; @@ -26,7 +26,10 @@ use utils::progress::ProgressUpdater; use xet_threadpool::ThreadPool; use crate::chunking::{chunk_target_default, ChunkYieldType}; -use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES; +use crate::constants::{ + DEFAULT_MIN_N_CHUNKS_PER_RANGE, MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR, MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES, + NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR, +}; use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::Result; @@ -44,6 +47,13 @@ lazy_static! { .unwrap_or(1); } +lazy_static! { + pub static ref MIN_N_CHUNKS_PER_RANGE: f32 = std::env::var("XET_MIN_N_CHUNKS_PER_RANGE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_MIN_N_CHUNKS_PER_RANGE); +} + pub enum BufferItem<T> { Value(T), Completed, @@ -56,6 +66,39 @@ struct DedupFileTrackingInfo { current_cas_file_info_indices: Vec<usize>, current_cas_block_hashes: HashMap<MerkleHash, usize>, cas_data: CASDataAggregator, + /// This tracks the number of chunks in each of the last N ranges + rolling_last_nranges: VecDeque<usize>, + /// This tracks the total number of chunks in the last N ranges + rolling_nranges_chunks: usize, + /// Used to provide some hysteresis on the defrag decision + /// chooses between MIN_N_CHUNKS_PER_RANGE + /// or MIN_N_CHUNKS_PER_RANGE * HYSTERESIS_FACTOR (hysteresis factor < 1.0) + defrag_at_low_threshold: bool, +} + +impl DedupFileTrackingInfo { + fn increment_last_range_in_fragmentation_estimate(&mut self, nchunks: usize) { + if let Some(back) = self.rolling_last_nranges.back_mut() { + *back += nchunks; + self.rolling_nranges_chunks += nchunks; + } + } + fn add_range_to_fragmentation_estimate(&mut self, nchunks: usize) 
{ + self.rolling_last_nranges.push_back(nchunks); + self.rolling_nranges_chunks += nchunks; + if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR { + self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap(); + } + } + /// Returns the average number of chunks per range + /// None if there is not enough data for an estimate + fn rolling_chunks_per_range(&self) -> Option<f32> { + if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR { + None + } else { + Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32) + } + } } #[derive(Debug)] @@ -422,8 +465,41 @@ impl Cleaner { while cur_idx < chunks.len() { let mut n_bytes = 0; + let mut dedupe_query = deduped_blocks[cur_idx].take(); + + // check the fragmentation state and if it is pretty fragmented + // we skip dedupe + let mut forced_nodedupe = false; + if let Some((n_deduped, _)) = dedupe_query { + if let Some(chunks_per_range) = tracking_info.rolling_chunks_per_range() { + let target_cpr = if tracking_info.defrag_at_low_threshold { + (*MIN_N_CHUNKS_PER_RANGE) * MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR + } else { + *MIN_N_CHUNKS_PER_RANGE + }; + if chunks_per_range < target_cpr { + // chunks per range is pretty poor, we should not dedupe. + // However, here we do get to look ahead a little bit + // and check the size of the next dedupe window. + // if it is too small, it is not going to improve + // the chunks per range and so we skip it. + if (n_deduped as f32) < chunks_per_range { + dedupe_query = None; + forced_nodedupe = true; + // once we start skipping dedupe, we try to raise + // the cpr to the high threshold + tracking_info.defrag_at_low_threshold = false; + } + } else { + // once we start deduping again, we lower CPR + // to the low threshold so we allow for more small + // fragments. 
+ tracking_info.defrag_at_low_threshold = true; + } + } + } - if let Some((n_deduped, fse)) = deduped_blocks[cur_idx].take() { + if let Some((n_deduped, fse)) = dedupe_query { // We found one or more chunk hashes present in a cas block somewhere. // Update all the metrics. @@ -442,9 +518,12 @@ impl Cleaner { let last_entry = tracking_info.file_info.last_mut().unwrap(); last_entry.unpacked_segment_bytes += n_bytes as u32; last_entry.chunk_index_end = fse.chunk_index_end; + // update the fragmentation estimation window + tracking_info.increment_last_range_in_fragmentation_estimate(n_deduped); } else { // This block is new tracking_info.file_info.push(fse); + tracking_info.add_range_to_fragmentation_estimate(n_deduped); } cur_idx += n_deduped; @@ -456,7 +535,8 @@ impl Cleaner { // This is new data. let add_new_data; - if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash) { + if tracking_info.current_cas_block_hashes.contains_key(&chunk.hash) && !forced_nodedupe { + let idx = tracking_info.current_cas_block_hashes.get(&chunk.hash).unwrap(); let idx = *idx; // This chunk will get the CAS hash updated when the local CAS block // is full and registered. @@ -481,6 +561,7 @@ impl Cleaner { last_entry.unpacked_segment_bytes += n_bytes as u32; last_entry.chunk_index_end += 1; add_new_data = true; + tracking_info.increment_last_range_in_fragmentation_estimate(1); } else { // This block is unrelated to the previous one. 
// This chunk will get the CAS hash updated when the local CAS block @@ -495,6 +576,7 @@ impl Cleaner { chunk_len, chunk_len + 1, )); + tracking_info.add_range_to_fragmentation_estimate(1); add_new_data = true; } diff --git a/data/src/constants.rs b/data/src/constants.rs index f7cb02d..fdf78c5 100644 --- a/data/src/constants.rs +++ b/data/src/constants.rs @@ -33,3 +33,15 @@ pub const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION"); /// Maximum number of entries in the file construction cache /// which stores File Hash -> reconstruction instructions pub const FILE_RECONSTRUCTION_CACHE_SIZE: usize = 65536; + +/// Number of ranges to use when estimating fragmentation +pub const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128; + +/// Minimum number of chunks per range. Used to control fragmentation +/// This targets an average of 1MB per range. +/// The hysteresis factor multiplied by the target Chunks Per Range (CPR) controls +/// the low end of the hysteresis range. Basically, dedupe will stop +/// when CPR drops below hysteresis * target_cpr, and will start again when +/// CPR increases above target CPR. +pub const MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR: f32 = 0.5; +pub const DEFAULT_MIN_N_CHUNKS_PER_RANGE: f32 = 8.0;