Skip to content

Commit

Permalink
Add new tests, minor changes, trigger evaluate_all
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 22, 2024
1 parent 0223836 commit 5d532fd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
36 changes: 26 additions & 10 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_common::{
arrow_datafusion_err, exec_datafusion_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::cmp::min;
Expand Down Expand Up @@ -141,6 +143,13 @@ pub(crate) struct WindowShiftEvaluator {
non_nulls_idx: Vec<usize>,
}

impl WindowShiftEvaluator {
    /// Reports whether this evaluator is operating in LAG mode.
    ///
    /// LAG is encoded as a strictly positive `shift_offset`; a zero or
    /// negative offset is not LAG.
    fn is_lag(&self) -> bool {
        self.shift_offset.is_positive()
    }
}

fn create_empty_array(
value: Option<&ScalarValue>,
data_type: &DataType,
Expand Down Expand Up @@ -193,7 +202,7 @@ fn shift_with_default_value(

impl PartitionEvaluator for WindowShiftEvaluator {
fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
if self.shift_offset > 0 {
if self.is_lag() {
let offset = self.shift_offset as usize;
let start = idx.saturating_sub(offset);
let end = idx + 1;
Expand All @@ -207,7 +216,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {

fn is_causal(&self) -> bool {
// Lagging windows are causal by definition:
self.shift_offset > 0
self.is_lag()
}

fn evaluate(
Expand All @@ -222,7 +231,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
let dtype = array.data_type();
let len = array.len() as i64;
// LAG mode
let mut idx = if self.is_causal() {
let mut idx = if self.is_lag() {
range.end as i64 - self.shift_offset - 1
} else {
// LEAD mode
Expand All @@ -232,15 +241,12 @@ impl PartitionEvaluator for WindowShiftEvaluator {
// Only LAG is supported for now; LEAD needs further design work first.
// LAG with IGNORE NULLS is calculated as the current row index minus the offset, counting only non-NULL rows.
// If the current row index points to a NULL value, that row is NOT counted.
if self.ignore_nulls && self.is_causal() {
if self.ignore_nulls && self.is_lag() {
let prev_range_end = range.end - 1;
// Find the index of the non-NULL row that is shifted by the offset relative to the current row index
if self.shift_offset as usize <= self.non_nulls_idx.len() {
idx = *unsafe {
self.non_nulls_idx.get_unchecked(
self.non_nulls_idx.len() - self.shift_offset as usize,
)
} as i64;
let non_null_idx = self.non_nulls_idx.len() - self.shift_offset as usize;
idx = self.non_nulls_idx[non_null_idx] as i64;
} else {
idx = -1;
}
Expand All @@ -249,6 +255,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
if prev_range_end < array.len() && array.is_valid(prev_range_end) {
self.non_nulls_idx.push(prev_range_end);
}
} else if self.ignore_nulls && !self.is_lag() {
// IGNORE NULLS and LEAD mode.
return Err(exec_datafusion_err!(
"IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec"
));
}

// Set the default value if
Expand All @@ -267,6 +278,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
values: &[ArrayRef],
_num_rows: usize,
) -> Result<ArrayRef> {
if self.ignore_nulls {
return Err(exec_datafusion_err!(
"IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec"
));
}
// LEAD, LAG window functions take single column, values will have size 1
let value = &values[0];
shift_with_default_value(value, self.shift_offset, self.default_value.as_ref())
Expand Down
25 changes: 15 additions & 10 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4146,7 +4146,7 @@ NULL def NULL def
NULL def NULL NULL
NULL def b b

# LAG window function IGNORE/RESPECT NULLS support with descending order and default offset 1
# LAG window function IGNORE/RESPECT NULLS support with descending order and nondefault offset
query TTTT
select lag(a, 2, null) ignore nulls over (order by id desc) as x1,
lag(a, 2, 'def') ignore nulls over (order by id desc) as x2,
Expand All @@ -4159,15 +4159,20 @@ NULL def NULL def
NULL def x x
x x NULL NULL

# LAG window function IGNORE/RESPECT NULLS support with descending order and default offset 1. Bounded MODE
query TTTT
select lag(a, 2, null) ignore nulls over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as x1,
# LAG window function IGNORE/RESPECT NULLS support with descending order and nondefault offset.
# To trigger WindowAggExec, we added a sum window function with all of the ranges.
statement error Execution error: IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec
select lag(a, 2, null) ignore nulls over (order by id desc) as x1,
lag(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lag(a, 2, null) respect nulls over (order by id desc) as x4,
lag(a, 2, 'def') respect nulls over (order by id desc) as x5
lag(a, 2, 'def') respect nulls over (order by id desc) as x5,
sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')

# LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset
statement error Execution error: IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec
select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lead(a, 2, null) respect nulls over (order by id desc) as x4,
lead(a, 2, 'def') respect nulls over (order by id desc) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
NULL def NULL def
NULL def NULL def
NULL def x x
x x NULL NULL

0 comments on commit 5d532fd

Please sign in to comment.