From 81fc7e01e9bc7a4020bbc67fcc50e1b85204323c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 3 Mar 2025 11:25:29 +0800 Subject: [PATCH] draft --- datafusion/core/tests/memory_limit/mod.rs | 81 +++++++- datafusion/execution/src/disk_manager.rs | 179 +++++++++++++++++- datafusion/execution/src/lib.rs | 2 +- .../physical-plan/src/aggregates/row_hash.rs | 25 ++- .../src/joins/sort_merge_join.rs | 43 ++--- .../{baseline.rs => grouped_metrics.rs} | 36 ++++ datafusion/physical-plan/src/metrics/mod.rs | 4 +- datafusion/physical-plan/src/sorts/sort.rs | 37 ++-- datafusion/physical-plan/src/spill.rs | 173 ++++++++--------- 9 files changed, 427 insertions(+), 153 deletions(-) rename datafusion/physical-plan/src/metrics/{baseline.rs => grouped_metrics.rs} (87%) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2deb8fde2da6f..01a4320bad7ea 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result}; use datafusion_execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; -use datafusion_execution::TaskContext; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::{DiskManager, TaskContext}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::join_selection::JoinSelection; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; use test_utils::AccessLogGenerator; @@ -468,6 +470,83 @@ async fn test_stringview_external_sort() { let _ = df.collect().await.expect("Query execution failed"); } +// Tests for disk limit (`max_temp_directory_size` in `DiskManager`) +// ------------------------------------------------------------------ + +// Create a new `SessionContext` with speicified disk limit and memory pool limit +async fn setup_context( + disk_limit: usize, + memory_pool_limit: usize, +) -> Result { + let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)? + .with_max_temp_directory_size(disk_limit)?; + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) + .build_arc() + .unwrap(); + + let runtime = Arc::new(RuntimeEnv { + memory_pool: runtime.memory_pool.clone(), + disk_manager: Arc::new(disk_manager), + cache_manager: runtime.cache_manager.clone(), + object_store_registry: runtime.object_store_registry.clone(), + }); + + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB + .with_target_partitions(1); + + Ok(SessionContext::new_with_config_rt(config, runtime)) +} + +/// If the spilled bytes exceed the disk limit, the query should fail +/// (specified by `max_temp_directory_size` in `DiskManager`) +#[tokio::test] +async fn test_disk_spill_limit_reached() -> Result<()> { + let ctx = setup_context(100 * 1024 * 1024, 60 * 1024 * 1024).await?; + + let df = ctx + .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1") + .await + .unwrap(); + + let err = df.collect().await.unwrap_err(); + assert_contains!( + err.to_string(), + "The used disk space during the spilling process has exceeded the allowable limit" + ); + + Ok(()) +} + +/// External query should succeed, if the spilled bytes is less than the disk limit +#[tokio::test] +async fn test_disk_spill_limit_not_reached() -> Result<()> { + let disk_spill_limit = 100 * 1024 * 1024; // 100MB + let ctx = setup_context(disk_spill_limit, 60 * 1024 * 1024).await?; + + let df = ctx + .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1") + .await + .unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + let _ = collect(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap(); + + println!("spill count {}, spill bytes {}", spill_count, spilled_bytes); + assert!(spill_count > 0); + assert!(spilled_bytes < disk_spill_limit); + + Ok(()) +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index caa62eefe14c7..5e6a65af5ef31 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -17,14 +17,26 @@ //! [`DiskManager`]: Manages files generated during query execution -use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; +use arrow::array::RecordBatch; +use arrow::datatypes::Schema; +use arrow::ipc::writer::StreamWriter; +use datafusion_common::{ + config_err, exec_datafusion_err, internal_err, resources_datafusion_err, + resources_err, DataFusionError, Result, +}; use log::debug; use parking_lot::Mutex; use rand::{thread_rng, Rng}; +use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tempfile::{Builder, NamedTempFile, TempDir}; +use crate::memory_pool::human_readable_size; + +const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: usize = 100 * 1024 * 1024 * 1024; // 100GB + /// Configuration for temporary disk access #[derive(Debug, Clone)] pub enum DiskManagerConfig { @@ -75,6 +87,14 @@ pub struct DiskManager { /// If `Some(vec![])` a new OS specified temporary directory will be created /// If `None` an error will be returned (configured not to spill) local_dirs: Mutex>>>, + + /// The maximum amount of data (in bytes) stored inside the temporary directories. + /// Default to 100GB + max_temp_directory_size: usize, + + /// Used disk space in the temporary directories. Now only spilled data for + /// external executors are counted. + used_disk_space: AtomicUsize, } impl DiskManager { @@ -84,6 +104,8 @@ impl DiskManager { DiskManagerConfig::Existing(manager) => Ok(manager), DiskManagerConfig::NewOs => Ok(Arc::new(Self { local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; @@ -93,14 +115,67 @@ impl DiskManager { ); Ok(Arc::new(Self { local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), })) } DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), })), } } + pub fn try_new_without_arc(config: DiskManagerConfig) -> Result { + match config { + DiskManagerConfig::Existing(manager) => { + Arc::try_unwrap(manager).map_err(|_| { + DataFusionError::Internal("Failed to unwrap Arc".to_string()) + }) + } + DiskManagerConfig::NewOs => Ok(Self { + local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), + }), + DiskManagerConfig::NewSpecified(conf_dirs) => { + let local_dirs = create_local_dirs(conf_dirs)?; + debug!( + "Created local dirs {:?} as DataFusion working directory", + local_dirs + ); + Ok(Self { + local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), + }) + } + DiskManagerConfig::Disabled => Ok(Self { + local_dirs: Mutex::new(None), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: AtomicUsize::new(0), + }), + } + } + + /// Set the maximum amount of data (in bytes) stored inside the temporary directories. + pub fn with_max_temp_directory_size( + mut self, + max_temp_directory_size: usize, + ) -> Result { + // If the disk manager is disabled and `max_temp_directory_size` is not 0, + // this operation is not meaningful, fail early. + if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 { + return config_err!( + "Cannot set max temp directory size for disabled disk manager" + ); + } + + self.max_temp_directory_size = max_temp_directory_size; + Ok(self) + } + /// Return true if this disk manager supports creating temporary /// files. If this returns false, any call to `create_tmp_file` /// will error. @@ -144,6 +219,63 @@ impl DiskManager { .map_err(DataFusionError::IoError)?, }) } + + pub fn try_write_record_batches( + &self, + batches: &[RecordBatch], + request_description: &str, + ) -> Result<(RefCountedTempFile, SingleSpillStats)> { + if batches.is_empty() { + return internal_err!( + "`try_write_record_batches` requires at least one batch" + ); + } + + let spill_file = self.create_tmp_file(request_description)?; + let schema = batches[0].schema(); + + let mut stream_writer = IPCStreamWriter::new(spill_file.path(), schema.as_ref())?; + + for batch in batches { + // The IPC Stream writer does not have a mechanism to avoid writing duplicate + // `Buffer`s repeatedly, so we do not use `get_record_batch_memory_size()` + // to estimate the memory size with duplicated `Buffer`s. + let estimate_extra_size = batch.get_array_memory_size(); + + if self + .used_disk_space + .fetch_add(estimate_extra_size, Ordering::Relaxed) + > self.max_temp_directory_size + { + return resources_err!( + "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.", + human_readable_size(self.max_temp_directory_size) + ); + } + + stream_writer.write(batch)?; + } + + stream_writer.finish()?; + + Ok(( + spill_file, + SingleSpillStats { + spilled_batches: stream_writer.num_batches, + spilled_bytes: stream_writer.num_bytes, + spilled_rows: stream_writer.num_rows, + }, + )) + } +} + +pub struct SingleSpillStats { + /// The number of spilled batches + pub spilled_batches: usize, + /// The total size of spilled data + pub spilled_bytes: usize, + /// The total number of spilled rows + pub spilled_rows: usize, } /// A wrapper around a [`NamedTempFile`] that also contains @@ -183,6 +315,51 @@ fn create_local_dirs(local_dirs: Vec) -> Result>> { .collect() } +/// Write in Arrow IPC Stream format to a file. +/// +/// Stream format is used for spill because it supports dictionary replacement, and the random +/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement). +struct IPCStreamWriter { + /// Inner writer + pub writer: StreamWriter, + /// Batches written + pub num_batches: usize, + /// Rows written + pub num_rows: usize, + /// Bytes written + pub num_bytes: usize, +} + +impl IPCStreamWriter { + /// Create new writer + pub fn new(path: &Path, schema: &Schema) -> Result { + let file = File::create(path).map_err(|e| { + exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}") + })?; + Ok(Self { + num_batches: 0, + num_rows: 0, + num_bytes: 0, + writer: StreamWriter::try_new(file, schema)?, + }) + } + + /// Write one single batch + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.writer.write(batch)?; + self.num_batches += 1; + self.num_rows += batch.num_rows(); + let num_bytes: usize = batch.get_array_memory_size(); + self.num_bytes += num_bytes; + Ok(()) + } + + /// Finish the writer + pub fn finish(&mut self) -> Result<()> { + self.writer.finish().map_err(Into::into) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index a9e3a27f80356..6241f90c88a51 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -40,7 +40,7 @@ pub mod registry { }; } -pub use disk_manager::DiskManager; +pub use disk_manager::{DiskManager, SingleSpillStats}; pub use registry::FunctionRegistry; pub use stream::{RecordBatchStream, SendableRecordBatchStream}; pub use task::TaskContext; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 05122d5a5403d..03ea6620e91b0 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -27,7 +27,7 @@ use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, AggregateMode, PhysicalGroupBy, }; -use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; +use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput, SpillMetrics}; use crate::sorts::sort::sort_batch; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::{read_spill_as_stream, spill_record_batch_by_size}; @@ -109,12 +109,8 @@ struct SpillState { /// Peak memory used for buffered data. /// Calculated as sum of peak memory values across partitions peak_mem_used: metrics::Gauge, - /// count of spill files during the execution of the operator - spill_count: metrics::Count, - /// total spilled bytes during the execution of the operator - spilled_bytes: metrics::Count, - /// total spilled rows during the execution of the operator - spilled_rows: metrics::Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } /// Tracks if the aggregate should skip partial aggregations @@ -553,9 +549,11 @@ impl GroupedHashAggregateStream { merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), peak_mem_used: MetricBuilder::new(&agg.metrics) .gauge("peak_mem_used", partition), - spill_count: MetricBuilder::new(&agg.metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(&agg.metrics).spilled_bytes(partition), - spilled_rows: MetricBuilder::new(&agg.metrics).spilled_rows(partition), + spill_metrics: SpillMetrics::new( + MetricBuilder::new(&agg.metrics).spill_count(partition), + MetricBuilder::new(&agg.metrics).spilled_bytes(partition), + MetricBuilder::new(&agg.metrics).spilled_rows(partition), + ), }; // Skip aggregation is supported if: @@ -998,11 +996,12 @@ impl GroupedHashAggregateStream { self.spill_state.spills.push(spillfile); // Update metrics - self.spill_state.spill_count.add(1); - self.spill_state + let spill_metrics = &self.spill_state.spill_metrics; + spill_metrics.spill_file_count.add(1); + spill_metrics .spilled_bytes .add(sorted.get_array_memory_size()); - self.spill_state.spilled_rows.add(sorted.num_rows()); + spill_metrics.spilled_rows.add(sorted.num_rows()); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 9b008f5242c47..35aa1e82f4aab 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -41,7 +41,9 @@ use crate::joins::utils::{ reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef, }; -use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use crate::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, +}; use crate::projection::{ join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, update_join_on, ProjectionExec, @@ -580,12 +582,8 @@ struct SortMergeJoinMetrics { /// Peak memory used for buffered data. /// Calculated as sum of peak memory values across partitions peak_mem_used: metrics::Gauge, - /// count of spills during the execution of the operator - spill_count: Count, - /// total spilled bytes during the execution of the operator - spilled_bytes: Count, - /// total spilled rows during the execution of the operator - spilled_rows: Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } impl SortMergeJoinMetrics { @@ -599,9 +597,11 @@ impl SortMergeJoinMetrics { MetricBuilder::new(metrics).counter("output_batches", partition); let output_rows = MetricBuilder::new(metrics).output_rows(partition); let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); - let spill_count = MetricBuilder::new(metrics).spill_count(partition); - let spilled_bytes = MetricBuilder::new(metrics).spilled_bytes(partition); - let spilled_rows = MetricBuilder::new(metrics).spilled_rows(partition); + let spill_metrics = SpillMetrics::new( + MetricBuilder::new(metrics).spill_count(partition), + MetricBuilder::new(metrics).spilled_bytes(partition), + MetricBuilder::new(metrics).spilled_rows(partition), + ); Self { join_time, @@ -610,9 +610,7 @@ impl SortMergeJoinMetrics { output_batches, output_rows, peak_mem_used, - spill_count, - spilled_bytes, - spilled_rows, + spill_metrics, } } } @@ -1387,26 +1385,17 @@ impl SortMergeJoinStream { } Err(_) if self.runtime_env.disk_manager.tmp_files_enabled() => { // spill buffered batch to disk - let spill_file = self - .runtime_env - .disk_manager - .create_tmp_file("sort_merge_join_buffered_spill")?; - if let Some(batch) = buffered_batch.batch { - spill_record_batches( + let spill_file = spill_record_batches( &[batch], - spill_file.path().into(), - Arc::clone(&self.buffered_schema), + Arc::clone(&self.runtime_env.disk_manager), + "sort_merge_join_buffered_spill", + &mut self.join_metrics.spill_metrics, )?; + buffered_batch.spill_file = Some(spill_file); buffered_batch.batch = None; - // update metrics to register spill - self.join_metrics.spill_count.add(1); - self.join_metrics - .spilled_bytes - .add(buffered_batch.size_estimation); - self.join_metrics.spilled_rows.add(buffered_batch.num_rows); Ok(()) } else { internal_err!("Buffered batch has empty body") diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/grouped_metrics.rs similarity index 87% rename from datafusion/physical-plan/src/metrics/baseline.rs rename to datafusion/physical-plan/src/metrics/grouped_metrics.rs index b26a08dd0fada..f68672525cca5 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/grouped_metrics.rs @@ -143,6 +143,42 @@ impl Drop for BaselineMetrics { } } +/// A set of metrics related to spilling during the execution of an operator +#[derive(Debug, Clone)] +pub struct SpillMetrics { + /// count of spill files during the execution of the operator + pub spill_file_count: Count, + /// total spilled bytes during the execution of the operator + pub spilled_bytes: Count, + /// total spilled rows during the execution of the operator + pub spilled_rows: Count, +} + +impl Default for SpillMetrics { + fn default() -> Self { + Self { + spill_file_count: Count::default(), + spilled_bytes: Count::default(), + spilled_rows: Count::default(), + } + } +} + +impl SpillMetrics { + /// Create a new set of spill metrics + pub fn new( + spill_file_count: Count, + spilled_bytes: Count, + spilled_rows: Count, + ) -> Self { + Self { + spill_file_count, + spilled_bytes, + spilled_rows, + } + } +} + /// Trait for things that produce output rows as a result of execution. pub trait RecordOutput { /// Record that some number of output rows have been produced diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 50252e8d973ac..d7b0d6b9c0952 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -17,8 +17,8 @@ //! Metrics for recording information about execution -mod baseline; mod builder; +mod grouped_metrics; mod value; use parking_lot::Mutex; @@ -31,8 +31,8 @@ use std::{ use datafusion_common::HashMap; // public exports -pub use baseline::{BaselineMetrics, RecordOutput}; pub use builder::MetricBuilder; +pub use grouped_metrics::{BaselineMetrics, RecordOutput, SpillMetrics}; pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// Something that tracks a value of interest (metric) of a DataFusion diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 751496c70808e..a129dcf8f053c 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -29,7 +29,7 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, }; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; @@ -65,23 +65,19 @@ struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, - /// count of spills during the execution of the operator - spill_count: Count, - - /// total spilled bytes during the execution of the operator - spilled_bytes: Count, - - /// total spilled rows during the execution of the operator - spilled_rows: Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } impl ExternalSorterMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), - spill_count: MetricBuilder::new(metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), - spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), + spill_metrics: SpillMetrics::new( + MetricBuilder::new(metrics).spill_count(partition), + MetricBuilder::new(metrics).spilled_bytes(partition), + MetricBuilder::new(metrics).spilled_rows(partition), + ), } } } @@ -377,17 +373,17 @@ impl ExternalSorter { /// How many bytes have been spilled to disk? fn spilled_bytes(&self) -> usize { - self.metrics.spilled_bytes.value() + self.metrics.spill_metrics.spilled_bytes.value() } /// How many rows have been spilled to disk? fn spilled_rows(&self) -> usize { - self.metrics.spilled_rows.value() + self.metrics.spill_metrics.spilled_rows.value() } /// How many spill files have been created? fn spill_count(&self) -> usize { - self.metrics.spill_count.value() + self.metrics.spill_metrics.spill_file_count.value() } /// Writes any `in_memory_batches` to a spill file and clears @@ -404,17 +400,14 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - let (spilled_rows, spilled_bytes) = spill_record_batches( + let spill_file = spill_record_batches( &batches, - spill_file.path().into(), - Arc::clone(&self.schema), + Arc::clone(&self.runtime.disk_manager), + "Sorting", + &mut self.metrics.spill_metrics, )?; let used = self.reservation.free(); - self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); - self.metrics.spilled_rows.add(spilled_rows); self.spills.push(spill_file); Ok(used) } diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index fa1b8a91cec7c..3872560472552 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs @@ -21,19 +21,19 @@ use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::ptr::NonNull; +use std::sync::Arc; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; use arrow::record_batch::RecordBatch; -use log::debug; use tokio::sync::mpsc::Sender; use datafusion_common::{exec_datafusion_err, HashSet, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; -use datafusion_execution::memory_pool::human_readable_size; -use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::{DiskManager, SendableRecordBatchStream}; +use crate::metrics::SpillMetrics; use crate::stream::RecordBatchReceiverStream; /// Read spilled batches from the disk @@ -54,41 +54,51 @@ pub(crate) fn read_spill_as_stream( Ok(builder.build()) } -/// Spills in-memory `batches` to disk. -/// -/// Returns total number of the rows spilled to disk. pub(crate) fn spill_record_batches( batches: &[RecordBatch], - path: PathBuf, - schema: SchemaRef, -) -> Result<(usize, usize)> { - let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?; - for batch in batches { - writer.write(batch)?; - } - writer.finish()?; - debug!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, - writer.num_rows, - human_readable_size(writer.num_bytes), - ); - Ok((writer.num_rows, writer.num_bytes)) + disk_manager: Arc, + request_description: &str, + spill_metrics: &mut SpillMetrics, +) -> Result { + let (spill_file, spill_stats) = + disk_manager.try_write_record_batches(batches, request_description)?; + + spill_metrics.spilled_bytes.add(spill_stats.spilled_bytes); + spill_metrics.spilled_rows.add(spill_stats.spilled_rows); + spill_metrics.spill_file_count.add(1); + + Ok(spill_file) } -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - let reader = StreamReader::try_new(file, None)?; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; +pub(crate) fn spill_record_batch_by_row_size( + batch: &RecordBatch, + disk_manager: Arc, + request_description: &str, + spill_metrics: &mut SpillMetrics, + row_limit: usize, +) -> Result { + let total_rows = batch.num_rows(); + let mut batches = Vec::new(); + let mut offset = 0; + + // Slice the batch (with zero-copy) into smaller batches + while offset < total_rows { + let length = std::cmp::min(total_rows - offset, row_limit); + let sliced_batch = batch.slice(offset, length); + batches.push(sliced_batch); + offset += length; } - Ok(()) + + // Spill the sliced batches to disk + spill_record_batches(&batches, disk_manager, request_description, spill_metrics) } /// Spill the `RecordBatch` to disk as smaller batches /// split by `batch_size_rows` +#[deprecated( + since = "46.0.0", + note = "This function is intended primarily for internal use within DataFusion for the purpose of spilling operators. If you are relying on this API, please use [`arrow::ipc::writer::StreamWriter`] instead." +)] pub fn spill_record_batch_by_size( batch: &RecordBatch, path: PathBuf, @@ -110,6 +120,39 @@ pub fn spill_record_batch_by_size( Ok(()) } +// /// Spills in-memory `batches` to disk. +// /// +// /// Returns total number of the rows spilled to disk. +// pub(crate) fn spill_record_batches( +// batches: &[RecordBatch], +// path: PathBuf, +// schema: SchemaRef, +// ) -> Result<(usize, usize)> { +// let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?; +// for batch in batches { +// writer.write(batch)?; +// } +// writer.finish()?; +// debug!( +// "Spilled {} batches of total {} rows to disk, memory released {}", +// writer.num_batches, +// writer.num_rows, +// human_readable_size(writer.num_bytes), +// ); +// Ok((writer.num_rows, writer.num_bytes)) +// } + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(path)?); + let reader = StreamReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +} + /// Calculate total used memory of this batch. /// /// This function is used to estimate the physical memory usage of the `RecordBatch`. @@ -178,51 +221,6 @@ fn count_array_data_memory_size( } } -/// Write in Arrow IPC Stream format to a file. -/// -/// Stream format is used for spill because it supports dictionary replacement, and the random -/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement). -struct IPCStreamWriter { - /// Inner writer - pub writer: StreamWriter, - /// Batches written - pub num_batches: usize, - /// Rows written - pub num_rows: usize, - /// Bytes written - pub num_bytes: usize, -} - -impl IPCStreamWriter { - /// Create new writer - pub fn new(path: &Path, schema: &Schema) -> Result { - let file = File::create(path).map_err(|e| { - exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}") - })?; - Ok(Self { - num_batches: 0, - num_rows: 0, - num_bytes: 0, - writer: StreamWriter::try_new(file, schema)?, - }) - } - - /// Write one single batch - pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { - self.writer.write(batch)?; - self.num_batches += 1; - self.num_rows += batch.num_rows(); - let num_bytes: usize = batch.get_array_memory_size(); - self.num_bytes += num_bytes; - Ok(()) - } - - /// Finish the writer - pub fn finish(&mut self) -> Result<()> { - self.writer.finish().map_err(Into::into) - } -} - #[cfg(test)] mod tests { use super::*; @@ -256,15 +254,17 @@ mod tests { let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let spill_file = disk_manager.create_tmp_file("Test Spill")?; let schema = batch1.schema(); let num_rows = batch1.num_rows() + batch2.num_rows(); - let (spilled_rows, _) = spill_record_batches( + + let mut tmp_metrics = SpillMetrics::default(); + let spill_file = spill_record_batches( &[batch1, batch2], - spill_file.path().into(), - Arc::clone(&schema), + Arc::clone(&disk_manager), + "Test Spill", + &mut tmp_metrics, )?; - assert_eq!(spilled_rows, num_rows); + assert_eq!(tmp_metrics.spilled_rows.value(), num_rows); let file = BufReader::new(File::open(spill_file.path())?); let reader = StreamReader::try_new(file, None)?; @@ -322,14 +322,15 @@ mod tests { let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - let spill_file = disk_manager.create_tmp_file("Test Spill")?; - let num_rows = batch1.num_rows() + batch2.num_rows(); - let (spilled_rows, _) = spill_record_batches( - &[batch1, batch2], - spill_file.path().into(), - Arc::clone(&dict_schema), + let mut tmp_metrics = SpillMetrics::default(); + let spill_file = spill_record_batches( + &[batch1.clone(), batch2.clone()], + Arc::clone(&disk_manager), + "Test Spill", + &mut tmp_metrics, )?; - assert_eq!(spilled_rows, num_rows); + let num_rows = batch1.num_rows() + batch2.num_rows(); + assert_eq!(tmp_metrics.spilled_rows.value(), num_rows); let file = BufReader::new(File::open(spill_file.path())?); let reader = StreamReader::try_new(file, None)?;