Skip to content

Commit

Permalink
feat: track memory usage for recursive CTE, enable recursive CTEs by …
Browse files Browse the repository at this point in the history
…default (#9619)

* feat: track memory usage for recursive CTE

* add e2e test

* drop previous stream

* move doc

* fix sql_integration

* fix config test

* pass the reservation to MemoryStream

* export ReservedBatches
  • Loading branch information
jonahgao authored Mar 16, 2024
1 parent 6e90f01 commit 80d223f
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 45 deletions.
4 changes: 1 addition & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,7 @@ config_namespace! {
pub listing_table_ignore_subdirectory: bool, default = true

/// Should DataFusion support recursive CTEs
/// Defaults to false since this feature is a work in progress and may not
/// behave as expected
pub enable_recursive_ctes: bool, default = false
pub enable_recursive_ctes: bool, default = true
}
}

Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/tests/memory_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,28 @@ async fn sort_spill_reservation() {
test.with_config(config).with_expected_success().run().await;
}

#[tokio::test]
async fn oom_recursive_cte() {
TestCase::new()
.with_query(
"WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT UNNEST(RANGE(id+1, id+1000)) as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"RecursiveQuery",
])
.with_memory_limit(2_000)
.run()
.await
}

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::{
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, project_schema, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

Expand Down Expand Up @@ -236,6 +237,8 @@ impl MemoryExec {
pub struct MemoryStream {
/// Vector of record batches
data: Vec<RecordBatch>,
/// Optional memory reservation bound to the data, freed on drop
reservation: Option<MemoryReservation>,
/// Schema representing the data
schema: SchemaRef,
/// Optional projection for which columns to load
Expand All @@ -253,11 +256,18 @@ impl MemoryStream {
) -> Result<Self> {
Ok(Self {
data,
reservation: None,
schema,
projection,
index: 0,
})
}

/// Set the memory reservation for the data
pub(super) fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
self.reservation = Some(reservation);
self
}
}

impl Stream for MemoryStream {
Expand Down
19 changes: 16 additions & 3 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::task::{Context, Poll};

use super::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
work_table::{WorkTable, WorkTableExec},
work_table::{ReservedBatches, WorkTable, WorkTableExec},
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan};
Expand All @@ -32,6 +32,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

Expand Down Expand Up @@ -236,6 +237,8 @@ struct RecursiveQueryStream {
/// In-memory buffer for storing a copy of the current results. Will be
/// cleared after each iteration.
buffer: Vec<RecordBatch>,
/// Tracks the memory used by the buffer
reservation: MemoryReservation,
// /// Metrics.
_baseline_metrics: BaselineMetrics,
}
Expand All @@ -250,6 +253,8 @@ impl RecursiveQueryStream {
baseline_metrics: BaselineMetrics,
) -> Self {
let schema = static_stream.schema();
let reservation =
MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
Self {
task_context,
work_table,
Expand All @@ -258,6 +263,7 @@ impl RecursiveQueryStream {
recursive_stream: None,
schema,
buffer: vec![],
reservation,
_baseline_metrics: baseline_metrics,
}
}
Expand All @@ -268,6 +274,10 @@ impl RecursiveQueryStream {
mut self: std::pin::Pin<&mut Self>,
batch: RecordBatch,
) -> Poll<Option<Result<RecordBatch>>> {
if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
return Poll::Ready(Some(Err(e)));
}

self.buffer.push(batch.clone());
Poll::Ready(Some(Ok(batch)))
}
Expand All @@ -289,8 +299,11 @@ impl RecursiveQueryStream {
}

// Update the work table with the current buffer
let batches = self.buffer.drain(..).collect();
self.work_table.write(batches);
let reserved_batches = ReservedBatches::new(
std::mem::take(&mut self.buffer),
self.reservation.take(),
);
self.work_table.update(reserved_batches);

// We always execute (and re-execute iteratively) the first partition.
// Downstream plans should not expect any partitioning.
Expand Down
88 changes: 72 additions & 16 deletions datafusion/physical-plan/src/work_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,34 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProp

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, Result};
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

/// A vector of record batches with a memory reservation.
#[derive(Debug)]
pub(super) struct ReservedBatches {
batches: Vec<RecordBatch>,
#[allow(dead_code)]
reservation: MemoryReservation,
}

impl ReservedBatches {
pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
ReservedBatches {
batches,
reservation,
}
}
}

/// The name is from PostgreSQL's terminology.
/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
/// This table serves as a mirror or buffer between each iteration of a recursive query.
#[derive(Debug)]
pub(super) struct WorkTable {
batches: Mutex<Option<Vec<RecordBatch>>>,
batches: Mutex<Option<ReservedBatches>>,
}

