From 5cf29c1be2a3092fb81e9c4d87880b20228797a9 Mon Sep 17 00:00:00 2001 From: Yucheng Low <2740522+ylow@users.noreply.github.com> Date: Thu, 30 Jan 2025 13:40:07 -0800 Subject: [PATCH] Fragmentation Prevention (#147) For https://linear.app/xet/issue/XET-246/fragmentation-prevention We use average chunks / range as a fragmentation estimator, targetting an average of 16 chunks per range which roughly equates to 1MB per range. This is computed over the last window of 32 ranges. If the average drops below the target, dedupe is disabled until the average is above the target again. Running on first 1GB of a *highly* fragmented file (comprising of a few hundred KB of an existing file, followed by a hundred KB of zeros, repeat) we see the following: - Baseline: 1000000001 bytes -> 726845953 bytes, 2975 ranges, 336134 average bytes per range - 512KB target (anti-fragmentation goal of 8 chunk per range): 1000000001 bytes -> 873515521 bytes, 1465 ranges, 682594 average bytes per range - 1MB target (anti-fragmentation goal of 16 chunks per range): 1000000001 bytes -> 932235777 bytes, 829 ranges, 1206273 average bytes per range This also includes a hysteresis implementation: - 512KB target (anti-fragmentation goal of 8 chunk per range): 1000000001 bytes -> 873515521 bytes, 1657 ranges, 603500 average bytes per range. The hysteresis turned out to be pretty important for deduping a content defined chunked variant of Parquet: Without hysteresis (only concern is how v2 dedupes against v1): ``` parquet file v1: 5728317968 bytes -> 5728137283 bytes parquet file v2: 5726717793 bytes -> 4544391399 bytes (11.14 chunks per range) ``` With hysteresis ``` parquet file v1: 5728317968 bytes -> 5728137283 bytes parquet file v2: 5726717793 bytes -> 3568275084 bytes (8.11 chunks per range) ``` So with the hysteresis implementation we are closer to the target chunk per range and we are able to still dedupe pretty well. As comparison, *without* any fragmentation prevention: ``` parquet file v1: 5728317968 bytes -> 5728137283 bytes parquet file v2: 5726717793 bytes -> 3402767500 bytes (6.89 chunks per segment) ``` --- data/src/clean.rs | 90 +++++++++++++++++++++++++++++++++++++++++-- data/src/constants.rs | 12 ++++++ 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/data/src/clean.rs b/data/src/clean.rs index 4ea8c582..d1d870df 100644 --- a/data/src/clean.rs +++ b/data/src/clean.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::mem::take; use std::ops::DerefMut; use std::path::{Path, PathBuf}; @@ -26,7 +26,10 @@ use utils::progress::ProgressUpdater; use xet_threadpool::ThreadPool; use crate::chunking::{chunk_target_default, ChunkYieldType}; -use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES; +use crate::constants::{ + DEFAULT_MIN_N_CHUNKS_PER_RANGE, MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR, MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES, + NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR, +}; use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::Result; @@ -44,6 +47,13 @@ lazy_static! { .unwrap_or(1); } +lazy_static! { + pub static ref MIN_N_CHUNKS_PER_RANGE: f32 = std::env::var("XET_MIN_N_CHUNKS_PER_RANGE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_MIN_N_CHUNKS_PER_RANGE); +} + pub enum BufferItem { Value(T), Completed, @@ -56,6 +66,39 @@ struct DedupFileTrackingInfo { current_cas_file_info_indices: Vec, current_cas_block_hashes: HashMap, cas_data: CASDataAggregator, + /// This tracks the number of chunks in each of the last N ranges + rolling_last_nranges: VecDeque, + /// This tracks the total number of chunks in the last N ranges + rolling_nranges_chunks: usize, + /// Used to provide some hysteresis on the defrag decision + /// chooses between MIN_N_CHUNKS_PER_RANGE + /// or MIN_N_CHUNKS_PER_RANGE * HYSTERESIS_FACTOR (hysteresis factor < 1.0) + defrag_at_low_threshold: bool, +} + +impl DedupFileTrackingInfo { + fn increment_last_range_in_fragmentation_estimate(&mut self, nchunks: usize) { + if let Some(back) = self.rolling_last_nranges.back_mut() { + *back += nchunks; + self.rolling_nranges_chunks += nchunks; + } + } + fn add_range_to_fragmentation_estimate(&mut self, nchunks: usize) { + self.rolling_last_nranges.push_back(nchunks); + self.rolling_nranges_chunks += nchunks; + if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR { + self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap(); + } + } + /// Returns the average number of chunks per range + /// None if there is is not enough data for an estimate + fn rolling_chunks_per_range(&self) -> Option { + if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR { + None + } else { + Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32) + } + } } #[derive(Debug)] @@ -422,8 +465,41 @@ impl Cleaner { while cur_idx < chunks.len() { let mut n_bytes = 0; + let mut dedupe_query = deduped_blocks[cur_idx].take(); + + // check the fragmentation state and if it is pretty fragmented + // we skip dedupe + let mut forced_nodedupe = false; + if let Some((n_deduped, _)) = dedupe_query { + if let Some(chunks_per_range) = tracking_info.rolling_chunks_per_range() { + let target_cpr = if tracking_info.defrag_at_low_threshold { + (*MIN_N_CHUNKS_PER_RANGE) * MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR + } else { + *MIN_N_CHUNKS_PER_RANGE + }; + if chunks_per_range < target_cpr { + // chunks per range is pretty poor, we should not dedupe. + // However, here we do get to look ahead a little bit + // and check the size of the next dedupe window. + // if it is too small, it is not going to improve + // the chunks per range and so we skip it. + if (n_deduped as f32) < chunks_per_range { + dedupe_query = None; + forced_nodedupe = true; + // once I start skipping dedupe, we try to raise + // the cpr to the high threshold + tracking_info.defrag_at_low_threshold = false; + } + } else { + // once I start deduping again, we lower CPR + // to the low threshold so we allow for more small + // fragments. + tracking_info.defrag_at_low_threshold = true; + } + } + } - if let Some((n_deduped, fse)) = deduped_blocks[cur_idx].take() { + if let Some((n_deduped, fse)) = dedupe_query { // We found one or more chunk hashes present in a cas block somewhere. // Update all the metrics. @@ -442,9 +518,12 @@ impl Cleaner { let last_entry = tracking_info.file_info.last_mut().unwrap(); last_entry.unpacked_segment_bytes += n_bytes as u32; last_entry.chunk_index_end = fse.chunk_index_end; + // update the fragmentation estimation window + tracking_info.increment_last_range_in_fragmentation_estimate(n_deduped); } else { // This block is new tracking_info.file_info.push(fse); + tracking_info.add_range_to_fragmentation_estimate(n_deduped); } cur_idx += n_deduped; @@ -456,7 +535,8 @@ impl Cleaner { // This is new data. let add_new_data; - if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash) { + if tracking_info.current_cas_block_hashes.contains_key(&chunk.hash) && !forced_nodedupe { + let idx = tracking_info.current_cas_block_hashes.get(&chunk.hash).unwrap(); let idx = *idx; // This chunk will get the CAS hash updated when the local CAS block // is full and registered. @@ -481,6 +561,7 @@ impl Cleaner { last_entry.unpacked_segment_bytes += n_bytes as u32; last_entry.chunk_index_end += 1; add_new_data = true; + tracking_info.increment_last_range_in_fragmentation_estimate(1); } else { // This block is unrelated to the previous one. // This chunk will get the CAS hash updated when the local CAS block @@ -495,6 +576,7 @@ impl Cleaner { chunk_len, chunk_len + 1, )); + tracking_info.add_range_to_fragmentation_estimate(1); add_new_data = true; } diff --git a/data/src/constants.rs b/data/src/constants.rs index f7cb02d2..fdf78c5b 100644 --- a/data/src/constants.rs +++ b/data/src/constants.rs @@ -33,3 +33,15 @@ pub const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION"); /// Maximum number of entries in the file construction cache /// which stores File Hash -> reconstruction instructions pub const FILE_RECONSTRUCTION_CACHE_SIZE: usize = 65536; + +/// Number of ranges to use when estimating fragmentation +pub const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128; + +/// Minimum number of chunks per range. Used to control fragmentation +/// This targets an average of 1MB per range. +/// The hysteresis factor multiplied by the target Chunks Per Range (CPR) controls +/// the low end of the hysteresis range. Basically, dedupe will stop +/// when CPR drops below hysteresis * target_cpr, and will start again when +/// CPR increases above target CPR. +pub const MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR: f32 = 0.5; +pub const DEFAULT_MIN_N_CHUNKS_PER_RANGE: f32 = 8.0;