Skip to content

Commit 40fd112

Browse files
authored
Resolve KAMT issue #1862 (#1958)
Implement external iteration for the KAMT (fixes #1862)
1 parent b449426 commit 40fd112

File tree

5 files changed

+374
-49
lines changed

5 files changed

+374
-49
lines changed

ipld/kamt/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub enum Error {
2222
/// Cid not found in store error
2323
#[error("Cid ({0}) did not match any in database")]
2424
CidNotFound(String),
25+
#[error("Iteration starting key not found in KAMT")]
26+
StartKeyNotFound,
2527
/// Dynamic error for when the error needs to be forwarded as is.
2628
#[error("{0}")]
2729
Dynamic(anyhow::Error),

ipld/kamt/src/iter.rs

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2021-2023 Protocol Labs
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
use std::borrow::Borrow;
4+
use std::iter::FusedIterator;
5+
6+
use fvm_ipld_blockstore::Blockstore;
7+
use fvm_ipld_encoding::de::DeserializeOwned;
8+
use fvm_ipld_encoding::CborStore;
9+
10+
use crate::hash_bits::HashBits;
11+
use crate::node::{match_extension, ExtensionMatch, Node};
12+
use crate::pointer::Pointer;
13+
use crate::{AsHashedKey, Config, Error, KeyValuePair};
14+
15+
/// Iterator over a KAMT. Items are ordered by-key, ascending.
16+
pub struct Iter<'a, BS, V, K, H, const N: usize = 32> {
17+
store: &'a BS,
18+
stack: Vec<std::slice::Iter<'a, Pointer<K, V, H, N>>>,
19+
current: std::slice::Iter<'a, KeyValuePair<K, V>>,
20+
}
21+
22+
impl<'a, K, V, BS, H, const N: usize> Iter<'a, BS, V, K, H, N>
23+
where
24+
K: DeserializeOwned,
25+
V: DeserializeOwned,
26+
BS: Blockstore,
27+
{
28+
pub(crate) fn new(store: &'a BS, root: &'a Node<K, V, H, N>) -> Self {
29+
Self {
30+
store,
31+
stack: vec![root.pointers.iter()],
32+
current: [].iter(),
33+
}
34+
}
35+
36+
pub(crate) fn new_from<Q: Sized>(
37+
store: &'a BS,
38+
root: &'a Node<K, V, H, N>,
39+
key: &Q,
40+
conf: &'a Config,
41+
) -> Result<Self, Error>
42+
where
43+
K: Borrow<Q>,
44+
Q: PartialEq,
45+
H: AsHashedKey<Q, N>,
46+
{
47+
let hashed_key = H::as_hashed_key(key);
48+
let mut hash = HashBits::new(&hashed_key);
49+
let mut node = root;
50+
let mut stack = Vec::new();
51+
52+
loop {
53+
let idx = hash.next(conf.bit_width)?;
54+
let ext;
55+
stack.push(node.pointers[node.index_for_bit_pos(idx)..].iter());
56+
(node, ext) = match stack.last_mut().unwrap().next() {
57+
Some(p) => match p {
58+
Pointer::Link {
59+
cid, cache, ext, ..
60+
} => {
61+
if let Some(cached_node) = cache.get() {
62+
(cached_node, ext)
63+
} else {
64+
let node =
65+
if let Some(node) = store.get_cbor::<Node<K, V, H, N>>(cid)? {
66+
node
67+
} else {
68+
#[cfg(not(feature = "ignore-dead-links"))]
69+
return Err(Error::CidNotFound(cid.to_string()));
70+
71+
#[cfg(feature = "ignore-dead-links")]
72+
continue;
73+
};
74+
75+
// Ignore error intentionally, the cache value will always be the same
76+
(cache.get_or_init(|| Box::new(node)), ext)
77+
}
78+
}
79+
Pointer::Dirty { node, ext, .. } => (node, ext),
80+
Pointer::Values(values) => {
81+
return match values.iter().position(|kv| kv.key().borrow() == key) {
82+
Some(offset) => Ok(Self {
83+
store,
84+
stack,
85+
current: values[offset..].iter(),
86+
}),
87+
None => Err(Error::StartKeyNotFound),
88+
}
89+
}
90+
},
91+
None => continue,
92+
};
93+
94+
match match_extension(conf, &mut hash, ext)? {
95+
ExtensionMatch::Full { .. } => {}
96+
ExtensionMatch::Partial { .. } => return Err(Error::StartKeyNotFound),
97+
}
98+
}
99+
}
100+
}
101+
impl<'a, K, V, BS, H, const N: usize> Iterator for Iter<'a, BS, V, K, H, N>
102+
where
103+
BS: Blockstore,
104+
K: DeserializeOwned + PartialOrd,
105+
V: DeserializeOwned,
106+
{
107+
type Item = Result<(&'a K, &'a V), Error>;
108+
109+
fn next(&mut self) -> Option<Self::Item> {
110+
if let Some(v) = self.current.next() {
111+
return Some(Ok((v.key(), v.value())));
112+
}
113+
loop {
114+
let Some(next) = self.stack.last_mut()?.next() else {
115+
self.stack.pop();
116+
continue;
117+
};
118+
match next {
119+
Pointer::Link { cid, cache, .. } => {
120+
let node = if let Some(cached_node) = cache.get() {
121+
cached_node
122+
} else {
123+
let node = match self.store.get_cbor::<Node<K, V, H, N>>(cid) {
124+
Ok(Some(node)) => node,
125+
#[cfg(not(feature = "ignore-dead-links"))]
126+
Ok(None) => return Some(Err(Error::CidNotFound(cid.to_string()))),
127+
#[cfg(feature = "ignore-dead-links")]
128+
Ok(None) => continue,
129+
Err(err) => return Some(Err(err.into())),
130+
};
131+
132+
// Ignore error intentionally, the cache value will always be the same
133+
cache.get_or_init(|| Box::new(node))
134+
};
135+
self.stack.push(node.pointers.iter())
136+
}
137+
Pointer::Dirty { node, .. } => self.stack.push(node.pointers.iter()),
138+
Pointer::Values(kvs) => {
139+
self.current = kvs.iter();
140+
if let Some(v) = self.current.next() {
141+
return Some(Ok((v.key(), v.value())));
142+
}
143+
}
144+
}
145+
}
146+
}
147+
}
148+
149+
impl<'a, K, V, BS, H, const N: usize> FusedIterator for Iter<'a, BS, V, K, H, N>
150+
where
151+
K: DeserializeOwned + PartialOrd,
152+
V: DeserializeOwned,
153+
BS: Blockstore,
154+
{
155+
}

ipld/kamt/src/kamt.rs

+205-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use multihash::Code;
1111
use serde::de::DeserializeOwned;
1212
use serde::{Serialize, Serializer};
1313

14+
use crate::iter::Iter;
1415
use crate::node::Node;
1516
use crate::{AsHashedKey, Config, Error};
1617

@@ -352,6 +353,209 @@ where
352353
V: DeserializeOwned,
353354
F: FnMut(&K, &V) -> anyhow::Result<()>,
354355
{
355-
self.root.for_each(self.store.borrow(), &mut f)
356+
for res in self {
357+
let (k, v) = res?;
358+
(f)(k, v)?;
359+
}
360+
Ok(())
361+
}
362+
}
363+
364+
impl<BS, V, K, H, const N: usize> Kamt<BS, K, V, H, N>
365+
where
366+
K: DeserializeOwned + PartialOrd,
367+
V: DeserializeOwned,
368+
BS: Blockstore,
369+
{
370+
/// Returns an iterator over the entries of the map.
371+
///
372+
/// The iterator element type is `Result<(&'a K, &'a V), Error>`.
373+
///
374+
/// # Examples
375+
///
376+
/// ```
377+
/// use fvm_ipld_kamt::Kamt;
378+
/// use fvm_ipld_kamt::id::Identity;
379+
///
380+
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
381+
///
382+
/// let mut map: Kamt<_, u32, _, Identity> = Kamt::new(store);
383+
/// map.set(1, 1).unwrap();
384+
/// map.set(4, 2).unwrap();
385+
///
386+
/// let mut x: u32 = 0;
387+
/// for res in map.iter() {
388+
/// let (key, value) = res.unwrap();
389+
/// println!("key: {}, value: {}", key, value);
390+
/// x = x+1;
391+
/// }
392+
/// assert_eq!(x,2)
393+
/// ```
394+
pub fn iter(&self) -> Iter<BS, V, K, H, N> {
395+
Iter::new(&self.store, &self.root)
396+
}
397+
398+
/// Iterate over the KAMT starting at the given key.
399+
///
400+
/// # Examples
401+
///
402+
/// ```
403+
/// use fvm_ipld_kamt::Kamt;
404+
/// use fvm_ipld_kamt::id::Identity;
405+
///
406+
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
407+
///
408+
/// let mut map: Kamt<_, u32, _, Identity> = Kamt::new(store);
409+
/// map.set(1, 1).unwrap();
410+
/// map.set(2, 4).unwrap();
411+
/// map.set(3, 3).unwrap();
412+
/// map.set(4, 2).unwrap();
413+
///
414+
/// let mut results = map.iter().take(2).collect::<Result<Vec<_>, _>>().unwrap();
415+
///
416+
/// let last_key = results.last().unwrap().0;
417+
///
418+
/// for res in map.iter_from(last_key).unwrap().skip(1) {
419+
/// results.push(res.unwrap());
420+
/// }
421+
///
422+
/// println!("{:?}", results);
423+
/// assert_eq!(results.len(), 4);
424+
/// ```
425+
426+
/// Iterate over the KAMT starting at the given key. This can be used to implement "ranged" iteration:
427+
///
428+
/// ```rust
429+
/// use fvm_ipld_kamt::Kamt;
430+
/// use fvm_ipld_blockstore::MemoryBlockstore;
431+
/// use fvm_ipld_kamt::id::Identity;
432+
/// use fvm_ipld_kamt::Config;
433+
/// let store = MemoryBlockstore::default();
434+
///
435+
/// // Create a Kamt with 5 keys, a-e.
436+
/// let mut kamt: Kamt<_, u32, String, Identity> = Kamt::new_with_config(store, Config {
437+
/// bit_width: 5,
438+
/// ..Default::default()
439+
/// });
440+
/// let kvs: Vec<(u32, String)> = ["a", "b", "c", "d", "e"]
441+
/// .iter()
442+
/// .enumerate()
443+
/// .map(|(index, &k)| (index as u32, k.to_owned()))
444+
/// .collect();
445+
/// kvs.iter()
446+
/// .map(|(k, v)|kamt.set(k.clone(), v.clone())
447+
/// .map(|_|()))
448+
/// .collect::<Result<(), _>>()?;
449+
///
450+
/// // Read 2 elements.
451+
/// let mut results = kamt.iter().take(2).collect::<Result<Vec<(_,_)>, _>>()?;
452+
/// assert_eq!(results.len(), 2);
453+
/// // Read the rest then sort.
454+
/// for res in kamt.iter_from(results.last().unwrap().0)?.skip(1) {
455+
/// results.push((res?));
456+
/// }
457+
/// results.sort_by_key(|kv| kv.1);
458+
///
459+
/// // Assert that we got out what we put in.
460+
/// let results: Vec<_> = results.into_iter().map(|(k, v)|(k.clone(), v.clone())).collect();
461+
/// assert_eq!(kvs, results);
462+
///
463+
/// # anyhow::Ok(())
464+
/// ```
465+
466+
pub fn iter_from<Q>(&self, key: &Q) -> Result<Iter<BS, V, K, H, N>, Error>
467+
where
468+
K: Borrow<Q>,
469+
Q: PartialEq,
470+
H: AsHashedKey<Q, N>,
471+
{
472+
Iter::new_from(&self.store, &self.root, key, &self.conf)
473+
}
474+
}
475+
476+
impl<'a, BS, V, K, H, const N: usize> IntoIterator for &'a Kamt<BS, K, V, H, N>
477+
where
478+
K: DeserializeOwned + PartialOrd,
479+
V: DeserializeOwned,
480+
481+
BS: Blockstore,
482+
{
483+
type Item = Result<(&'a K, &'a V), Error>;
484+
type IntoIter = Iter<'a, BS, V, K, H, N>;
485+
486+
fn into_iter(self) -> Self::IntoIter {
487+
self.iter()
488+
}
489+
}
490+
491+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::Identity;
    use fvm_ipld_blockstore::MemoryBlockstore;

    /// Decodes a 64-character hex string into a 32-byte KAMT key.
    fn key_from_hex(hex_str: &str) -> [u8; 32] {
        hex::decode(hex_str)
            .expect("test key must be valid hex")
            .try_into()
            .expect("test key must be exactly 32 bytes")
    }

    /// Reads the first two entries with `iter`, resumes after the last one
    /// with `iter_from`, and checks that every entry comes back in order.
    #[test]
    fn test_iter_from() -> anyhow::Result<()> {
        let store = MemoryBlockstore::default();

        // Create a Kamt with 7 keys: two, three and two keys under the
        // leading bytes 0x00, 0x01 and 0x02 respectively.
        let mut kamt: Kamt<_, [u8; 32], String, Identity> =
            Kamt::new_with_config(store, Config::default());

        let keys: Vec<[u8; 32]> = [
            "0000000000000000000000000000000000000000000000000000000000000001",
            "0000000000000000000000000000000000000000000000000000000000000002",
            "0100000000000000000000000000000000000000000000000000000000000001",
            "0100000000000000000000000000000000000000000000000000000000000002",
            "0100000000000000000000000000000000000000000000000000000000000003",
            "0200000000000000000000000000000000000000000000000000000000000001",
            "0200000000000000000000000000000000000000000000000000000000000002",
        ]
        .iter()
        .map(|hex_str| key_from_hex(hex_str))
        .collect();

        for (i, key) in keys.iter().enumerate() {
            let value = format!("value{}", i + 1);

            kamt.set(*key, value)?;
        }
        let kvs: Vec<(&[u8; 32], &String)> = keys
            .iter()
            .map(|k| (k, kamt.get(k).unwrap().unwrap()))
            .collect();

        // Read 2 elements.
        let mut results = kamt.iter().take(2).collect::<Result<Vec<_>, _>>()?;
        assert_eq!(results.len(), 2);

        // Read the rest. Don't bother sorting because the KAMT stores keys in sorted order.
        for res in kamt.iter_from(results.last().unwrap().0)?.skip(1) {
            results.push(res?);
        }

        // Assert that we got out what we put in.
        assert_eq!(kvs, results);

        Ok(())
    }
}

0 commit comments

Comments
 (0)