Skip to content

Commit

Permalink
[WIP] manifest sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherian committed Feb 27, 2025
1 parent 1c60e5b commit e7d9221
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 32 deletions.
78 changes: 70 additions & 8 deletions icechunk/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::format::flatbuffers::gen;
use bytes::Bytes;
use flatbuffers::VerifierOptions;
use futures::{Stream, TryStreamExt};
use itertools::Itertools;
use itertools::{multiunzip, repeat_n, Itertools};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand All @@ -21,12 +21,6 @@ use super::{
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestExtents(Vec<Range<u32>>);

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
pub object_id: ManifestId,
pub extents: ManifestExtents,
}

impl ManifestExtents {
pub fn new(from: &[u32], to: &[u32]) -> Self {
let v = from
Expand All @@ -37,9 +31,77 @@ impl ManifestExtents {
Self(v)
}

/// Returns true iff `coord` has exactly one component per dimension of
/// these extents and every component lies inside its range.
///
/// The length guard matters: `zip` truncates to the shorter operand, so
/// without it a `coord` with fewer dimensions than the extents would
/// match vacuously. Callers (e.g. `ManifestShards::which`) assume equal
/// ndim — the guard makes a mismatch return `false` instead of a bogus
/// `true`.
pub fn contains(&self, coord: &[u32]) -> bool {
    self.len() == coord.len()
        && self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
}

/// Iterates over the per-dimension chunk-index ranges, in dimension order.
pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> {
    self.0.as_slice().iter()
}

pub fn len(&self) -> usize {
self.0.len()
}
}

/// Pointer from a node to one manifest file, together with the extents
/// (per-dimension chunk-index ranges) that manifest covers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
    // Id of the referenced manifest object
    pub object_id: ManifestId,
    // Chunk-index ranges covered by that manifest
    pub extents: ManifestExtents,
}

/// The set of shard extents an array's manifests are split into; each
/// element describes the chunk-index region covered by one shard.
// PartialEq/Eq derives added for consistency with the sibling types
// ManifestExtents and ManifestRef (both derive them).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestShards(Vec<ManifestExtents>);

impl ManifestShards {
    /// A single shard covering the full index space in every one of the
    /// `ndim` dimensions (i.e. no sharding).
    pub fn default(ndim: usize) -> Self {
        Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
    }

    /// Builds shards from per-dimension bin edges.
    ///
    /// Each inner `Vec<u32>` lists the edges for one dimension; consecutive
    /// edge pairs become half-open ranges (`tuple_windows`), and the
    /// cartesian product of those ranges across dimensions yields one
    /// `ManifestExtents` per shard. E.g. edges `[0, 10, 20]` in each of two
    /// dimensions produce the four shards of a 2x2 grid.
    pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self {
        let res = iter
            .into_iter()
            .map(|x| x.into_iter().tuple_windows())
            .multi_cartesian_product()
            .map(|x| multiunzip(x))
            .map(|(from, to): (Vec<u32>, Vec<u32>)| {
                ManifestExtents::new(from.as_slice(), to.as_slice())
            });
        Self(res.collect())
    }

    /// Returns the index of the shard whose extents include `coord`.
    ///
    /// Used at write time to split manifests based on the config, and at
    /// read time to choose which manifest to query for a chunk payload.
    ///
    /// # Errors
    /// Returns `InvalidIndexForSharding` when no shard contains `coord` —
    /// possible because not every cell of the shard grid need be populated
    /// (e.g. a 2x2 grid where only shards (0,0) and (1,1) carry data).
    ///
    /// Since shards need not form a regular grid, we scan for the first
    /// match. Invariants assumed but not yet enforced here:
    /// - shard extents MUST NOT overlap each other, or the result is
    ///   ambiguous — TODO: decide where to enforce this;
    /// - `coord` has the same ndim as the shards (was a commented-out
    ///   `debug_assert_eq!`).
    // FIXME: could optimize for the unbounded single-manifest case
    pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> {
        self.iter()
            .enumerate()
            // `enumerate` already yields usize: the old `i as usize` was a no-op
            .find(|(_, e)| e.contains(coord.0.as_slice()))
            .map(|(i, _)| i)
            // ok_or_else (not ok_or): avoids cloning `coord` on the success path
            .ok_or_else(|| {
                IcechunkFormatError::from(
                    IcechunkFormatErrorKind::InvalidIndexForSharding {
                        coords: coord.clone(),
                    },
                )
            })
    }

    /// Iterates over each shard's extents, in shard-index order (the order
    /// `which` reports).
    pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> {
        self.0.iter()
    }

    /// Number of shards.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// True when there are no shards.
    /// Added to satisfy clippy's `len_without_is_empty`; backward compatible.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -204,7 +266,7 @@ impl Manifest {
}

if array_manifests.is_empty() {
// empty manifet
// empty manifest
return Ok(None);
}

