Skip to content

Commit

Permalink
fix: nested window function
Browse files Browse the repository at this point in the history
  • Loading branch information
chenkovsky committed Mar 5, 2025
1 parent c0d53ad commit fad31ec
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 22 deletions.
18 changes: 18 additions & 0 deletions datafusion/core/tests/sql/sql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ use datafusion::prelude::*;

use tempfile::TempDir;

#[tokio::test]
async fn test_window_function() {
let ctx = SessionContext::new();
let df = ctx
.sql(
r#"SELECT
t1.v1,
SUM(t1.v1) OVER w + 1
FROM
generate_series(1, 10000) AS t1(v1)
WINDOW
w AS (ORDER BY t1.v1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);"#,
)
.await;
println!("{:?}", df);
assert!(df.is_ok());
}

#[tokio::test]
async fn unsupported_ddl_returns_error() {
// Verify SessionContext::with_sql_options errors appropriately
Expand Down
68 changes: 46 additions & 22 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::collections::HashSet;
use std::ops::ControlFlow;
use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
Expand All @@ -27,7 +28,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand All @@ -44,8 +45,8 @@ use datafusion_expr::{

use indexmap::IndexMap;
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
WildcardAdditionalOptions, WindowType,
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, VisitMut,
VisitorMut, WildcardAdditionalOptions, WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

Expand Down Expand Up @@ -849,21 +850,19 @@ fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()
Ok(())
}

// If the projection is done over a named window, that window
// name must be defined. Otherwise, it gives an error.
fn match_window_definitions(
projection: &mut [SelectItem],
named_windows: &[NamedWindowDefinition],
) -> Result<()> {
for proj in projection.iter_mut() {
if let SelectItem::ExprWithAlias {
expr: SQLExpr::Function(f),
alias: _,
}
| SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj
{
for NamedWindowDefinition(window_ident, window_expr) in named_windows.iter() {
if let Some(WindowType::NamedWindow(ident)) = &f.over {
// Visit the expression to find all window functions
struct WindowFunctionVisitor<'a> {
named_windows: &'a [NamedWindowDefinition],
}

impl VisitorMut for WindowFunctionVisitor<'_> {
fn pre_visit_expr(&mut self, expr: &mut SQLExpr) -> ControlFlow<Self::Break> {
if let SQLExpr::Function(f) = expr {
if let Some(WindowType::NamedWindow(ident)) = &f.over {
let ident = ident.clone();
for NamedWindowDefinition(window_ident, window_expr) in
self.named_windows.iter()
{
if ident.eq(window_ident) {
f.over = Some(match window_expr {
NamedWindowExpr::NamedWindow(ident) => {
Expand All @@ -875,11 +874,36 @@ fn match_window_definitions(
})
}
}
// All named windows must be defined with a WindowSpec.
if let Some(WindowType::NamedWindow(ident)) = &f.over {
return ControlFlow::Break(DataFusionError::Plan(format!(
"The window {ident} is not defined!"
)));
}
}
// All named windows must be defined with a WindowSpec.
if let Some(WindowType::NamedWindow(ident)) = &f.over {
return plan_err!("The window {ident} is not defined!");
}
}
ControlFlow::Continue(())
}

type Break = DataFusionError;
}

// If the projection is done over a named window, that window
// name must be defined. Otherwise, it gives an error.
fn match_window_definitions(
projection: &mut [SelectItem],
named_windows: &[NamedWindowDefinition],
) -> Result<()> {
for proj in projection.iter_mut() {
if let SelectItem::ExprWithAlias { expr, alias: _ }
| SelectItem::UnnamedExpr(expr) = proj
{
let mut visitor = WindowFunctionVisitor { named_windows };

match VisitMut::visit(expr, &mut visitor) {
ControlFlow::Continue(_) => (),
ControlFlow::Break(err) => return Err(err),
};
}
}
Ok(())
Expand Down

0 comments on commit fad31ec

Please sign in to comment.