diff --git a/icechunk/src/asset_manager.rs b/icechunk/src/asset_manager.rs
index 30db9fa3..fdf6e4f3 100644
--- a/icechunk/src/asset_manager.rs
+++ b/icechunk/src/asset_manager.rs
@@ -680,7 +680,7 @@ mod test {
     use super::*;
     use crate::{
         format::{
-            manifest::{ChunkInfo, ChunkPayload},
+            manifest::{ChunkInfo, ChunkPayload, ManifestShards},
             ChunkIndices, NodeId,
         },
        storage::{logging::LoggingStorage, new_in_memory_storage, Storage},
@@ -692,6 +692,7 @@ mod test {
         let settings = storage::Settings::default();
         let manager = AssetManager::new_no_cache(backend.clone(), settings.clone(), 1);
 
+        let shards = ManifestShards::default(1);
         let node1 = NodeId::random();
         let node2 = NodeId::random();
         let ci1 = ChunkInfo {
@@ -705,7 +706,7 @@ mod test {
             payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"b")),
         };
         let pre_existing_manifest =
-            Manifest::from_iter(vec![ci1].into_iter()).await?.unwrap();
+            Manifest::from_iter(vec![ci1].into_iter(), &shards).await?.unwrap();
         let pre_existing_manifest = Arc::new(pre_existing_manifest);
         let pre_existing_id = pre_existing_manifest.id();
         let pre_size = manager.write_manifest(Arc::clone(&pre_existing_manifest)).await?;
@@ -720,7 +721,7 @@ mod test {
         );
 
         let manifest =
-            Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter()).await?.unwrap());
+            Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter(), &shards).await?.unwrap());
         let id = manifest.id();
 
         let size = caching.write_manifest(Arc::clone(&manifest)).await?;
@@ -765,6 +766,7 @@ mod test {
         let settings = storage::Settings::default();
         let manager = AssetManager::new_no_cache(backend.clone(), settings.clone(), 1);
 
+        let shards = ManifestShards::default(1);
         let ci1 = ChunkInfo {
             node: NodeId::random(),
             coord: ChunkIndices(vec![]),
@@ -780,15 +782,15 @@ mod test {
         let ci9 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
 
         let manifest1 =
-            Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3]).await?.unwrap());
+            Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3], &shards).await?.unwrap());
         let id1 = manifest1.id();
         let size1 = manager.write_manifest(Arc::clone(&manifest1)).await?;
         let manifest2 =
-            Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6]).await?.unwrap());
+            Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6], &shards).await?.unwrap());
         let id2 = manifest2.id();
         let size2 = manager.write_manifest(Arc::clone(&manifest2)).await?;
         let manifest3 =
-            Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9]).await?.unwrap());
+            Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9], &shards).await?.unwrap());
         let id3 = manifest3.id();
         let size3 = manager.write_manifest(Arc::clone(&manifest3)).await?;
 
@@ -829,12 +831,13 @@ mod test {
         let manager =
             Arc::new(AssetManager::new_no_cache(storage.clone(), settings.clone(), 1));
 
+        let shards = ManifestShards::default(2);
         // some reasonable size so it takes some time to parse
         let manifest = Manifest::from_iter((0..5_000).map(|_| ChunkInfo {
             node: NodeId::random(),
             coord: ChunkIndices(Vec::from([rand::random(), rand::random()])),
             payload: ChunkPayload::Inline("hello".into()),
-        }))
+        }), &shards)
        .await
        .unwrap()
        .unwrap();
diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs
index bcf665f1..c10a3314 100644
--- a/icechunk/src/format/manifest.rs
+++ b/icechunk/src/format/manifest.rs
@@ -1,10 +1,12 @@
-use std::{borrow::Cow, convert::Infallible, ops::Range, sync::Arc};
+use std::{
+    borrow::Cow, collections::HashMap, convert::Infallible, ops::Range, sync::Arc,
+};
 
 use crate::format::flatbuffers::gen;
 use bytes::Bytes;
 use flatbuffers::VerifierOptions;
 use futures::{Stream, TryStreamExt};
-use itertools::Itertools;
+use itertools::{multiunzip, repeat_n, Itertools};
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 
@@ -21,12 +23,6 @@ use super::{
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ManifestExtents(Vec<Range<u32>>);
 
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct ManifestRef {
-    pub object_id: ManifestId,
-    pub extents: ManifestExtents,
-}
-
 impl ManifestExtents {
     pub fn new(from: &[u32], to: &[u32]) -> Self {
         let v = from
@@ -37,9 +33,77 @@ impl ManifestExtents {
         Self(v)
     }
 
+    pub fn contains(&self, coord: &[u32]) -> bool {
+        self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
+    }
+
     pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> {
         self.0.iter()
     }
+
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ManifestRef {
+    pub object_id: ManifestId,
+    pub extents: ManifestExtents,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ManifestShards(Vec<ManifestExtents>);
+
+impl ManifestShards {
+    pub fn default(ndim: usize) -> Self {
+        Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
+    }
+
+    pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self {
+        let res = iter
+            .into_iter()
+            .map(|x| x.into_iter().tuple_windows())
+            .multi_cartesian_product()
+            .map(|x| multiunzip(x))
+            .map(|(from, to): (Vec<u32>, Vec<u32>)| {
+                ManifestExtents::new(from.as_slice(), to.as_slice())
+            });
+        Self(res.collect())
+    }
+
+    // Returns the index of shard_range that includes ChunkIndices
+    // This can be used at write time to split manifests based on the config
+    // and at read time to choose which manifest to query for chunk payload
+    pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> {
+        // shard_range[i] must bound ChunkIndices
+        // 0 <= return value < shard_range.len()
+        // it is possible that shard_range does not include a coord. say we have a 2x2 shard grid
+        // but only shard (0,0) and shard (1,1) are populated with data.
+        // A coord located in (1, 0) should return Err
+        // Since shard_range need not form a regular grid, we must iterate through and find the first result.
+        // ManifestExtents in shard_range MUST NOT overlap with each other. How do we ensure this?
+        // ndim must be the same
+        // debug_assert_eq!(coord.0.len(), shard_range[0].len());
+        // FIXME: could optimize for unbounded single manifest
+        self.iter()
+            .enumerate()
+            .find(|(_, e)| e.contains(coord.0.as_slice()))
+            .map(|(i, _)| i as usize)
+            .ok_or(IcechunkFormatError::from(
+                IcechunkFormatErrorKind::InvalidIndexForSharding {
+                    coords: coord.clone(),
+                },
+            ))
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> {
+        self.0.iter()
+    }
+
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
 }
 
 #[derive(Debug, Error)]
@@ -177,6 +241,7 @@ impl Manifest {
 
     pub async fn from_stream<E>(
         stream: impl Stream<Item = Result<ChunkInfo, E>>,
+        shards: &ManifestShards,
     ) -> Result<Option<Self>, E> {
         // TODO: what's a good capacity?
         let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024 * 1024);
@@ -186,28 +251,42 @@ impl Manifest {
         let mut all = all.iter().peekable();
 
-        let mut array_manifests = Vec::with_capacity(1);
+        let mut array_manifests = Vec::with_capacity(shards.len());
+        // why peek?
+        // this seems to handle multiple nodes, but we only send one in?
         while let Some(current_node) = all.peek().map(|chunk| &chunk.node).cloned() {
+            let mut sharded_refs: HashMap<usize, Vec<_>> = HashMap::new();
+            sharded_refs.reserve(shards.len());
             // TODO: what is a good capacity
-            let mut refs = Vec::with_capacity(8_192);
+            let ref_capacity = 8_192;
+            sharded_refs.insert(0, Vec::with_capacity(ref_capacity));
             while let Some(chunk) = all.next_if(|chunk| chunk.node == current_node) {
-                refs.push(mk_chunk_ref(&mut builder, chunk));
+                let shard_index = shards.which(&chunk.coord).unwrap();
+                sharded_refs
+                    .entry(shard_index)
+                    .or_insert_with(|| Vec::with_capacity(ref_capacity))
+                    .push(mk_chunk_ref(&mut builder, chunk));
             }
 
             let node_id = Some(gen::ObjectId8::new(&current_node.0));
-            let refs = Some(builder.create_vector(refs.as_slice()));
-            let array_manifest = gen::ArrayManifest::create(
-                &mut builder,
-                &gen::ArrayManifestArgs { node_id: node_id.as_ref(), refs },
-            );
-            array_manifests.push(array_manifest);
+            for refs in sharded_refs.values() {
+                // FIXME: skip empty refs?
+                let refs = Some(builder.create_vector(refs.as_slice()));
+                let array_manifest = gen::ArrayManifest::create(
+                    &mut builder,
+                    &gen::ArrayManifestArgs { node_id: node_id.as_ref(), refs },
+                );
+                array_manifests.push(array_manifest);
+            }
         }
 
         if array_manifests.is_empty() {
-            // empty manifet
+            // empty manifest
             return Ok(None);
         }
 
+        // at this point array_manifests holds one entry per (node, shard) pair
+
         let arrays = builder.create_vector(array_manifests.as_slice());
         let manifest_id = ManifestId::random();
         let bin_manifest_id = gen::ObjectId12::new(&manifest_id.0);
@@ -227,8 +306,9 @@ impl Manifest {
     /// Used for tests
     pub async fn from_iter<T: IntoIterator<Item = ChunkInfo>>(
         iter: T,
+        shards: &ManifestShards,
     ) -> Result<Option<Self>, Infallible> {
-        Self::from_stream(futures::stream::iter(iter.into_iter().map(Ok))).await
+        Self::from_stream(futures::stream::iter(iter.into_iter().map(Ok)), shards).await
     }
 
     pub fn len(&self) -> usize {
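
The `ManifestShards::from_edges` constructor above builds the shard grid as a cartesian product of per-axis coordinate windows. The following standalone sketch shows that same construction using only the `itertools` crate; `extents_from_edges` is an illustrative name rather than icechunk API, and it returns plain `Range` vectors instead of `ManifestExtents`.

// Standalone sketch (not part of the patch) of the shard-grid construction that
// ManifestShards::from_edges performs: per-axis edge lists become half-open
// windows, and their cartesian product yields one extent per shard.
use itertools::Itertools;
use std::ops::Range;

fn extents_from_edges(edges: Vec<Vec<u32>>) -> Vec<Vec<Range<u32>>> {
    edges
        .into_iter()
        // [0, 10, 20] -> windows (0, 10), (10, 20)
        .map(|axis| axis.into_iter().tuple_windows::<(u32, u32)>().collect::<Vec<_>>())
        // pick one window per axis -> one shard
        .multi_cartesian_product()
        .map(|windows| windows.into_iter().map(|(from, to)| from..to).collect())
        .collect()
}

fn main() {
    let shards = extents_from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);
    assert_eq!(shards.len(), 4); // 2 windows per axis -> a 2x2 shard grid
    assert_eq!(shards[0], vec![0..10, 0..10]);
    // a coordinate belongs to the first shard whose ranges all contain it,
    // which is the linear search ManifestShards::which performs
    assert!(shards[0].iter().zip(&[5u32, 5]).all(|(r, c)| r.contains(c)));
}

The same lookup is exercised end to end by the `test_which_shard` test added to session.rs further down in this diff.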
diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs
index dcb2c877..f02b9cb3 100644
--- a/icechunk/src/format/mod.rs
+++ b/icechunk/src/format/mod.rs
@@ -245,6 +245,8 @@ pub enum IcechunkFormatErrorKind {
     NodeNotFound { path: Path },
     #[error("chunk coordinates not found `{coords:?}`")]
     ChunkCoordinatesNotFound { coords: ChunkIndices },
+    #[error("invalid chunk index for sharding manifests: {coords:?}")]
+    InvalidIndexForSharding { coords: ChunkIndices },
     #[error("manifest information cannot be found in snapshot `{manifest_id}`")]
     ManifestInfoNotFound { manifest_id: ManifestId },
     #[error("invalid magic numbers in file")]
diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs
index c6ebcbfb..4e90e8f1 100644
--- a/icechunk/src/format/snapshot.rs
+++ b/icechunk/src/format/snapshot.rs
@@ -37,6 +37,9 @@ impl DimensionShape {
 pub struct ArrayShape(Vec<DimensionShape>);
 
 impl ArrayShape {
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
     pub fn new<I>(it: I) -> Option<Self>
     where
         I: IntoIterator<Item = (u64, u64)>,
diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs
index 93988195..20008c24 100644
--- a/icechunk/src/session.rs
+++ b/icechunk/src/session.rs
@@ -27,7 +27,7 @@ use crate::{
     format::{
         manifest::{
             ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef,
-            VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
+            ManifestShards, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
             VirtualReferenceErrorKind,
         },
         snapshot::{
@@ -1376,14 +1376,40 @@ impl<'a> FlushProcess<'a> {
     ) -> SessionResult<()> {
         let mut from = vec![];
         let mut to = vec![];
+
+        // iterator of chunks for a single node
         let chunks = stream::iter(
             self.change_set
                 .new_array_chunk_iterator(node_id, node_path)
                 .map(Ok::<ChunkInfo, Infallible>),
         );
+
+        let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);
+
+        // for it in chunks {}
+
+        // if manifest-split-size is set then we know extents, this is then duplicated between snapshot and config
+        // 1. Should we record the "real" extent or the theoretical-max extent (as set in the config).
+        // 2.
+        // if not, we aggregate_extents
+        // this is a choice, we could just always aggregate extents
         let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord);
 
-        if let Some(new_manifest) = Manifest::from_stream(chunks).await.unwrap() {
+        // fn grouped_from_stream<E>(
+        //     stream: impl Stream<Item = Result<ChunkInfo, E>>,
+        //     extents: &Vec<ManifestExtents>,
+        // ) -> Result<Vec<Manifest>, E> {
+        //     // could use into_grouping_map_by with a key function bin_to_shard
+        //     // that bins the chunk coordinates into (manifest) shards
+        //     let shards: HashMap<usize, Vec<ChunkInfo>> = HashMap::new();
+        //     todo!();
+        //     // into_values does not preserve order!
+        //     // but I think this is OK since Manifest tracks extents
+        //     Vec::from_iter(shards.into_values().map(Manifest::new))
+        // }
+        // loop over streams and associated extents
+        if let Some(new_manifest) = Manifest::from_stream(chunks, &shards).await.unwrap()
+        {
             let new_manifest = Arc::new(new_manifest);
             let new_manifest_size =
                 self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;
@@ -1425,7 +1451,17 @@ impl<'a> FlushProcess<'a> {
         let updated_chunks =
             aggregate_extents(&mut from, &mut to, updated_chunks, |ci| &ci.coord);
 
-        if let Some(new_manifest) = Manifest::from_stream(updated_chunks).await? {
+        // FIXME
+        let shards = {
+            if let NodeData::Array { shape, .. } = node.node_data.clone() {
+                ManifestShards::default(shape.len())
+            } else {
+                todo!()
+            }
+        };
+
+        if let Some(new_manifest) = Manifest::from_stream(updated_chunks, &shards).await?
+        {
             let new_manifest = Arc::new(new_manifest);
             let new_manifest_size =
                 self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;
@@ -1742,7 +1778,7 @@ mod tests {
             basic_solver::{BasicConflictSolver, VersionSelection},
             detector::ConflictDetector,
         },
-        format::manifest::ManifestExtents,
+        format::manifest::{ManifestExtents, ManifestShards},
         refs::{fetch_tag, Ref},
         repository::VersionInfo,
         storage::new_in_memory_storage,
@@ -1938,6 +1974,24 @@ mod tests {
         prop_assert_eq!(to, expected_to);
     }
 
+    #[tokio::test]
+    async fn test_which_shard() -> Result<(), Box<dyn Error>> {
+        let shards = ManifestShards::from_edges(vec![vec![0, 10, 20]]);
+
+        assert_eq!(shards.which(&ChunkIndices(vec![1])).unwrap(), 0);
+        assert_eq!(shards.which(&ChunkIndices(vec![11])).unwrap(), 1);
+
+        let edges = vec![vec![0, 10, 20], vec![0, 10, 20]];
+
+        let shards = ManifestShards::from_edges(edges);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 1])).unwrap(), 0);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 10])).unwrap(), 1);
+        assert_eq!(shards.which(&ChunkIndices(vec![1, 11])).unwrap(), 1);
+        assert!(shards.which(&ChunkIndices(vec![21, 21])).is_err());
+
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn test_repository_with_updates() -> Result<(), Box<dyn Error>> {
         let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
@@ -1946,6 +2000,7 @@ mod tests {
             AssetManager::new_no_cache(Arc::clone(&storage), storage_settings.clone(), 1);
 
         let array_id = NodeId::random();
+        let shards = ManifestShards::default(2);
         let chunk1 = ChunkInfo {
             node: array_id.clone(),
             coord: ChunkIndices(vec![0, 0, 0]),
@@ -1963,7 +2018,7 @@ mod tests {
         };
 
         let manifest =
-            Manifest::from_iter(vec![chunk1.clone(), chunk2.clone()]).await?.unwrap();
+            Manifest::from_iter(vec![chunk1.clone(), chunk2.clone()], &shards).await?.unwrap();
         let manifest = Arc::new(manifest);
         let manifest_id = manifest.id();
         let manifest_size = asset_manager.write_manifest(Arc::clone(&manifest)).await?;
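
The comments in the first session.rs hunk leave open how shard edges should be chosen when a manifest split size comes from configuration rather than from the hard-coded vec![0, 10, 20] edges. Below is a hypothetical sketch of deriving per-axis edges from an axis's chunk count and a configured split size; `edges_for_axis` and its signature are assumptions for illustration only, and its output is shaped so it could feed `ManifestShards::from_edges`.

// Hypothetical helper, not part of this diff: derive per-axis shard edges from the
// number of chunks along an axis and a configured manifest split size.
fn edges_for_axis(num_chunks: u32, split_size: u32) -> Vec<u32> {
    // edges at 0, split_size, 2 * split_size, ... plus a closing edge past the last chunk
    let mut edges: Vec<u32> = (0..num_chunks).step_by(split_size as usize).collect();
    edges.push(num_chunks);
    edges
}

fn main() {
    // a 25-chunk axis split every 10 chunks -> shards [0, 10), [10, 20), [20, 25)
    assert_eq!(edges_for_axis(25, 10), vec![0, 10, 20, 25]);
    assert_eq!(edges_for_axis(20, 10), vec![0, 10, 20]);
}

Whether the recorded extents should come from such configured edges or from the aggregated real extents is exactly the open question noted in those comments.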