Expand Down
2 changes: 2 additions & 0 deletions icechunk/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ pub enum IcechunkFormatErrorKind {
NodeNotFound { path: Path },
#[error("chunk coordinates not found `{coords:?}`")]
ChunkCoordinatesNotFound { coords: ChunkIndices },
#[error("invalid chunk index for sharding manifests: {coords:?}")]
InvalidIndexForSharding { coords: ChunkIndices },
#[error("manifest information cannot be found in snapshot `{manifest_id}`")]
ManifestInfoNotFound { manifest_id: ManifestId },
#[error("invalid magic numbers in file")]
Expand Down
3 changes: 3 additions & 0 deletions icechunk/src/format/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ impl DimensionShape {
pub struct ArrayShape(Vec<DimensionShape>);

impl ArrayShape {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn new<I>(it: I) -> Option<Self>
where
I: IntoIterator<Item = (u64, u64)>,
Expand Down
101 changes: 77 additions & 24 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
format::{
manifest::{
ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef,
VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
ManifestShards, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
VirtualReferenceErrorKind,
},
snapshot::{
Expand Down Expand Up @@ -1376,32 +1376,58 @@ impl<'a> FlushProcess<'a> {
) -> SessionResult<()> {
let mut from = vec![];
let mut to = vec![];
let chunks = stream::iter(
self.change_set
.new_array_chunk_iterator(node_id, node_path)
.map(Ok::<ChunkInfo, Infallible>),
);
let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord);

if let Some(new_manifest) = Manifest::from_stream(chunks).await.unwrap() {
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;

let file_info =
ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
self.manifest_files.insert(file_info);
let mut chunks = self.change_set.new_array_chunk_iterator(node_id, node_path);

let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);

let mut sharded_refs: HashMap<usize, Vec<_>> = HashMap::new();
sharded_refs.reserve(shards.len());
// TODO: what is a good capacity
let ref_capacity = 8_192;
sharded_refs.insert(0, Vec::with_capacity(ref_capacity));
while let Some(chunk) = chunks.next() {
let shard_index = shards.which(&chunk.coord).unwrap();
sharded_refs
.entry(shard_index)
.or_insert_with(|| Vec::with_capacity(ref_capacity))
.push(chunk);
}

let new_ref = ManifestRef {
object_id: new_manifest.id().clone(),
extents: ManifestExtents::new(&from, &to),
};
for i in 0..shards.len() {
if let Some(shard_chunks) = sharded_refs.remove(&i) {
let shard_chunks = stream::iter(
shard_chunks.into_iter().map(Ok::<ChunkInfo, Infallible>),
);
let shard_chunks =
aggregate_extents(&mut from, &mut to, shard_chunks, |ci| &ci.coord);

if let Some(new_manifest) =
Manifest::from_stream(shard_chunks).await.unwrap()
{
let new_manifest = Arc::new(new_manifest);
let new_manifest_size = self
.asset_manager
.write_manifest(Arc::clone(&new_manifest))
.await?;

let file_info =
ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size);
self.manifest_files.insert(file_info);

let new_ref = ManifestRef {
object_id: new_manifest.id().clone(),
extents: ManifestExtents::new(&from, &to),
};

self.manifest_refs
.entry(node_id.clone())
.and_modify(|v| v.push(new_ref.clone()))
.or_insert_with(|| vec![new_ref]);
self.manifest_refs
.entry(node_id.clone())
.and_modify(|v| v.push(new_ref.clone()))
.or_insert_with(|| vec![new_ref]);
}
}
}

Ok(())
}

Expand All @@ -1425,6 +1451,15 @@ impl<'a> FlushProcess<'a> {
let updated_chunks =
aggregate_extents(&mut from, &mut to, updated_chunks, |ci| &ci.coord);

// // FIXME
// let shards = {
// if let NodeData::Array { shape, .. } = node.node_data.clone() {
// ManifestShards::default(shape.len())
// } else {
// todo!()
// }
// };

if let Some(new_manifest) = Manifest::from_stream(updated_chunks).await? {
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
Expand Down Expand Up @@ -1742,7 +1777,7 @@ mod tests {
basic_solver::{BasicConflictSolver, VersionSelection},
detector::ConflictDetector,
},
format::manifest::ManifestExtents,
format::manifest::{ManifestExtents, ManifestShards},
refs::{fetch_tag, Ref},
repository::VersionInfo,
storage::new_in_memory_storage,
Expand Down Expand Up @@ -1938,6 +1973,24 @@ mod tests {
prop_assert_eq!(to, expected_to);
}

// ManifestShards::which is purely synchronous: a plain #[test] avoids
// spinning up a tokio runtime for no reason.
#[test]
fn test_which_shard() -> Result<(), Box<dyn Error>> {
    // 1-D: edges [0, 10, 20] -> shards [0..10), [10..20)
    let shards = ManifestShards::from_edges(vec![vec![0, 10, 20]]);

    assert_eq!(shards.which(&ChunkIndices(vec![1])).unwrap(), 0);
    assert_eq!(shards.which(&ChunkIndices(vec![11])).unwrap(), 1);
    // upper edge is exclusive: 20 is outside every shard
    assert!(shards.which(&ChunkIndices(vec![20])).is_err());

    // 2-D: shards are the cartesian product of per-dimension ranges,
    // ordered with the last dimension varying fastest
    let edges = vec![vec![0, 10, 20], vec![0, 10, 20]];

    let shards = ManifestShards::from_edges(edges);
    assert_eq!(shards.which(&ChunkIndices(vec![1, 1])).unwrap(), 0);
    assert_eq!(shards.which(&ChunkIndices(vec![1, 10])).unwrap(), 1);
    assert_eq!(shards.which(&ChunkIndices(vec![1, 11])).unwrap(), 1);
    // coordinate outside every shard
    assert!(shards.which(&ChunkIndices(vec![21, 21])).is_err());

    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_repository_with_updates() -> Result<(), Box<dyn Error>> {
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
Expand Down

0 comments on commit e7d9221

Please sign in to comment.