From 0f61841a7d0b1815cf3cebeef82cf82a840abe3d Mon Sep 17 00:00:00 2001 From: Assaf Vayner Date: Wed, 29 Jan 2025 10:24:24 -0800 Subject: [PATCH] chunk cache change hash to checksum (#156) fix XET-281 Calculating the hash of the cache file takes too long relative to the target cache put performance on CPU-constrained systems. We've already dropped the verification on gets; now we are dropping it on writes as well and replacing the hash with a CRC checksum. P.S. This also includes some style updates. --- Cargo.lock | 12 ++- chunk_cache/Cargo.toml | 3 +- chunk_cache/src/disk.rs | 162 ++++++++++------------------- chunk_cache/src/disk/cache_item.rs | 31 ++---- chunk_cache_bench/Cargo.lock | 15 ++- hf_xet/Cargo.lock | 13 ++- 6 files changed, 106 insertions(+), 130 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87a85adf..d549d888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,11 +593,12 @@ name = "chunk_cache" version = "0.1.0" dependencies = [ "base64 0.22.1", - "blake3", "cas_types", "clap 4.5.20", + "crc32fast", "error_printer", "file_utils", + "log", "merklehash", "mockall", "once_cell", @@ -755,6 +756,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "criterion" version = "0.3.6" diff --git a/chunk_cache/Cargo.toml b/chunk_cache/Cargo.toml index 9a95233a..d909cbbd 100644 --- a/chunk_cache/Cargo.toml +++ b/chunk_cache/Cargo.toml @@ -10,13 +10,14 @@ thiserror = "2.0" error_printer = { path = "../error_printer" } file_utils = { path = "../file_utils" } utils = { path = "../utils" } -blake3 = "1.5.4" base64 = "0.22.1" tracing = "0.1.40" rand = "0.8.5" mockall = "0.13.0" clap = { version = "4.5.20", optional = true, features = ["derive"] } once_cell = "1.20.2" +crc32fast = "1.4.2" +log = "0.4.22" [dev-dependencies] tokio = { version = "1.36", features = 
["full"] } diff --git a/chunk_cache/src/disk.rs b/chunk_cache/src/disk.rs index b63543c3..fe6ab829 100644 --- a/chunk_cache/src/disk.rs +++ b/chunk_cache/src/disk.rs @@ -80,10 +80,11 @@ impl DiskCache { println!(); for item in items.iter() { println!( - "\titem: chunk range [{}-{}) ; len({})", + "\titem: chunk range [{}-{}) ; len({}); checksum({})", item.range.start, item.range.end, - output_bytes(item.len as usize) + output_bytes(item.len as usize), + item.checksum, ); } } @@ -146,22 +147,15 @@ impl DiskCache { let mut num_items = 0; let max_num_bytes = 2 * capacity; - let readdir = match read_dir(cache_root) { - Ok(Some(rd)) => rd, - Ok(None) => return Ok(CacheState::new(state, 0, 0)), - Err(e) => return Err(e), + let Some(cache_root_readdir) = read_dir(cache_root)? else { + return Ok(CacheState::new(state, 0, 0)); }; // loop through cache root directory, first level containing "prefix" directories // each of which may contain key directories with cache items - for key_prefix_dir in readdir { - // this match pattern appears often in this function, and we could write a macro to replace it - // however this puts an implicit change of control flow with continue/return cases that is - // hard to decipher from a macro, so avoid replace it for readability - let key_prefix_dir = match is_ok_dir(key_prefix_dir) { - Ok(Some(dirent)) => dirent, - Ok(None) => continue, - Err(e) => return Err(e), + for key_prefix_dir in cache_root_readdir { + let Some(key_prefix_dir) = is_ok_dir(key_prefix_dir)? else { + continue; }; let key_prefix_dir_name = key_prefix_dir.file_name(); @@ -170,14 +164,12 @@ impl DiskCache { continue; } - let key_prefix_readir = match read_dir(key_prefix_dir.path()) { - Ok(Some(rd)) => rd, - Ok(None) => continue, - Err(e) => return Err(e), + let Some(key_prefix_readdir) = read_dir(key_prefix_dir.path())? 
else { + continue; }; // loop throught key directories inside prefix directory - for key_dir in key_prefix_readir { + for key_dir in key_prefix_readdir { let key_dir = match is_ok_dir(key_dir) { Ok(Some(dirent)) => dirent, Ok(None) => continue, @@ -243,31 +235,12 @@ impl DiskCache { } loop { - let cache_item = if let Some(item) = self.find_match(key, range)? { - item - } else { + let Some(cache_item) = self.find_match(key, range)? else { return Ok(None); }; let path = self.item_path(key, &cache_item)?; - // OLD, needed for hash validation, read file to buffer to do validation - // let mut file_buf = { - // let file = match File::open(&path) { - // Ok(file) => file, - // Err(e) => match e.kind() { - // ErrorKind::NotFound => { - // self.remove_item(key, &cache_item)?; - // continue; - // }, - // _ => return Err(e.into()), - // }, - // }; - // // let mut buf = Vec::with_capacity(file.metadata()?.len() as usize); - // // file.read_to_end(&mut buf)?; - // // Cursor::new(buf) - // }; - let mut file_buf = match File::open(&path) { Ok(file) => file, Err(e) => match e.kind() { @@ -279,23 +252,13 @@ impl DiskCache { }, }; - // TODO: reintroduce hash validation of cache file, but not for every get, memoize success status per cache - // file let hash = compute_hash_from_reader(&mut file_buf)?; - // if hash != cache_item.hash { - // debug!("file hash mismatch on path: {path:?}, key: {key}, item: {cache_item}"); - // if to_print { - // info!("removed cache item 0"); - // } - // self.remove_item(key, &cache_item)?; - // continue; - // } + // TODO: reintroduce checksum validation of cache file, but not for every get, memoize success status per + // cache item file_buf.seek(SeekFrom::Start(0))?; - let header_result = CacheFileHeader::deserialize(&mut file_buf) - .debug_error(format!("failed to deserialize cache file header on path: {path:?}")); - let header = if let Ok(header) = header_result { - header - } else { + let Ok(header) = CacheFileHeader::deserialize(&mut file_buf) + 
.debug_error(format!("failed to deserialize cache file header on path: {path:?}")) + else { self.remove_item(key, &cache_item)?; continue; }; @@ -308,9 +271,7 @@ impl DiskCache { fn find_match(&self, key: &Key, range: &ChunkRange) -> OptionResult { let state = self.state.lock()?; - let items = if let Some(items) = state.inner.get(key) { - items - } else { + let Some(items) = state.inner.get(key) else { return Ok(None); }; @@ -331,13 +292,13 @@ impl DiskCache { data: &[u8], ) -> Result<(), ChunkCacheError> { if range.start >= range.end - || chunk_byte_indices.len() != (range.end - range.start + 1) as usize - // chunk_byte_indices is guarenteed to be more than 1 element at this point - || chunk_byte_indices[0] != 0 - || *chunk_byte_indices.last().unwrap() as usize != data.len() - || !strictly_increasing(chunk_byte_indices) - // assert 1 new range doesn't take up more than 10% of capacity - || data.len() > (self.capacity as usize / 10) + || chunk_byte_indices.len() != (range.end - range.start + 1) as usize + // chunk_byte_indices is guarenteed to be more than 1 element at this point + || chunk_byte_indices[0] != 0 + || *chunk_byte_indices.last().unwrap() as usize != data.len() + || !strictly_increasing(chunk_byte_indices) + // assert 1 new range doesn't take up more than 10% of capacity + || data.len() > (self.capacity as usize / 10) { return Err(ChunkCacheError::InvalidArguments); } @@ -352,21 +313,27 @@ impl DiskCache { let header = CacheFileHeader::new(chunk_byte_indices); let mut header_buf = Vec::with_capacity(header.header_len()); header.serialize(&mut header_buf)?; - let hash = compute_hash(&header_buf, data); + let checksum = { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(&header_buf); + hasher.update(data); + hasher.finalize() + }; let cache_item = CacheItem { range: range.clone(), len: (header_buf.len() + data.len()) as u64, - hash, + checksum, }; - let path = self.item_path(key, &cache_item)?; - - let mut fw = SafeFileCreator::new(path)?; - 
- fw.write_all(&header_buf)?; - fw.write_all(data)?; - fw.close()?; + { + // write cache item file + let path = self.item_path(key, &cache_item)?; + let mut fw = SafeFileCreator::new(path)?; + fw.write_all(&header_buf)?; + fw.write_all(data)?; + fw.close()?; + } // evict items after ensuring the file write but before committing to cache state // to avoid removing new item. @@ -387,7 +354,7 @@ impl DiskCache { let mut overlapping_item_paths = HashSet::new(); let mut total_bytes_rm = 0; let num_items_rm = to_remove.len(); - // removing by index in reverse to guarentee lower-index items aren't shifted/moved + // removing by index in reverse to guarantee lower-index items aren't shifted/moved for item_idx in to_remove.into_iter().rev() { let item = items.swap_remove(item_idx); overlapping_item_paths.insert(self.item_path(key, &item)?); @@ -439,31 +406,25 @@ impl DiskCache { // validate stored data let path = self.item_path(key, cache_item)?; - let mut r = { - let mut file = if let Ok(file) = File::open(path) { - file - } else { - self.remove_item(key, cache_item)?; - return Ok(false); - }; - let md = file.metadata()?; - if md.len() != cache_item.len { - self.remove_item(key, cache_item)?; - return Ok(false); - } - let mut buf = Vec::with_capacity(md.len() as usize); - file.read_to_end(&mut buf)?; - Cursor::new(buf) + + let Ok(mut file) = File::open(path) else { + self.remove_item(key, cache_item)?; + return Ok(false); }; - let hash = blake3::Hasher::new().update_reader(&mut r)?.finalize(); - if hash != cache_item.hash { + let md = file.metadata()?; + if md.len() != cache_item.len { self.remove_item(key, cache_item)?; return Ok(false); } - r.seek(SeekFrom::Start(0))?; - let header = if let Ok(header) = CacheFileHeader::deserialize(&mut r) { - header - } else { + let mut buf = Vec::with_capacity(md.len() as usize); + file.read_to_end(&mut buf)?; + let checksum = crc32fast::hash(&buf); + if checksum != cache_item.checksum { + self.remove_item(key, cache_item)?; + return 
Ok(false); + } + let mut reader = Cursor::new(buf); + let Ok(header) = CacheFileHeader::deserialize(&mut reader) else { self.remove_item(key, cache_item)?; return Ok(false); }; @@ -489,7 +450,7 @@ impl DiskCache { } } - let stored_data = get_range_from_cache_file(&header, &mut r, range, cache_item.range.start)?; + let stored_data = get_range_from_cache_file(&header, &mut reader, range, cache_item.range.start)?; if data != stored_data { return Err(ChunkCacheError::InvalidArguments); } @@ -617,15 +578,6 @@ fn get_range_from_cache_file( Ok(buf) } -fn compute_hash(header: &[u8], data: &[u8]) -> blake3::Hash { - blake3::Hasher::new().update(header).update(data).finalize() -} - -// OLD -// fn compute_hash_from_reader(r: &mut impl Read) -> Result { -// Ok(blake3::Hasher::new().update_reader(r)?.finalize()) -// } - // wrapper over std::fs::read_dir // returns Ok(None) on a not found error fn read_dir(path: impl AsRef) -> OptionResult { diff --git a/chunk_cache/src/disk/cache_item.rs b/chunk_cache/src/disk/cache_item.rs index b3aa10bd..a126de11 100644 --- a/chunk_cache/src/disk/cache_item.rs +++ b/chunk_cache/src/disk/cache_item.rs @@ -1,16 +1,16 @@ use std::cmp::Ordering; -use std::io::{Cursor, Read, Write}; +use std::io::Cursor; use std::mem::size_of; use base64::Engine; -use blake3::Hash; use cas_types::ChunkRange; use utils::serialization_utils::{read_u32, read_u64, write_u32, write_u64}; use super::BASE64_ENGINE; use crate::error::ChunkCacheError; -const CACHE_ITEM_FILE_NAME_BUF_SIZE: usize = size_of::() * 2 + size_of::() + blake3::OUT_LEN; +// range start, range end, length, and checksum +const CACHE_ITEM_FILE_NAME_BUF_SIZE: usize = size_of::() * 2 + size_of::() + size_of::(); /// A CacheItem represents metadata for a single range in the cache /// it contains the range of chunks the item is for @@ -20,12 +20,12 @@ const CACHE_ITEM_FILE_NAME_BUF_SIZE: usize = size_of::() * 2 + size_of::) -> std::fmt::Result { - write!(f, "CacheItem {{ range: {:?}, len: {}, hash: {} 
}}", self.range, self.len, self.hash,) + write!(f, "CacheItem {{ range: {:?}, len: {}, checksum: {} }}", self.range, self.len, self.checksum,) } } @@ -53,7 +53,7 @@ impl CacheItem { write_u32(&mut w, self.range.start)?; write_u32(&mut w, self.range.end)?; write_u64(&mut w, self.len)?; - write_hash(&mut w, &self.hash)?; + write_u32(&mut w, self.checksum)?; Ok(BASE64_ENGINE.encode(buf)) } @@ -66,7 +66,7 @@ impl CacheItem { let start = read_u32(&mut r)?; let end = read_u32(&mut r)?; let len = read_u64(&mut r)?; - let hash = read_hash(&mut r)?; + let checksum = read_u32(&mut r)?; if start >= end { return Err(ChunkCacheError::BadRange); } @@ -74,25 +74,14 @@ impl CacheItem { Ok(Self { range: ChunkRange { start, end }, len, - hash, + checksum, }) } } -pub fn write_hash(writer: &mut impl Write, hash: &blake3::Hash) -> Result<(), std::io::Error> { - writer.write_all(hash.as_bytes()) -} - -pub fn read_hash(reader: &mut impl Read) -> Result { - let mut m = [0u8; 32]; - reader.read_exact(&mut m)?; - Ok(blake3::Hash::from_bytes(m)) -} - #[cfg(test)] mod tests { use base64::Engine; - use blake3::OUT_LEN; use cas_types::ChunkRange; use crate::disk::cache_item::CACHE_ITEM_FILE_NAME_BUF_SIZE; @@ -103,7 +92,7 @@ mod tests { Self { range: Default::default(), len: Default::default(), - hash: blake3::Hash::from_bytes([0u8; OUT_LEN]), + checksum: Default::default(), } } } @@ -113,7 +102,7 @@ mod tests { let cache_item = CacheItem { range: ChunkRange { start: 0, end: 1024 }, len: 16 << 20, - hash: blake3::hash(&(1..100).collect::>()), + checksum: 10000, }; let file_name = cache_item.file_name().unwrap(); diff --git a/chunk_cache_bench/Cargo.lock b/chunk_cache_bench/Cargo.lock index bced245c..3852c254 100644 --- a/chunk_cache_bench/Cargo.lock +++ b/chunk_cache_bench/Cargo.lock @@ -374,10 +374,11 @@ name = "chunk_cache" version = "0.1.0" dependencies = [ "base64 0.22.1", - "blake3", "cas_types", + "crc32fast", "error_printer", "file_utils", + "log", "merklehash", "mockall", "once_cell", 
@@ -1756,6 +1757,7 @@ dependencies = [ name = "merklehash" version = "0.14.5" dependencies = [ + "base64 0.22.1", "blake3", "heed", "rand 0.8.5", @@ -3850,6 +3852,7 @@ dependencies = [ "thiserror 2.0.11", "tokio", "tracing", + "xet_threadpool", ] [[package]] @@ -4303,6 +4306,16 @@ dependencies = [ "rustix", ] +[[package]] +name = "xet_threadpool" +version = "0.1.0" +dependencies = [ + "lazy_static", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "yoke" version = "0.7.4" diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock index c6f8499e..3d0afd7a 100644 --- a/hf_xet/Cargo.lock +++ b/hf_xet/Cargo.lock @@ -346,10 +346,11 @@ name = "chunk_cache" version = "0.1.0" dependencies = [ "base64 0.22.1", - "blake3", "cas_types", + "crc32fast", "error_printer", "file_utils", + "log", "merklehash", "mockall", "once_cell", @@ -448,6 +449,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -1701,6 +1711,7 @@ dependencies = [ name = "merklehash" version = "0.14.5" dependencies = [ + "base64 0.22.1", "blake3", "heed", "rand 0.8.5",