[WIP] Begin Manifest Sharding
dcherian committed Feb 21, 2025
1 parent 33f41e8 commit fd1c572
Showing 5 changed files with 162 additions and 31 deletions.
17 changes: 10 additions & 7 deletions icechunk/src/asset_manager.rs
@@ -680,7 +680,7 @@ mod test {
use super::*;
use crate::{
format::{
manifest::{ChunkInfo, ChunkPayload},
manifest::{ChunkInfo, ChunkPayload, ManifestShards},
ChunkIndices, NodeId,
},
storage::{logging::LoggingStorage, new_in_memory_storage, Storage},
@@ -692,6 +692,7 @@ mod test {
let settings = storage::Settings::default();
let manager = AssetManager::new_no_cache(backend.clone(), settings.clone(), 1);

let shards = ManifestShards::default(1);
let node1 = NodeId::random();
let node2 = NodeId::random();
let ci1 = ChunkInfo {
@@ -705,7 +706,7 @@ mod test {
payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"b")),
};
let pre_existing_manifest =
Manifest::from_iter(vec![ci1].into_iter()).await?.unwrap();
Manifest::from_iter(vec![ci1].into_iter(), &shards).await?.unwrap();
let pre_existing_manifest = Arc::new(pre_existing_manifest);
let pre_existing_id = pre_existing_manifest.id();
let pre_size = manager.write_manifest(Arc::clone(&pre_existing_manifest)).await?;
@@ -720,7 +721,7 @@ mod test {
);

let manifest =
Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter()).await?.unwrap());
Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter(), &shards).await?.unwrap());
let id = manifest.id();
let size = caching.write_manifest(Arc::clone(&manifest)).await?;

@@ -765,6 +766,7 @@ mod test {
let settings = storage::Settings::default();
let manager = AssetManager::new_no_cache(backend.clone(), settings.clone(), 1);

let shards = ManifestShards::default(1);
let ci1 = ChunkInfo {
node: NodeId::random(),
coord: ChunkIndices(vec![]),
@@ -780,15 +782,15 @@ mod test {
let ci9 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };

let manifest1 =
Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3]).await?.unwrap());
Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3], &shards).await?.unwrap());
let id1 = manifest1.id();
let size1 = manager.write_manifest(Arc::clone(&manifest1)).await?;
let manifest2 =
Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6]).await?.unwrap());
Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6], &shards).await?.unwrap());
let id2 = manifest2.id();
let size2 = manager.write_manifest(Arc::clone(&manifest2)).await?;
let manifest3 =
Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9]).await?.unwrap());
Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9], &shards).await?.unwrap());
let id3 = manifest3.id();
let size3 = manager.write_manifest(Arc::clone(&manifest3)).await?;

@@ -829,12 +831,13 @@ mod test {
let manager =
Arc::new(AssetManager::new_no_cache(storage.clone(), settings.clone(), 1));

let shards = ManifestShards::default(2);
// some reasonable size so it takes some time to parse
let manifest = Manifest::from_iter((0..5_000).map(|_| ChunkInfo {
node: NodeId::random(),
coord: ChunkIndices(Vec::from([rand::random(), rand::random()])),
payload: ChunkPayload::Inline("hello".into()),
}))
}), &shards)
.await
.unwrap()
.unwrap();
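For context on the `shards` argument now threaded through these tests: `ManifestShards::default(ndim)` (introduced in `manifest.rs` below) is a single shard spanning `0..u32::MAX` in every dimension, so passing it to `Manifest::from_iter` keeps the old single-manifest behaviour. A minimal sketch in the style of the tests above, assuming the same crate-internal imports; the chunk values are invented for illustration:

```rust
use bytes::Bytes;

use crate::format::{
    manifest::{ChunkInfo, ChunkPayload, Manifest, ManifestShards},
    ChunkIndices, NodeId,
};

#[tokio::test]
async fn default_shard_covers_all_coords() -> Result<(), Box<dyn std::error::Error>> {
    // A single shard spanning 0..u32::MAX in each of the 3 dimensions.
    let shards = ManifestShards::default(3);

    let chunk = ChunkInfo {
        node: NodeId::random(),
        coord: ChunkIndices(vec![0, 0, 0]),
        payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"hello")),
    };

    // Every coordinate resolves to shard 0 ...
    assert_eq!(shards.which(&chunk.coord).unwrap(), 0);
    // ... so the whole iterator still lands in one manifest.
    assert!(Manifest::from_iter(vec![chunk], &shards).await?.is_some());
    Ok(())
}
```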
118 changes: 99 additions & 19 deletions icechunk/src/format/manifest.rs
@@ -1,10 +1,12 @@
use std::{borrow::Cow, convert::Infallible, ops::Range, sync::Arc};
use std::{
borrow::Cow, collections::HashMap, convert::Infallible, ops::Range, sync::Arc,
};

use crate::format::flatbuffers::gen;
use bytes::Bytes;
use flatbuffers::VerifierOptions;
use futures::{Stream, TryStreamExt};
use itertools::Itertools;
use itertools::{multiunzip, repeat_n, Itertools};
use serde::{Deserialize, Serialize};
use thiserror::Error;

@@ -21,12 +23,6 @@ use super::{
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestExtents(Vec<Range<u32>>);

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
pub object_id: ManifestId,
pub extents: ManifestExtents,
}

impl ManifestExtents {
pub fn new(from: &[u32], to: &[u32]) -> Self {
let v = from
@@ -37,9 +33,77 @@ impl ManifestExtents {
Self(v)
}

pub fn contains(&self, coord: &[u32]) -> bool {
self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
}

pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> {
self.0.iter()
}

pub fn len(&self) -> usize {
self.0.len()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestRef {
pub object_id: ManifestId,
pub extents: ManifestExtents,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestShards(Vec<ManifestExtents>);

impl ManifestShards {
pub fn default(ndim: usize) -> Self {
Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())])
}

pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self {
let res = iter
.into_iter()
.map(|x| x.into_iter().tuple_windows())
.multi_cartesian_product()
.map(|x| multiunzip(x))
.map(|(from, to): (Vec<u32>, Vec<u32>)| {
ManifestExtents::new(from.as_slice(), to.as_slice())
});
Self(res.collect())
}

// Returns the index of shard_range that includes ChunkIndices
// This can be used at write time to split manifests based on the config
// and at read time to choose which manifest to query for chunk payload
pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> {
// shard_range[i] must bound ChunkIndices
// 0 <= return value < shard_range.len()
// It is possible that shard_range does not include a coord: say we have a 2x2 shard grid
// but only shard (0,0) and shard (1,1) are populated with data.
// A coord located in shard (1, 0) should return Err.
// Since shard_range need not form a regular grid, we must iterate through and find the first result.
// ManifestExtents in shard_range MUST NOT overlap with each other. How do we ensure this?
// ndim must be the same
// debug_assert_eq!(coord.0.len(), shard_range[0].len());
// FIXME: could optimize for unbounded single manifest
self.iter()
.enumerate()
.find(|(_, e)| e.contains(coord.0.as_slice()))
.map(|(i, _)| i as usize)
.ok_or(IcechunkFormatError::from(
IcechunkFormatErrorKind::InvalidIndexForSharding {
coords: coord.clone(),
},
))
}

pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> {
self.0.iter()
}

pub fn len(&self) -> usize {
self.0.len()
}
}

#[derive(Debug, Error)]
@@ -177,6 +241,7 @@ impl Manifest {

pub async fn from_stream<E>(
stream: impl Stream<Item = Result<ChunkInfo, E>>,
shards: &ManifestShards,
) -> Result<Option<Self>, E> {
// TODO: what's a good capacity?
let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024 * 1024);
@@ -186,28 +251,42 @@ impl Manifest {

let mut all = all.iter().peekable();

let mut array_manifests = Vec::with_capacity(1);
let mut array_manifests = Vec::with_capacity(shards.len());
// why peek?
// this seems to handle multiple nodes, but we only send one in?
while let Some(current_node) = all.peek().map(|chunk| &chunk.node).cloned() {
let mut sharded_refs: HashMap<usize, Vec<_>> = HashMap::new();
sharded_refs.reserve(shards.len());
// TODO: what is a good capacity
let mut refs = Vec::with_capacity(8_192);
let ref_capacity = 8_192;
sharded_refs.insert(0, Vec::with_capacity(ref_capacity));
while let Some(chunk) = all.next_if(|chunk| chunk.node == current_node) {
refs.push(mk_chunk_ref(&mut builder, chunk));
let shard_index = shards.which(&chunk.coord).unwrap();
sharded_refs
.entry(shard_index)
.or_insert_with(|| Vec::with_capacity(ref_capacity))
.push(mk_chunk_ref(&mut builder, chunk));
}

let node_id = Some(gen::ObjectId8::new(&current_node.0));
let refs = Some(builder.create_vector(refs.as_slice()));
let array_manifest = gen::ArrayManifest::create(
&mut builder,
&gen::ArrayManifestArgs { node_id: node_id.as_ref(), refs },
);
array_manifests.push(array_manifest);
for refs in sharded_refs.values() {
// FIXME: skip empty refs?
let refs = Some(builder.create_vector(refs.as_slice()));
let array_manifest = gen::ArrayManifest::create(
&mut builder,
&gen::ArrayManifestArgs { node_id: node_id.as_ref(), refs },
);
array_manifests.push(array_manifest);
}
}

if array_manifests.is_empty() {
// empty manifet
// empty manifest
return Ok(None);
}

// looks we now

let arrays = builder.create_vector(array_manifests.as_slice());
let manifest_id = ManifestId::random();
let bin_manifest_id = gen::ObjectId12::new(&manifest_id.0);
@@ -227,8 +306,9 @@ impl Manifest {
/// Used for tests
pub async fn from_iter<T: IntoIterator<Item = ChunkInfo>>(
iter: T,
shards: &ManifestShards,
) -> Result<Option<Self>, Infallible> {
Self::from_stream(futures::stream::iter(iter.into_iter().map(Ok))).await
Self::from_stream(futures::stream::iter(iter.into_iter().map(Ok)), shards).await
}

pub fn len(&self) -> usize {
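A note on the new sharding primitives above: `from_edges` takes one vector of bin edges per dimension, turns each into consecutive half-open windows (`0..10`, `10..20`, …) via `tuple_windows`, and takes the cartesian product across dimensions, so two edge vectors of length 3 produce a 2x2 grid of `ManifestExtents`. `which` then linearly scans for the first extent containing a coordinate. A small sketch of that behaviour, assuming only the API introduced in this diff (the `icechunk::format` import paths are illustrative):

```rust
use icechunk::format::{manifest::ManifestShards, ChunkIndices};

fn main() {
    // Edges [0, 10, 20] per dimension become the windows 0..10 and 10..20;
    // two dimensions therefore give a 2x2 grid of four shards.
    let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);
    assert_eq!(shards.len(), 4);

    // multi_cartesian_product orders shards with the last dimension varying fastest:
    // 0 -> (0..10, 0..10), 1 -> (0..10, 10..20), 2 -> (10..20, 0..10), 3 -> (10..20, 10..20)
    assert_eq!(shards.which(&ChunkIndices(vec![3, 4])).unwrap(), 0);
    assert_eq!(shards.which(&ChunkIndices(vec![3, 15])).unwrap(), 1);
    assert_eq!(shards.which(&ChunkIndices(vec![15, 3])).unwrap(), 2);
    assert_eq!(shards.which(&ChunkIndices(vec![15, 15])).unwrap(), 3);

    // Upper edges are exclusive, so coordinates outside every extent are an error.
    assert!(shards.which(&ChunkIndices(vec![20, 20])).is_err());
}
```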
2 changes: 2 additions & 0 deletions icechunk/src/format/mod.rs
@@ -245,6 +245,8 @@ pub enum IcechunkFormatErrorKind {
NodeNotFound { path: Path },
#[error("chunk coordinates not found `{coords:?}`")]
ChunkCoordinatesNotFound { coords: ChunkIndices },
#[error("invalid chunk index for sharding manifests: {coords:?}")]
InvalidIndexForSharding { coords: ChunkIndices },
#[error("manifest information cannot be found in snapshot `{manifest_id}`")]
ManifestInfoNotFound { manifest_id: ManifestId },
#[error("invalid magic numbers in file")]
3 changes: 3 additions & 0 deletions icechunk/src/format/snapshot.rs
@@ -37,6 +37,9 @@ impl DimensionShape {
pub struct ArrayShape(Vec<DimensionShape>);

impl ArrayShape {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn new<I>(it: I) -> Option<Self>
where
I: IntoIterator<Item = (u64, u64)>,
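The new `ArrayShape::len` (number of dimensions) appears to exist so that `session.rs` below can build a `ManifestShards::default` with matching dimensionality. A minimal sketch of that pairing, with illustrative import paths and an assumed `(array length, chunk length)` tuple layout for `ArrayShape::new`:

```rust
use icechunk::format::{manifest::ManifestShards, snapshot::ArrayShape};

fn main() {
    // Assumed tuple layout: (array length, chunk length) per dimension.
    let shape = ArrayShape::new(vec![(100u64, 10u64), (200, 20)]).unwrap();
    assert_eq!(shape.len(), 2);

    // One unbounded shard with the same dimensionality as the array,
    // mirroring the fallback used in FlushProcess below.
    let shards = ManifestShards::default(shape.len());
    assert_eq!(shards.len(), 1);
}
```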
53 changes: 48 additions & 5 deletions icechunk/src/session.rs
@@ -27,7 +27,7 @@ use crate::{
format::{
manifest::{
ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef,
VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
ManifestShards, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError,
VirtualReferenceErrorKind,
},
snapshot::{
@@ -1376,14 +1376,28 @@
) -> SessionResult<()> {
let mut from = vec![];
let mut to = vec![];

// iterator of chunks for a single node
let chunks = stream::iter(
self.change_set
.new_array_chunk_iterator(node_id, node_path)
.map(Ok::<ChunkInfo, Infallible>),
);

let shards = ManifestShards::from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]]);

// for it in chunks {}

// if manifest-split-size is set then we know extents, this is then duplicated between snapshot and config
// 1. Should we record the "real" extent or the theoretical-max extent (as set in the config).
// 2.
// if not, we aggregate_extents
// this is a choice, we could just always aggregate extents
let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord);

if let Some(new_manifest) = Manifest::from_stream(chunks).await.unwrap() {
// loop over streams and associated extents
if let Some(new_manifest) = Manifest::from_stream(chunks, &shards).await.unwrap()
{
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;
Expand Down Expand Up @@ -1425,7 +1439,17 @@ impl<'a> FlushProcess<'a> {
let updated_chunks =
aggregate_extents(&mut from, &mut to, updated_chunks, |ci| &ci.coord);

if let Some(new_manifest) = Manifest::from_stream(updated_chunks).await? {
// FIXME
let shards = {
if let NodeData::Array { shape, .. } = node.node_data.clone() {
ManifestShards::default(shape.len())
} else {
todo!()
}
};

if let Some(new_manifest) = Manifest::from_stream(updated_chunks, &shards).await?
{
let new_manifest = Arc::new(new_manifest);
let new_manifest_size =
self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?;
Expand Down Expand Up @@ -1742,7 +1766,7 @@ mod tests {
basic_solver::{BasicConflictSolver, VersionSelection},
detector::ConflictDetector,
},
format::manifest::ManifestExtents,
format::manifest::{ManifestExtents, ManifestShards},
refs::{fetch_tag, Ref},
repository::VersionInfo,
storage::new_in_memory_storage,
Expand Down Expand Up @@ -1938,6 +1962,24 @@ mod tests {
prop_assert_eq!(to, expected_to);
}

#[tokio::test]
async fn test_which_shard() -> Result<(), Box<dyn Error>> {
let shards = ManifestShards::from_edges(vec![vec![0, 10, 20]]);

assert_eq!(shards.which(&ChunkIndices(vec![1])).unwrap(), 0);
assert_eq!(shards.which(&ChunkIndices(vec![11])).unwrap(), 1);

let edges = vec![vec![0, 10, 20], vec![0, 10, 20]];

let shards = ManifestShards::from_edges(edges);
assert_eq!(shards.which(&ChunkIndices(vec![1, 1])).unwrap(), 0);
assert_eq!(shards.which(&ChunkIndices(vec![1, 10])).unwrap(), 1);
assert_eq!(shards.which(&ChunkIndices(vec![1, 11])).unwrap(), 1);
assert!(shards.which(&ChunkIndices(vec![21, 21])).is_err());

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_repository_with_updates() -> Result<(), Box<dyn Error>> {
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
@@ -1946,6 +1988,7 @@
AssetManager::new_no_cache(Arc::clone(&storage), storage_settings.clone(), 1);

let array_id = NodeId::random();
let shards = ManifestShards::default(2);
let chunk1 = ChunkInfo {
node: array_id.clone(),
coord: ChunkIndices(vec![0, 0, 0]),
Expand All @@ -1963,7 +2006,7 @@ mod tests {
};

let manifest =
Manifest::from_iter(vec![chunk1.clone(), chunk2.clone()]).await?.unwrap();
Manifest::from_iter(vec![chunk1.clone(), chunk2.clone()], &shards).await?.unwrap();
let manifest = Arc::new(manifest);
let manifest_id = manifest.id();
let manifest_size = asset_manager.write_manifest(Arc::clone(&manifest)).await?;
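The flush path above still hard-codes its shards (the `from_edges(vec![vec![0, 10, 20], vec![0, 10, 20]])` call and the `ManifestShards::default(shape.len())` fallback are FIXME placeholders), and `from_stream` currently keeps every shard of a node inside a single manifest file. Sketched below, purely as an illustration of where the splitting could go rather than anything this commit implements, is a hypothetical helper that buckets a node's chunks with `which` and builds one manifest per populated shard:

```rust
// Hypothetical helper, not in this commit: groups a node's chunks by shard index
// and builds one Manifest per non-empty shard.
use std::collections::HashMap;
use std::convert::Infallible;

use crate::format::manifest::{ChunkInfo, Manifest, ManifestShards};

async fn manifests_per_shard(
    chunks: impl IntoIterator<Item = ChunkInfo>,
    shards: &ManifestShards,
) -> Result<Vec<(usize, Manifest)>, Infallible> {
    // Bucket chunk refs by the shard that contains their coordinate.
    let mut buckets: HashMap<usize, Vec<ChunkInfo>> = HashMap::new();
    for chunk in chunks {
        // Assumption: coords outside every shard are skipped here;
        // the real code will need a policy for them.
        if let Ok(shard_index) = shards.which(&chunk.coord) {
            buckets.entry(shard_index).or_default().push(chunk);
        }
    }

    // One manifest per populated shard; from_iter returns None for empty input.
    let mut result = Vec::with_capacity(buckets.len());
    for (shard_index, bucket) in buckets {
        if let Some(manifest) = Manifest::from_iter(bucket, shards).await? {
            result.push((shard_index, manifest));
        }
    }
    Ok(result)
}
```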
