remove custom extract_ok! macro (apache#14733)
ctsk authored Feb 19, 2025
1 parent a6a1be2 commit ee2d2a4
1 changed file: datafusion/physical-plan/src/aggregates/row_hash.rs (34 changes: 10 additions, 24 deletions)
@@ -632,16 +632,6 @@ pub(crate) fn create_group_accumulator(
     }
 }
 
-/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
-macro_rules! extract_ok {
-    ($RES: expr) => {{
-        match $RES {
-            Ok(v) => v,
-            Err(e) => return Poll::Ready(Some(Err(e))),
-        }
-    }};
-}
-
 impl Stream for GroupedHashAggregateStream {
     type Item = Result<RecordBatch>;
 
@@ -661,7 +651,7 @@ impl Stream for GroupedHashAggregateStream {
                             let input_rows = batch.num_rows();
 
                             // Do the grouping
-                            extract_ok!(self.group_aggregate_batch(batch));
+                            self.group_aggregate_batch(batch)?;
 
                             self.update_skip_aggregation_probe(input_rows);
 
@@ -673,26 +663,24 @@
                             // emit all groups and switch to producing output
                             if self.hit_soft_group_limit() {
                                 timer.done();
-                                extract_ok!(self.set_input_done_and_produce_output());
+                                self.set_input_done_and_produce_output()?;
                                 // make sure the exec_state just set is not overwritten below
                                 break 'reading_input;
                             }
 
                             if let Some(to_emit) = self.group_ordering.emit_to() {
                                 timer.done();
-                                if let Some(batch) =
-                                    extract_ok!(self.emit(to_emit, false))
-                                {
+                                if let Some(batch) = self.emit(to_emit, false)? {
                                     self.exec_state =
                                         ExecutionState::ProducingOutput(batch);
                                 };
                                 // make sure the exec_state just set is not overwritten below
                                 break 'reading_input;
                             }
 
-                            extract_ok!(self.emit_early_if_necessary());
+                            self.emit_early_if_necessary()?;
 
-                            extract_ok!(self.switch_to_skip_aggregation());
+                            self.switch_to_skip_aggregation()?;
 
                             timer.done();
                         }
@@ -703,10 +691,10 @@
                             let timer = elapsed_compute.timer();
 
                             // Make sure we have enough capacity for `batch`, otherwise spill
-                            extract_ok!(self.spill_previous_if_necessary(&batch));
+                            self.spill_previous_if_necessary(&batch)?;
 
                             // Do the grouping
-                            extract_ok!(self.group_aggregate_batch(batch));
+                            self.group_aggregate_batch(batch)?;
 
                             // If we can begin emitting rows, do so,
                             // otherwise keep consuming input
@@ -716,16 +704,14 @@
                             // emit all groups and switch to producing output
                             if self.hit_soft_group_limit() {
                                 timer.done();
-                                extract_ok!(self.set_input_done_and_produce_output());
+                                self.set_input_done_and_produce_output()?;
                                 // make sure the exec_state just set is not overwritten below
                                 break 'reading_input;
                             }
 
                             if let Some(to_emit) = self.group_ordering.emit_to() {
                                 timer.done();
-                                if let Some(batch) =
-                                    extract_ok!(self.emit(to_emit, false))
-                                {
+                                if let Some(batch) = self.emit(to_emit, false)? {
                                     self.exec_state =
                                         ExecutionState::ProducingOutput(batch);
                                 };
@@ -745,7 +731,7 @@
                         // Found end from input stream
                         None => {
                             // inner is done, emit all rows and switch to producing output
-                            extract_ok!(self.set_input_done_and_produce_output());
+                            self.set_input_done_and_produce_output()?;
                         }
                     }
                 }
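
Why plain `?` can replace the macro: `GroupedHashAggregateStream::poll_next` returns `Poll<Option<Result<RecordBatch>>>`, and the standard library provides a `FromResidual` impl for `Poll<Option<Result<T, E>>>`, so applying `?` to a `Result` inside such a function already converts an `Err(e)` into `Poll::Ready(Some(Err(e)))`, which is exactly what `extract_ok!` expanded to. Below is a minimal, self-contained sketch of that behavior; the type alias and function names are illustrative, not taken from the DataFusion codebase.

use std::task::Poll;

// Illustrative stand-ins; the real code uses DataFusion's Result and error types.
type ExampleResult<T> = Result<T, String>;

fn fallible_step() -> ExampleResult<u32> {
    Err("aggregation failed".to_string())
}

// Because this function returns Poll<Option<Result<_, _>>>, the `?` below
// early-returns Poll::Ready(Some(Err(e))) on failure via the standard
// library's FromResidual impl for Poll<Option<Result<T, E>>>, the same
// conversion the removed extract_ok! macro performed by hand.
fn poll_next_like() -> Poll<Option<ExampleResult<u32>>> {
    let value = fallible_step()?;
    Poll::Ready(Some(Ok(value + 1)))
}

fn main() {
    assert!(matches!(poll_next_like(), Poll::Ready(Some(Err(_)))));
}

Since the macro carried no behavior beyond that conversion, dropping it in favor of `?` at each call site does not change behavior.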
