diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index a0251857c272..05122d5a5403 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -632,16 +632,6 @@ pub(crate) fn create_group_accumulator(
     }
 }
 
-/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
-macro_rules! extract_ok {
-    ($RES: expr) => {{
-        match $RES {
-            Ok(v) => v,
-            Err(e) => return Poll::Ready(Some(Err(e))),
-        }
-    }};
-}
-
 impl Stream for GroupedHashAggregateStream {
     type Item = Result<RecordBatch>;
 
@@ -661,7 +651,7 @@ impl Stream for GroupedHashAggregateStream {
                             let input_rows = batch.num_rows();
 
                             // Do the grouping
-                            extract_ok!(self.group_aggregate_batch(batch));
+                            self.group_aggregate_batch(batch)?;
 
                             self.update_skip_aggregation_probe(input_rows);
 
@@ -673,16 +663,14 @@ impl Stream for GroupedHashAggregateStream {
                             // emit all groups and switch to producing output
                             if self.hit_soft_group_limit() {
                                 timer.done();
-                                extract_ok!(self.set_input_done_and_produce_output());
+                                self.set_input_done_and_produce_output()?;
                                 // make sure the exec_state just set is not overwritten below
                                 break 'reading_input;
                             }
 
                             if let Some(to_emit) = self.group_ordering.emit_to() {
                                 timer.done();
-                                if let Some(batch) =
-                                    extract_ok!(self.emit(to_emit, false))
-                                {
+                                if let Some(batch) = self.emit(to_emit, false)? {
                                     self.exec_state =
                                         ExecutionState::ProducingOutput(batch);
                                 };
@@ -690,9 +678,9 @@
                                 break 'reading_input;
                             }
 
-                            extract_ok!(self.emit_early_if_necessary());
+                            self.emit_early_if_necessary()?;
 
-                            extract_ok!(self.switch_to_skip_aggregation());
+                            self.switch_to_skip_aggregation()?;
 
                             timer.done();
                         }
@@ -703,10 +691,10 @@ impl Stream for GroupedHashAggregateStream {
                             let timer = elapsed_compute.timer();
 
                             // Make sure we have enough capacity for `batch`, otherwise spill
-                            extract_ok!(self.spill_previous_if_necessary(&batch));
+                            self.spill_previous_if_necessary(&batch)?;
 
                             // Do the grouping
-                            extract_ok!(self.group_aggregate_batch(batch));
+                            self.group_aggregate_batch(batch)?;
 
                             // If we can begin emitting rows, do so,
                             // otherwise keep consuming input
@@ -716,16 +704,14 @@ impl Stream for GroupedHashAggregateStream {
                             // emit all groups and switch to producing output
                             if self.hit_soft_group_limit() {
                                 timer.done();
-                                extract_ok!(self.set_input_done_and_produce_output());
+                                self.set_input_done_and_produce_output()?;
                                 // make sure the exec_state just set is not overwritten below
                                 break 'reading_input;
                             }
 
                             if let Some(to_emit) = self.group_ordering.emit_to() {
                                 timer.done();
-                                if let Some(batch) =
-                                    extract_ok!(self.emit(to_emit, false))
-                                {
+                                if let Some(batch) = self.emit(to_emit, false)? {
                                     self.exec_state =
                                         ExecutionState::ProducingOutput(batch);
                                 };
@@ -745,7 +731,7 @@ impl Stream for GroupedHashAggregateStream {
                         // Found end from input stream
                         None => {
                             // inner is done, emit all rows and switch to producing output
-                            extract_ok!(self.set_input_done_and_produce_output());
+                            self.set_input_done_and_produce_output()?;
                         }
                     }
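
For context (not part of the patch): the macro can be dropped because the standard library implements `Try`/`FromResidual` for `Poll<Option<Result<T, E>>>`, so the `?` operator applied to a `Result` inside `Stream::poll_next` already short-circuits with `Poll::Ready(Some(Err(e)))`, which is exactly what `extract_ok!` expanded to. A minimal standalone sketch, with hypothetical `fallible_step`/`poll_like` names standing in for the stream's methods:

```rust
use std::task::Poll;

// Hypothetical fallible step standing in for calls like `self.group_aggregate_batch(batch)`.
fn fallible_step(fail: bool) -> Result<u32, String> {
    if fail {
        Err("step failed".to_string())
    } else {
        Ok(42)
    }
}

// Same return shape as `Stream::poll_next` for a stream of `Result<u32, String>` items.
// std implements `FromResidual` for `Poll<Option<Result<T, E>>>`, so `?` applied to a
// `Result` here returns `Poll::Ready(Some(Err(e)))` on error, matching the old macro.
fn poll_like(fail: bool) -> Poll<Option<Result<u32, String>>> {
    let value = fallible_step(fail)?;
    Poll::Ready(Some(Ok(value + 1)))
}

fn main() {
    assert!(matches!(poll_like(false), Poll::Ready(Some(Ok(43)))));
    assert!(matches!(poll_like(true), Poll::Ready(Some(Err(_)))));
}
```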