impl WorkTable {
Expand All @@ -51,14 +69,17 @@ impl WorkTable {

/// Take the previously written batches from the work table.
/// This will be called by the [`WorkTableExec`] when it is executed.
fn take(&self) -> Vec<RecordBatch> {
let batches = self.batches.lock().unwrap().take().unwrap_or_default();
batches
fn take(&self) -> Result<ReservedBatches> {
self.batches
.lock()
.unwrap()
.take()
.ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
}

/// Write the results of a recursive query iteration to the work table.
pub(super) fn write(&self, input: Vec<RecordBatch>) {
self.batches.lock().unwrap().replace(input);
/// Update the results of a recursive query iteration to the work table.
pub(super) fn update(&self, batches: ReservedBatches) {
self.batches.lock().unwrap().replace(batches);
}
}

Expand Down Expand Up @@ -175,13 +196,11 @@ impl ExecutionPlan for WorkTableExec {
"WorkTableExec got an invalid partition {partition} (expected 0)"
);
}

let batches = self.work_table.take();
Ok(Box::pin(MemoryStream::try_new(
batches,
self.schema.clone(),
None,
)?))
let batch = self.work_table.take()?;
Ok(Box::pin(
MemoryStream::try_new(batch.batches, self.schema.clone(), None)?
.with_reservation(batch.reservation),
))
}

fn metrics(&self) -> Option<MetricsSet> {
Expand All @@ -194,4 +213,41 @@ impl ExecutionPlan for WorkTableExec {
}

#[cfg(test)]
mod tests {}
mod tests {
use super::*;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};
use std::sync::Arc;

#[test]
fn test_work_table() {
let work_table = WorkTable::new();
// cann't take from empty work_table
assert!(work_table.take().is_err());

let pool = Arc::new(UnboundedMemoryPool::default()) as _;
let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);

// update batch to work_table
let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
reservation.try_grow(100).unwrap();
work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
// take from work_table
let reserved_batches = work_table.take().unwrap();
assert_eq!(reserved_batches.batches, vec![batch.clone()]);

// consume the batch by the MemoryStream
let memory_stream =
MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
.unwrap()
.with_reservation(reserved_batches.reservation);

// should still be reserved
assert_eq!(pool.reserved(), 100);

// the reservation should be freed after drop the memory_stream
drop(memory_stream);
assert_eq!(pool.reserved(), 0);
}
}
38 changes: 18 additions & 20 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1387,15 +1387,21 @@ fn recursive_ctes() {
select n + 1 FROM numbers WHERE N < 10
)
select * from numbers;";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"This feature is not implemented: Recursive CTEs are not enabled",
err.strip_backtrace()
);
quick_test(
sql,
"Projection: numbers.n\
\n SubqueryAlias: numbers\
\n RecursiveQuery: is_distinct=false\
\n Projection: Int64(1) AS n\
\n EmptyRelation\
\n Projection: numbers.n + Int64(1)\
\n Filter: numbers.n < Int64(10)\
\n TableScan: numbers",
)
}

#[test]
fn recursive_ctes_enabled() {
fn recursive_ctes_disabled() {
let sql = "
WITH RECURSIVE numbers AS (
select 1 as n
Expand All @@ -1404,28 +1410,20 @@ fn recursive_ctes_enabled() {
)
select * from numbers;";

// manually setting up test here so that we can enable recursive ctes
// manually setting up test here so that we can disable recursive ctes
let mut context = MockContextProvider::default();
context.options_mut().execution.enable_recursive_ctes = true;
context.options_mut().execution.enable_recursive_ctes = false;

let planner = SqlToRel::new_with_options(&context, ParserOptions::default());
let result = DFParser::parse_sql_with_dialect(sql, &GenericDialect {});
let mut ast = result.unwrap();

let plan = planner
let err = planner
.statement_to_plan(ast.pop_front().unwrap())
.expect("recursive cte plan creation failed");

.expect_err("query should have failed");
assert_eq!(
format!("{plan:?}"),
"Projection: numbers.n\
\n SubqueryAlias: numbers\
\n RecursiveQuery: is_distinct=false\
\n Projection: Int64(1) AS n\
\n EmptyRelation\
\n Projection: numbers.n + Int64(1)\
\n Filter: numbers.n < Int64(10)\
\n TableScan: numbers"
"This feature is not implemented: Recursive CTEs are not enabled",
err.strip_backtrace()
);
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.enable_recursive_ctes false
datafusion.execution.enable_recursive_ctes true
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
Expand Down Expand Up @@ -244,7 +244,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.enable_recursive_ctes false Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
| datafusion.execution.enable_recursive_ctes | false | Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected |
| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
Expand Down

0 comments on commit 80d223f

Please sign in to comment.