diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 3806e1cd20a77..36eaca43cf8d7 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use arrow::{
-    array::{ArrayRef, Int32Array},
+    array::{as_string_array, ArrayRef, Int32Array, StringArray},
     compute::SortOptions,
     record_batch::RecordBatch,
 };
@@ -29,6 +29,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::cast::as_int32_array;
 use datafusion_execution::memory_pool::GreedyMemoryPool;
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
@@ -42,12 +43,17 @@ const KB: usize = 1 << 10;
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_10k_mem() {
     for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] {
-        SortTest::new()
+        let (input, collected) = SortTest::new()
             .with_int32_batches(batch_size)
+            .with_sort_columns(vec!["x"])
             .with_pool_size(10 * KB)
             .with_should_spill(should_spill)
             .run()
             .await;
+
+        let expected = partitions_to_sorted_vec(&input);
+        let actual = batches_to_vec(&collected);
+        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
     }
 }
 
@@ -57,29 +63,123 @@ async fn test_sort_100k_mem() {
     for (batch_size, should_spill) in
         [(5, false), (10000, false), (20000, true), (1000000, true)]
     {
-        SortTest::new()
+        let (input, collected) = SortTest::new()
             .with_int32_batches(batch_size)
+            .with_sort_columns(vec!["x"])
+            .with_pool_size(100 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+
+        let expected = partitions_to_sorted_vec(&input);
+        let actual = batches_to_vec(&collected);
+        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
+    }
+}
+
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn test_sort_strings_100k_mem() {
+    for (batch_size, should_spill) in
+        [(5, false), (1000, false), (10000, true), (20000, true)]
+    {
+        let (input, collected) = SortTest::new()
+            .with_utf8_batches(batch_size)
+            .with_sort_columns(vec!["x"])
             .with_pool_size(100 * KB)
             .with_should_spill(should_spill)
             .run()
             .await;
+
+        let mut input = input
+            .iter()
+            .flat_map(|p| p.iter())
+            .map(|b| {
+                let array = b.column(0);
+                as_string_array(array)
+                    .iter()
+                    .map(|s| s.unwrap().to_string())
+            })
+            .flatten()
+            .collect::<Vec<String>>();
+        input.sort_unstable();
+        let actual = collected
+            .iter()
+            .map(|b| {
+                let array = b.column(0);
+                as_string_array(array)
+                    .iter()
+                    .map(|s| s.unwrap().to_string())
+            })
+            .flatten()
+            .collect::<Vec<String>>();
+        assert_eq!(input, actual);
+    }
+}
+
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn test_sort_multi_columns_100k_mem() {
+    for (batch_size, should_spill) in
+        [(5, false), (1000, false), (10000, true), (20000, true)]
+    {
+        let (input, collected) = SortTest::new()
+            .with_int32_utf8_batches(batch_size)
+            .with_sort_columns(vec!["x", "y"])
+            .with_pool_size(100 * KB)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+
+        fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> {
+            let mut rows: Vec<_> = Vec::new();
+            let i32_array = as_int32_array(b.column(0)).unwrap();
+            let string_array = as_string_array(b.column(1));
+            for i in 0..b.num_rows() {
+                let str = string_array.value(i).to_string();
+                let i32 = i32_array.value(i);
+                rows.push((i32, str));
+            }
+            rows
+        }
+        let mut input = input
+            .iter()
+            .flat_map(|p| p.iter())
+            .map(record_batch_to_vec)
+            .flatten()
+            .collect::<Vec<(i32, String)>>();
+        input.sort_unstable();
+        let actual = collected
+            .iter()
+            .map(record_batch_to_vec)
+            .flatten()
+            .collect::<Vec<(i32, String)>>();
+        assert_eq!(input, actual);
     }
 }
 
 #[tokio::test]
 async fn test_sort_unlimited_mem() {
     for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
-        SortTest::new()
+        let (input, collected) = SortTest::new()
             .with_int32_batches(batch_size)
+            .with_sort_columns(vec!["x"])
             .with_pool_size(usize::MAX)
             .with_should_spill(should_spill)
             .run()
             .await;
+
+        let expected = partitions_to_sorted_vec(&input);
+        let actual = batches_to_vec(&collected);
+        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
     }
 }
+
 #[derive(Debug, Default)]
 struct SortTest {
     input: Vec<Vec<RecordBatch>>,
+    /// The names of the columns to sort by
+    sort_columns: Vec<String>,
     /// GreedyMemoryPool size, if specified
     pool_size: Option<usize>,
     /// If true, expect the sort to spill
@@ -91,12 +191,29 @@ impl SortTest {
         Default::default()
     }
 
+    fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self {
+        self.sort_columns = sort_columns.iter().map(|s| s.to_string()).collect();
+        self
+    }
+
     /// Create batches of int32 values of rows
     fn with_int32_batches(mut self, rows: usize) -> Self {
         self.input = vec![make_staggered_i32_batches(rows)];
         self
     }
 
+    /// Create batches of utf8 values of rows
+    fn with_utf8_batches(mut self, rows: usize) -> Self {
+        self.input = vec![make_staggered_utf8_batches(rows)];
+        self
+    }
+
+    /// Create batches of int32 and utf8 values of rows
+    fn with_int32_utf8_batches(mut self, rows: usize) -> Self {
+        self.input = vec![make_staggered_i32_utf8_batches(rows)];
+        self
+    }
+
     /// specify that this test should use a memory pool of the specified size
     fn with_pool_size(mut self, pool_size: usize) -> Self {
         self.pool_size = Some(pool_size);
@@ -110,7 +227,7 @@ impl SortTest {
 
     /// Sort the input using SortExec and ensure the results are
     /// correct according to `Vec::sort` both with and without spilling
-    async fn run(&self) {
+    async fn run(&self) -> (Vec<Vec<RecordBatch>>, Vec<RecordBatch>) {
         let input = self.input.clone();
         let first_batch = input
             .iter()
@@ -119,16 +236,21 @@ impl SortTest {
             .expect("at least one batch");
         let schema = first_batch.schema();
 
-        let sort = LexOrdering::new(vec![PhysicalSortExpr {
-            expr: col("x", &schema).unwrap(),
-            options: SortOptions {
-                descending: false,
-                nulls_first: true,
-            },
-        }]);
+        let sort_ordering = LexOrdering::new(
+            self.sort_columns
+                .iter()
+                .map(|c| PhysicalSortExpr {
+                    expr: col(c, &schema).unwrap(),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                })
+                .collect(),
+        );
 
         let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::new(sort, exec));
+        let sort = Arc::new(SortExec::new(sort_ordering, exec));
 
         let session_config = SessionConfig::new();
         let session_ctx = if let Some(pool_size) = self.pool_size {
@@ -153,9 +275,6 @@ impl SortTest {
         let task_ctx = session_ctx.task_ctx();
         let collected = collect(sort.clone(), task_ctx).await.unwrap();
 
-        let expected = partitions_to_sorted_vec(&input);
-        let actual = batches_to_vec(&collected);
-
         if self.should_spill {
             assert_ne!(
                 sort.metrics().unwrap().spill_count().unwrap(),
@@ -175,7 +294,8 @@ impl SortTest {
             0,
             "The sort should have returned all memory used back to the memory pool"
         );
-        assert_eq!(expected, actual, "failure in @ pool_size {self:?}");
+
+        (input, collected)
     }
 }
 
@@ -203,3 +323,63 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
     }
     batches
 }
+
+/// Return randomly sized record batches in a field named 'x' of type `Utf8`
+/// with randomized content
+fn make_staggered_utf8_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = rand::thread_rng();
+    let max_batch = 1024;
+
+    let mut batches = vec![];
+    let mut remaining = len;
+    while remaining != 0 {
+        let to_read = rng.gen_range(0..=remaining.min(max_batch));
+        remaining -= to_read;
+
+        batches.push(
+            RecordBatch::try_from_iter(vec![(
+                "x",
+                Arc::new(StringArray::from_iter_values(
+                    (0..to_read).map(|_| format!("test_string_{}", rng.gen::<i32>())),
+                )) as ArrayRef,
+            )])
+            .unwrap(),
+        )
+    }
+    batches
+}
+
+/// Return randomly sized record batches in a field named 'x' of type `Int32`
+/// with randomized i32 content and a field named 'y' of type `Utf8`
+/// with randomized content
+fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = rand::thread_rng();
+    let max_batch = 1024;
+
+    let mut batches = vec![];
+    let mut remaining = len;
+    while remaining != 0 {
+        let to_read = rng.gen_range(0..=remaining.min(max_batch));
+        remaining -= to_read;
+
+        batches.push(
+            RecordBatch::try_from_iter(vec![
+                (
+                    "x",
+                    Arc::new(Int32Array::from_iter_values(
+                        (0..to_read).map(|_| rng.gen()),
+                    )) as ArrayRef,
+                ),
+                (
+                    "y",
+                    Arc::new(StringArray::from_iter_values(
+                        (0..to_read).map(|_| format!("test_string_{}", rng.gen::<i32>())),
+                    )) as ArrayRef,
+                ),
+            ])
+            .unwrap(),
+        )
+    }
+
+    batches
+}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 00c863036edb3..649468260e560 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -225,6 +225,8 @@ struct ExternalSorter {
     // ========================================================================
     /// Potentially unsorted in memory buffer
     in_mem_batches: Vec<RecordBatch>,
+    /// If `Self::in_mem_batches` are sorted
+    in_mem_batches_sorted: bool,
 
     /// If data has previously been spilled, the locations of the
     /// spill files (in Arrow IPC format)
@@ -277,6 +279,7 @@ impl ExternalSorter {
         Self {
             schema,
             in_mem_batches: vec![],
+            in_mem_batches_sorted: false,
             spills: vec![],
             expr: expr.into(),
             metrics,
@@ -309,6 +312,7 @@ impl ExternalSorter {
         }
 
         self.in_mem_batches.push(input);
+        self.in_mem_batches_sorted = false;
         Ok(())
     }
@@ -423,7 +427,8 @@ impl ExternalSorter {
     async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
         // Release the memory reserved for merge back to the pool so
         // there is some left when `in_mem_sort_stream` requests an
-        // allocation.
+        // allocation. At the end of this function, memory will be
+        // reserved again for the next spill.
         self.merge_reservation.free();
 
         let before = self.reservation.size();
@@ -458,6 +463,7 @@ impl ExternalSorter {
                     self.spills.push(spill_file);
                 } else {
                     self.in_mem_batches.push(batch);
+                    self.in_mem_batches_sorted = true;
                 }
             }
             Some(writer) => {
@@ -662,10 +668,10 @@ impl ExternalSorter {
 /// Estimate how much memory is needed to sort a `RecordBatch`.
 ///
 /// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
-/// creating sorted copies of sorted columns in record batches, the sorted copies could be
-/// in either row format or array format. Please refer to cursor.rs and stream.rs for more
-/// details. No matter what format the sorted copies are, they will use more memory than
-/// the original record batch.
+/// creating sorted copies of sorted columns in record batches for speeding up comparison
+/// in sorting and merging. The sorted copies are in either row format or array format.
+/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
+/// sorted copies are, they will use more memory than the original record batch.
 fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
     // 2x may not be enough for some cases, but it's a good start.
     // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
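
As a rough sketch of the 2x pre-reservation heuristic described in the comment above (the helper name below is hypothetical; `get_array_memory_size` is arrow's `RecordBatch` accessor and not necessarily what the patched function calls):

    use arrow::record_batch::RecordBatch;

    // Sketch only: reserve roughly twice the batch's in-memory size so the
    // sorted row-format or array-format copies can live alongside the
    // original batch during the sort/merge.
    fn estimated_sort_reservation_bytes(batch: &RecordBatch) -> usize {
        batch.get_array_memory_size() * 2
    }

If the 2x estimate proves too small for a workload, the `datafusion.execution.sort_spill_reservation_bytes` setting referenced in the comment can be raised through `SessionConfig`.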