diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index c31d5f62c726..417bc4cf977b 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -29,9 +29,8 @@ use crate::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
 };
 
-use arrow::array::ArrayRef;
 use arrow::datatypes::SchemaRef;
-use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
@@ -507,26 +506,15 @@ impl LimitStream {
             //
             self.fetch -= batch.num_rows();
             Some(batch)
-        } else {
+        } else if batch.num_rows() >= self.fetch {
             let batch_rows = self.fetch;
             self.fetch = 0;
             self.input = None; // clear input so it can be dropped early
 
-            let limited_columns: Vec<ArrayRef> = batch
-                .columns()
-                .iter()
-                .map(|col| col.slice(0, col.len().min(batch_rows)))
-                .collect();
-            let options =
-                RecordBatchOptions::new().with_row_count(Option::from(batch_rows));
-            Some(
-                RecordBatch::try_new_with_options(
-                    batch.schema(),
-                    limited_columns,
-                    &options,
-                )
-                .unwrap(),
-            )
+            // It is guaranteed that batch_rows is <= batch.num_rows
+            Some(batch.slice(0, batch_rows))
+        } else {
+            unreachable!()
         }
     }
 }
@@ -575,6 +563,7 @@ mod tests {
     use crate::{common, test};
 
     use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
+    use arrow_array::RecordBatchOptions;
     use arrow_schema::Schema;
     use datafusion_physical_expr::expressions::col;
     use datafusion_physical_expr::PhysicalExpr;