diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index 034d6fa23d9c..ac77fa9312e7 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -19,6 +19,24 @@ use datafusion::prelude::*; use tempfile::TempDir; +#[tokio::test] +async fn test_window_function() { + let ctx = SessionContext::new(); + let df = ctx + .sql( + r#"SELECT + t1.v1, + SUM(t1.v1) OVER w + 1 + FROM + generate_series(1, 10000) AS t1(v1) + WINDOW + w AS (ORDER BY t1.v1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);"#, + ) + .await; + println!("{:?}", df); + assert!(df.is_ok()); +} + #[tokio::test] async fn unsupported_ddl_returns_error() { // Verify SessionContext::with_sql_options errors appropriately diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index e21def4c3941..b594fd6a35f1 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashSet; +use std::ops::ControlFlow; use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; @@ -27,7 +28,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -44,8 +45,8 @@ use datafusion_expr::{ use indexmap::IndexMap; use sqlparser::ast::{ - Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, - WildcardAdditionalOptions, WindowType, + Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, VisitMut, + VisitorMut, WildcardAdditionalOptions, WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; @@ -849,21 +850,19 @@ fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<() Ok(()) } -// If the projection is done over a named window, that window -// name must be defined. Otherwise, it gives an error. -fn match_window_definitions( - projection: &mut [SelectItem], - named_windows: &[NamedWindowDefinition], -) -> Result<()> { - for proj in projection.iter_mut() { - if let SelectItem::ExprWithAlias { - expr: SQLExpr::Function(f), - alias: _, - } - | SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj - { - for NamedWindowDefinition(window_ident, window_expr) in named_windows.iter() { - if let Some(WindowType::NamedWindow(ident)) = &f.over { +// Visit the expression to find all window functions +struct WindowFunctionVisitor<'a> { + named_windows: &'a [NamedWindowDefinition], +} + +impl VisitorMut for WindowFunctionVisitor<'_> { + fn pre_visit_expr(&mut self, expr: &mut SQLExpr) -> ControlFlow { + if let SQLExpr::Function(f) = expr { + if let Some(WindowType::NamedWindow(ident)) = &f.over { + let ident = ident.clone(); + for NamedWindowDefinition(window_ident, window_expr) in + self.named_windows.iter() + { if ident.eq(window_ident) { f.over = Some(match window_expr { NamedWindowExpr::NamedWindow(ident) => { @@ -875,11 +874,36 @@ fn match_window_definitions( }) } } + // All named windows must be defined with a WindowSpec. + if let Some(WindowType::NamedWindow(ident)) = &f.over { + return ControlFlow::Break(DataFusionError::Plan(format!( + "The window {ident} is not defined!" + ))); + } } - // All named windows must be defined with a WindowSpec. - if let Some(WindowType::NamedWindow(ident)) = &f.over { - return plan_err!("The window {ident} is not defined!"); - } + } + ControlFlow::Continue(()) + } + + type Break = DataFusionError; +} + +// If the projection is done over a named window, that window +// name must be defined. Otherwise, it gives an error. +fn match_window_definitions( + projection: &mut [SelectItem], + named_windows: &[NamedWindowDefinition], +) -> Result<()> { + for proj in projection.iter_mut() { + if let SelectItem::ExprWithAlias { expr, alias: _ } + | SelectItem::UnnamedExpr(expr) = proj + { + let mut visitor = WindowFunctionVisitor { named_windows }; + + match VisitMut::visit(expr, &mut visitor) { + ControlFlow::Continue(_) => (), + ControlFlow::Break(err) => return Err(err), + }; } } Ok(())