Skip to content

Commit

Permalink
fix: compaction grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Dec 4, 2024
1 parent 9ced7b2 commit 66a974a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,28 @@ fn create_compaction_stream<'a>(
continue;
}

if level.is_disjoint {
let lo = level
if level.is_disjoint && level.len() > 1 {
let Some(lo) = level
.segments
.iter()
.enumerate()
.filter(|(_, segment)| to_compact.contains(&segment.metadata.id))
.min_by(|(a, _), (b, _)| a.cmp(b))
.map(|(idx, _)| idx)?;
.map(|(idx, _)| idx)
else {
continue;
};

let hi = level
let Some(hi) = level
.segments
.iter()
.enumerate()
.filter(|(_, segment)| to_compact.contains(&segment.metadata.id))
.max_by(|(a, _), (b, _)| a.cmp(b))
.map(|(idx, _)| idx)?;
.map(|(idx, _)| idx)
else {
continue;
};

readers.push(Box::new(LevelReader::from_indexes(
level.clone(),
Expand Down
45 changes: 45 additions & 0 deletions tests/compaction_readers_grouping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use lsm_tree::{AbstractTree, Config, SequenceNumberCounter};
use std::sync::Arc;
use test_log::test;

#[test]
fn compaction_readers_grouping() -> lsm_tree::Result<()> {
let folder = tempfile::tempdir()?;
let path = folder.path();

let tree = Config::new(path).open()?;

let seqno = SequenceNumberCounter::default();

tree.insert("a".as_bytes(), "abc", seqno.next());
tree.insert("b".as_bytes(), "abc", seqno.next());
tree.insert("c".as_bytes(), "abc", seqno.next());
tree.flush_active_memtable(0)?;
assert_eq!(3, tree.len()?);

tree.compact(Arc::new(lsm_tree::compaction::PullDown(0, 2)), 0)?;

tree.insert("d".as_bytes(), "abc", seqno.next());
tree.insert("e".as_bytes(), "abc", seqno.next());
tree.insert("f".as_bytes(), "abc", seqno.next());
tree.flush_active_memtable(0)?;
assert_eq!(6, tree.len()?);

tree.insert("g".as_bytes(), "abc", seqno.next());
tree.insert("h".as_bytes(), "abc", seqno.next());
tree.insert("i".as_bytes(), "abc", seqno.next());
tree.flush_active_memtable(0)?;
assert_eq!(9, tree.len()?);

// NOTE: Previously, create_compaction_stream would short circuit
// breaking this
tree.compact(Arc::new(lsm_tree::compaction::PullDown(2, 3)), 0)?;

eprintln!("{}", tree.levels.read().expect("asdasd"));
assert!(!tree.levels.read().expect("asdasd").levels[0].is_empty());
assert!(tree.levels.read().expect("asdasd").levels[1].is_empty());
assert!(tree.levels.read().expect("asdasd").levels[2].is_empty());
assert!(!tree.levels.read().expect("asdasd").levels[3].is_empty());

Ok(())
}

0 comments on commit 66a974a

Please sign in to comment.