Commit: draft

2010YOUY01 committed Mar 3, 2025
1 parent 382e232 commit 81fc7e0
Showing 9 changed files with 427 additions and 153 deletions.
81 changes: 80 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
@@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result};
use datafusion_execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::{DiskManager, TaskContext};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use test_utils::AccessLogGenerator;
@@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
let _ = df.collect().await.expect("Query execution failed");
}

// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
// ------------------------------------------------------------------

// Create a new `SessionContext` with the specified disk limit and memory pool limit
async fn setup_context(
disk_limit: usize,
memory_pool_limit: usize,
) -> Result<SessionContext> {
let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
.with_max_temp_directory_size(disk_limit)?;

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
.build_arc()
.unwrap();

let runtime = Arc::new(RuntimeEnv {
memory_pool: runtime.memory_pool.clone(),
disk_manager: Arc::new(disk_manager),
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
});
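
// The `RuntimeEnv` is rebuilt field-by-field here, presumably because
// `RuntimeEnvBuilder` offers no hook for supplying a pre-configured
// `DiskManager`; only the `disk_manager` field differs from the built runtime.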

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB
.with_target_partitions(1);

Ok(SessionContext::new_with_config_rt(config, runtime))
}

/// If the spilled bytes exceed the disk limit (`max_temp_directory_size` in
/// `DiskManager`), the query should fail
#[tokio::test]
async fn test_disk_spill_limit_reached() -> Result<()> {
let ctx = setup_context(100 * 1024 * 1024, 60 * 1024 * 1024).await?;

let df = ctx
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
.await
.unwrap();

let err = df.collect().await.unwrap_err();
assert_contains!(
err.to_string(),
"The used disk space during the spilling process has exceeded the allowable limit"
);

Ok(())
}

/// An external (spilling) query should succeed if the spilled bytes are less than the disk limit
#[tokio::test]
async fn test_disk_spill_limit_not_reached() -> Result<()> {
let disk_spill_limit = 100 * 1024 * 1024; // 100MB
let ctx = setup_context(disk_spill_limit, 60 * 1024 * 1024).await?;

let df = ctx
.sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();

println!("spill count {}, spill bytes {}", spill_count, spilled_bytes);
assert!(spill_count > 0);
assert!(spilled_bytes < disk_spill_limit);

Ok(())
}

/// Run the query with the specified memory limit,
/// and verify that the expected errors are returned
#[derive(Clone, Debug)]
179 changes: 178 additions & 1 deletion datafusion/execution/src/disk_manager.rs
@@ -17,14 +17,26 @@

//! [`DiskManager`]: Manages files generated during query execution
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use datafusion_common::{
config_err, exec_datafusion_err, internal_err, resources_datafusion_err,
resources_err, DataFusionError, Result,
};
use log::debug;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};

use crate::memory_pool::human_readable_size;

const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: usize = 100 * 1024 * 1024 * 1024; // 100GB

/// Configuration for temporary disk access
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
@@ -75,6 +87,14 @@ pub struct DiskManager {
/// If `Some(vec![])`, a new OS-specified temporary directory will be created
/// If `None`, an error will be returned (configured not to spill)
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,

/// The maximum amount of data (in bytes) stored inside the temporary directories.
/// Defaults to 100GB
max_temp_directory_size: usize,

/// Used disk space in the temporary directories. Currently only spilled data
/// from external executors is counted.
used_disk_space: AtomicUsize,
}

impl DiskManager {
@@ -84,6 +104,8 @@ impl DiskManager {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
@@ -93,14 +115,67 @@
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
})),
}
}

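/// Create a `DiskManager` that is not wrapped in an `Arc`, so builder-style
/// methods like [`Self::with_max_temp_directory_size`] can be chained on it
/// before it is shared.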
pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
match config {
DiskManagerConfig::Existing(manager) => {
Arc::try_unwrap(manager).map_err(|_| {
DataFusionError::Internal("Failed to unwrap Arc".to_string())
})
}
DiskManagerConfig::NewOs => Ok(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
}),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
debug!(
"Created local dirs {:?} as DataFusion working directory",
local_dirs
);
Ok(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
})
}
DiskManagerConfig::Disabled => Ok(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: AtomicUsize::new(0),
}),
}
}

/// Set the maximum amount of data (in bytes) stored inside the temporary directories.
pub fn with_max_temp_directory_size(
mut self,
max_temp_directory_size: usize,
) -> Result<Self> {
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
// this operation is not meaningful, so fail early.
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
return config_err!(
"Cannot set max temp directory size for disabled disk manager"
);
}

self.max_temp_directory_size = max_temp_directory_size;
Ok(self)
}
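
// A minimal usage sketch (hypothetical caller, not part of this diff): chain
// the builder-style setter above to cap spill usage at 1 GiB.
//
//     let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
//         .with_max_temp_directory_size(1024 * 1024 * 1024)?;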

/// Return true if this disk manager supports creating temporary
/// files. If this returns false, any call to `create_tmp_file`
/// will error.
@@ -144,6 +219,63 @@ impl DiskManager {
.map_err(DataFusionError::IoError)?,
})
}

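/// Write `batches` to a new temporary file, counting the written bytes
/// against `max_temp_directory_size`, and return the file handle together
/// with statistics about this spill.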
pub fn try_write_record_batches(
&self,
batches: &[RecordBatch],
request_description: &str,
) -> Result<(RefCountedTempFile, SingleSpillStats)> {
if batches.is_empty() {
return internal_err!(
"`try_write_record_batches` requires at least one batch"
);
}

let spill_file = self.create_tmp_file(request_description)?;
let schema = batches[0].schema();

let mut stream_writer = IPCStreamWriter::new(spill_file.path(), schema.as_ref())?;

for batch in batches {
// The IPC stream writer has no mechanism to avoid writing shared `Buffer`s
// repeatedly, so we estimate the on-disk size with `get_array_memory_size()`
// (which also counts shared `Buffer`s repeatedly) rather than with
// `get_record_batch_memory_size()`.
let estimate_extra_size = batch.get_array_memory_size();

// `fetch_add` returns the previous value, so include the size of the new
// batch when comparing against the limit.
let previously_used = self
.used_disk_space
.fetch_add(estimate_extra_size, Ordering::Relaxed);

if previously_used + estimate_extra_size > self.max_temp_directory_size {
return resources_err!(
"The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
human_readable_size(self.max_temp_directory_size)
);
}

stream_writer.write(batch)?;
}

stream_writer.finish()?;

Ok((
spill_file,
SingleSpillStats {
spilled_batches: stream_writer.num_batches,
spilled_bytes: stream_writer.num_bytes,
spilled_rows: stream_writer.num_rows,
},
))
}
}
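
// A hypothetical call site (sketch, not part of this diff): an operator spills
// its buffered batches and records the returned statistics in its metrics.
//
//     let (spill_file, stats) =
//         disk_manager.try_write_record_batches(&batches, "ExternalSorter spill")?;
//     spill_metrics.spilled_bytes.add(stats.spilled_bytes);
//     spill_metrics.spilled_rows.add(stats.spilled_rows);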

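/// Statistics for the data written by a single spill call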
pub struct SingleSpillStats {
/// The number of spilled batches
pub spilled_batches: usize,
/// The total size of spilled data, in bytes
pub spilled_bytes: usize,
/// The total number of spilled rows
pub spilled_rows: usize,
}

/// A wrapper around a [`NamedTempFile`] that also contains
@@ -183,6 +315,51 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
.collect()
}

/// Write in Arrow IPC Stream format to a file.
///
/// The stream format is used for spilling because it supports dictionary replacement, while
/// the random access offered by the IPC File format is not needed (the File format also does
/// not support dictionary replacement).
struct IPCStreamWriter {
/// Inner writer
pub writer: StreamWriter<File>,
/// Batches written
pub num_batches: usize,
/// Rows written
pub num_rows: usize,
/// Bytes written
pub num_bytes: usize,
}

impl IPCStreamWriter {
/// Create new writer
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
let file = File::create(path).map_err(|e| {
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
})?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
writer: StreamWriter::try_new(file, schema)?,
})
}

/// Write one single batch
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes;
Ok(())
}

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(Into::into)
}
}
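
// Reading a spill file back (sketch, assuming arrow's public IPC API): the
// batches written by `IPCStreamWriter` can be recovered with `StreamReader`.
//
//     use arrow::ipc::reader::StreamReader;
//     let file = std::fs::File::open(path)?;
//     let reader = StreamReader::try_new(file, None)?;
//     let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;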

#[cfg(test)]
mod tests {
use super::*;
2 changes: 1 addition & 1 deletion datafusion/execution/src/lib.rs
@@ -40,7 +40,7 @@ pub mod registry {
};
}

pub use disk_manager::DiskManager;
pub use disk_manager::{DiskManager, SingleSpillStats};
pub use registry::FunctionRegistry;
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use task::TaskContext;
25 changes: 12 additions & 13 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -27,7 +27,7 @@ use crate::aggregates::{
create_schema, evaluate_group_by, evaluate_many, evaluate_optional, AggregateMode,
PhysicalGroupBy,
};
use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput};
use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput, SpillMetrics};
use crate::sorts::sort::sort_batch;
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{read_spill_as_stream, spill_record_batch_by_size};
@@ -109,12 +109,8 @@ struct SpillState {
/// Peak memory used for buffered data.
/// Calculated as sum of peak memory values across partitions
peak_mem_used: metrics::Gauge,
/// count of spill files during the execution of the operator
spill_count: metrics::Count,
/// total spilled bytes during the execution of the operator
spilled_bytes: metrics::Count,
/// total spilled rows during the execution of the operator
spilled_rows: metrics::Count,
/// Spilling-related metrics
spill_metrics: SpillMetrics,
}

/// Tracks if the aggregate should skip partial aggregations
@@ -553,9 +549,11 @@ impl GroupedHashAggregateStream {
merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
peak_mem_used: MetricBuilder::new(&agg.metrics)
.gauge("peak_mem_used", partition),
spill_count: MetricBuilder::new(&agg.metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(&agg.metrics).spilled_bytes(partition),
spilled_rows: MetricBuilder::new(&agg.metrics).spilled_rows(partition),
spill_metrics: SpillMetrics::new(
MetricBuilder::new(&agg.metrics).spill_count(partition),
MetricBuilder::new(&agg.metrics).spilled_bytes(partition),
MetricBuilder::new(&agg.metrics).spilled_rows(partition),
),
};

// Skip aggregation is supported if:
@@ -998,11 +996,12 @@ impl GroupedHashAggregateStream {
self.spill_state.spills.push(spillfile);

// Update metrics
self.spill_state.spill_count.add(1);
self.spill_state
let spill_metrics = &self.spill_state.spill_metrics;
spill_metrics.spill_file_count.add(1);
spill_metrics
.spilled_bytes
.add(sorted.get_array_memory_size());
self.spill_state.spilled_rows.add(sorted.num_rows());
spill_metrics.spilled_rows.add(sorted.num_rows());

Ok(())
}