diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 6817969580da0..256d41caf308e 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
 };
 use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use futures::StreamExt;
 use std::any::Any;
 use std::num::NonZeroUsize;
@@ -265,6 +266,10 @@ async fn sort_spill_reservation() {
     // This test case shows how sort_spill_reservation works by
     // purposely sorting data that requires non-trivial memory to
     // sort/merge.
+
+    // The merge operation needs extra memory for row conversion, so make the
+    // memory limit larger.
+    let mem_limit = partition_size * 2;
     let test = TestCase::new()
         // This query uses a different order than the input table to
         // force a sort. It also needs to have multiple columns to
@@ -272,7 +277,7 @@ async fn sort_spill_reservation() {
         // substantial memory
         .with_query("select * from t ORDER BY a , b DESC")
         // enough memory to sort if we don't try to merge it all at once
-        .with_memory_limit(partition_size)
+        .with_memory_limit(mem_limit)
         // use a single partition so only a sort is needed
         .with_scenario(scenario)
         .with_disk_manager_config(DiskManagerConfig::NewOs)
@@ -311,7 +316,7 @@ async fn sort_spill_reservation() {
        // reserve sufficient space up front for the merge and, this time,
        // force the spills to happen with less buffered
        // input and thus with enough memory left to merge.
-        .with_sort_spill_reservation_bytes(partition_size / 2);
+        .with_sort_spill_reservation_bytes(mem_limit / 2);

     test.with_config(config).with_expected_success().run().await;
 }
@@ -774,7 +779,10 @@ fn make_dict_batches() -> Vec<RecordBatch> {

 // How many bytes does the memory from dict_batches consume?
 fn batches_byte_size(batches: &[RecordBatch]) -> usize {
-    batches.iter().map(|b| b.get_array_memory_size()).sum()
+    batches
+        .iter()
+        .map(|b| get_record_batch_memory_size(b))
+        .sum()
 }

 #[derive(Debug)]
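
Reviewer note (not part of the patch): the arithmetic the updated test now encodes, as a standalone sketch; `partition_size` stands in for the measured in-memory size of one partition. The 2x headroom covers the merge phase's row conversion, and half of the limit is pre-reserved for merging via the existing sort_spill_reservation_bytes knob.

// Hypothetical helper mirroring the test's budget; names are illustrative only.
fn sort_test_budget(partition_size: usize) -> (usize, usize) {
    let mem_limit = partition_size * 2; // headroom for merge-time row conversion
    let sort_spill_reservation_bytes = mem_limit / 2; // pre-reserved for the merge
    (mem_limit, sort_spill_reservation_bytes)
}

fn main() {
    assert_eq!(sort_test_budget(10_000), (20_000, 10_000));
}
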
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index d32c60697ec8c..dc2a84b51327d 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -22,6 +22,8 @@ use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 use std::sync::Arc;

+use crate::spill::get_record_batch_memory_size;
+
 #[derive(Debug, Copy, Clone, Default)]
 struct BatchCursor {
     /// The index into BatchBuilder::batches
@@ -69,7 +71,8 @@ impl BatchBuilder {

     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        self.reservation
+            .try_grow(get_record_batch_memory_size(&batch))?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
@@ -141,7 +144,8 @@ impl BatchBuilder {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                self.reservation
+                    .shrink(get_record_batch_memory_size(&batch));
             }
             retain
         });
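
Reviewer note (not part of the patch): push_batch grows the reservation and the retain pass shrinks it, so both sides must measure a batch with the same function; mixing get_array_memory_size with get_record_batch_memory_size would leave the reservation permanently skewed. A minimal sketch of that invariant, assuming the memory-pool API (GreedyMemoryPool, MemoryConsumer::register, try_grow/shrink) behaves as in this DataFusion version:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn main() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("builder-sketch").register(&pool);

    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;

    // Measure once, then use the same size on both sides of the accounting.
    let size = get_record_batch_memory_size(&batch);
    reservation.try_grow(size)?; // as in push_batch
    reservation.shrink(size); // as in the retain pass
    assert_eq!(reservation.size(), 0);
    Ok(())
}
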
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index ce7efce415779..0aa920213c03a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -31,7 +31,9 @@ use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
-use crate::spill::{read_spill_as_stream, spill_record_batches};
+use crate::spill::{
+    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
@@ -286,10 +288,12 @@ impl ExternalSorter {
         }
         self.reserve_memory_for_merge()?;

-        let size = input.get_array_memory_size();
+        let size = get_record_batch_memory_size(&input);
+
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
+
             // Sorting may have freed memory, especially if fetch is `Some`
             //
             // As such we check again, and if the memory usage has dropped by
@@ -426,7 +430,7 @@ impl ExternalSorter {
         let size: usize = self
             .in_mem_batches
             .iter()
-            .map(|x| x.get_array_memory_size())
+            .map(|x| get_record_batch_memory_size(x))
             .sum();

         // Reserve headroom for next sort/merge
@@ -521,7 +525,8 @@ impl ExternalSorter {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            self.reservation.try_resize(batch.get_array_memory_size())?;
+            self.reservation
+                .try_resize(get_record_batch_memory_size(&batch))?;
             let reservation = self.reservation.take();
             return self.sort_batch_stream(batch, metrics, reservation);
         }
@@ -530,7 +535,8 @@ impl ExternalSorter {
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let reservation =
+                    self.reservation.split(get_record_batch_memory_size(&batch));
                 let input = self.sort_batch_stream(batch, metrics, reservation)?;
                 Ok(spawn_buffered(input, 1))
             })
@@ -559,7 +565,7 @@ impl ExternalSorter {
         metrics: BaselineMetrics,
         reservation: MemoryReservation,
     ) -> Result<SendableRecordBatchStream> {
-        assert_eq!(batch.get_array_memory_size(), reservation.size());
+        assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
         let schema = batch.schema();

         let fetch = self.fetch;
@@ -1185,9 +1191,9 @@ mod tests {
         assert_eq!(metrics.output_rows().unwrap(), 10000);
         assert!(metrics.elapsed_compute().unwrap() > 0);

-        assert_eq!(metrics.spill_count().unwrap(), 4);
-        assert_eq!(metrics.spilled_bytes().unwrap(), 38784);
-        assert_eq!(metrics.spilled_rows().unwrap(), 9600);
+        assert_eq!(metrics.spill_count().unwrap(), 3);
+        assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
+        assert_eq!(metrics.spilled_rows().unwrap(), 9000);

         let columns = result[0].columns();
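
Reviewer note (not part of the patch): the insert_batch hunk above only swaps the size function; the surrounding reserve-or-sort control flow is unchanged. A condensed paraphrase of that flow as a hypothetical helper:

use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

// Hypothetical condensation of ExternalSorter::insert_batch's accounting path;
// `sort_or_spill` stands in for in_mem_sort()/spill(). The real code also
// checks how much memory the sort freed before retrying.
fn reserve_for_batch(
    reservation: &mut MemoryReservation,
    size: usize,
    sort_or_spill: &mut dyn FnMut() -> Result<()>,
) -> Result<()> {
    if reservation.try_grow(size).is_err() {
        // Sorting in memory (and possibly spilling) releases reserved bytes,
        // so a second attempt at the same reservation can succeed.
        sort_or_spill()?;
        reservation.try_grow(size)?;
    }
    Ok(())
}
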
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index de85a7c6f0989..ae41a3450ec10 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -20,14 +20,16 @@
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
+use std::ptr::NonNull;

+use arrow::array::ArrayData;
 use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use log::debug;
 use tokio::sync::mpsc::Sender;

-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, HashSet, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 use datafusion_execution::SendableRecordBatchStream;
@@ -109,10 +111,67 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }

+/// Calculate the total memory used by this batch.
+///
+/// Example:
+/// There is a buffer occupying the memory region [0, 100].
+/// The input `batch` has two arrays that reference the regions [0, 10] and
+/// [10, 20] of that buffer, respectively.
+/// The total memory size should be 100 (the total allocated memory), not 20
+/// (the size of the data actually referenced).
+///
+/// Note: the current `RecordBatch::get_array_memory_size()` will double-count
+/// a buffer's memory size if multiple arrays within the batch share the same
+/// `Buffer`. This method provides a temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to the `Buffer`'s start memory address (instead of the
+    // pointer to the actually used data region represented by the current
+    // `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count the memory usage of `array_data`'s own buffers
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory has already been counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls() {
+        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
+            *total_size += null_buffer.inner().inner().capacity();
+        }
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::spill::{spill_record_batch_by_size, spill_record_batches};
     use crate::test::build_table_i32;
+    use arrow::array::{Float64Array, Int32Array};
+    use arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::ListArray;
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
     use datafusion_execution::DiskManager;
@@ -147,7 +206,7 @@ mod tests {
         assert_eq!(cnt.unwrap(), num_rows);

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 2);
         assert_eq!(reader.schema(), schema);
@@ -175,11 +234,107 @@ mod tests {
         )?;

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);

         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with an empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();

+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
+                .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        // The size should only count the shared buffer once
+        assert_eq!(size, 20);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_nested_array() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested_int",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+            Field::new(
+                "nested_int2",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+        ]));
+
+        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+        ]);
+
+        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(4), Some(5), Some(6)]),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 8320);
+    }
 }
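
Reviewer note (not part of the patch): a standalone sketch of the behavior the new tests pin down. It only assumes arrow-rs APIs the patch itself uses (`Array::to_data`, `Buffer::data_ptr`, `Buffer::capacity`): two zero-copy slices of one array are double-counted by `get_array_memory_size`, while deduplicating on the buffer's start pointer counts the 20-byte buffer once. The 8320 expected in the nested-array test is likewise capacity-based: the builders behind `ListArray::from_iter_primitive` allocate more than the 24 bytes of live values, and the new function deliberately reports allocation rather than usage.

use std::collections::HashSet;
use std::ptr::NonNull;

use arrow::array::{Array, Int32Array};

fn main() {
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let slice1 = original.slice(0, 3);
    let slice2 = original.slice(2, 3);

    // Per-array accounting counts the shared 20-byte data buffer once per slice.
    let double_counted =
        slice1.get_array_memory_size() + slice2.get_array_memory_size();

    // Deduplicating on Buffer::data_ptr, as get_record_batch_memory_size does,
    // counts the shared allocation exactly once.
    let mut seen: HashSet<NonNull<u8>> = HashSet::new();
    let mut deduped = 0;
    for array in [&slice1, &slice2] {
        for buffer in array.to_data().buffers() {
            if seen.insert(buffer.data_ptr()) {
                deduped += buffer.capacity();
            }
        }
    }

    assert_eq!(deduped, 20); // matches test_get_record_batch_memory_size_shared_buffer
    assert!(double_counted > deduped);
}
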