Skip to content

Commit

Permalink
move `resolve_table_references` out of `datafusion-catalog` (apache#14441
Browse files Browse the repository at this point in the history
)

* move `resolve_table_references` out of `datafusion-catalog`

* forgotten license

* Update references

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
logan-keede and alamb authored Feb 3, 2025
1 parent 16c0686 commit a7bb09f
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 252 deletions.
3 changes: 1 addition & 2 deletions datafusion-examples/examples/sql_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::plan_err;
use datafusion::common::{plan_err, TableReference};
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::logical_expr::{
Expand All @@ -29,7 +29,6 @@ use datafusion::optimizer::{
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::TableReference;
use std::any::Any;
use std::sync::Arc;

Expand Down
248 changes: 9 additions & 239 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
pub mod memory;
#[deprecated(
since = "46.0.0",
note = "use datafusion_sql::resolve::resolve_table_references"
)]
pub use datafusion_sql::resolve::resolve_table_references;
#[deprecated(
since = "46.0.0",
note = "use datafusion_common::{ResolvedTableReference, TableReference}"
)]
pub use datafusion_sql::{ResolvedTableReference, TableReference};
pub use memory::{
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};
use std::collections::BTreeSet;
use std::ops::ControlFlow;

mod r#async;
mod catalog;
mod dynamic_file;
Expand All @@ -43,239 +49,3 @@ pub use schema::*;
pub use session::*;
pub use table::*;
pub mod streaming;

/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
///
/// # Arguments
///
/// * `statement` - the parsed statement to inspect.
/// * `enable_ident_normalization` - forwarded unchanged to
///   `object_name_to_table_reference` when converting each collected name.
///
/// # Returns
///
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
/// element contains any CTE aliases that were defined and possibly referenced.
/// Both lists are de-duplicated and come out in the iteration order of the
/// `BTreeSet`s they are collected in.
///
/// # Errors
///
/// Returns an error if `object_name_to_table_reference` fails for any of the
/// collected names.
///
/// ## Example
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 2);
/// assert_eq!(table_refs[0].to_string(), "bar");
/// assert_eq!(table_refs[1].to_string(), "foo");
/// assert_eq!(ctes.len(), 0);
/// ```
///
/// ## Example with CTEs
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 0);
/// assert_eq!(ctes.len(), 1);
/// assert_eq!(ctes[0].to_string(), "my_cte");
/// ```
pub fn resolve_table_references(
    statement: &datafusion_sql::parser::Statement,
    enable_ident_normalization: bool,
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
    use datafusion_sql::parser::{
        CopyToSource, CopyToStatement, Statement as DFStatement,
    };
    use datafusion_sql::planner::object_name_to_table_reference;
    use information_schema::INFORMATION_SCHEMA;
    use information_schema::INFORMATION_SCHEMA_TABLES;
    use sqlparser::ast::*;

    /// AST visitor that accumulates relation names and CTE aliases.
    struct RelationVisitor {
        /// Names referenced as tables/views (i.e. not resolved to a CTE in scope).
        relations: BTreeSet<ObjectName>,
        /// Every CTE alias whose defining query has been fully visited.
        all_ctes: BTreeSet<ObjectName>,
        /// Stack of CTE aliases visible at the current point of the traversal.
        ctes_in_scope: Vec<ObjectName>,
    }

    impl RelationVisitor {
        /// Record the reference to `relation`, if it's not a CTE reference.
        fn insert_relation(&mut self, relation: &ObjectName) {
            // Check membership first so the name is only cloned when it is
            // actually going to be inserted.
            if !self.relations.contains(relation)
                && !self.ctes_in_scope.contains(relation)
            {
                self.relations.insert(relation.clone());
            }
        }
    }

    impl Visitor for RelationVisitor {
        type Break = ();

        fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
            self.insert_relation(relation);
            ControlFlow::Continue(())
        }

        fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
            if let Some(with) = &q.with {
                for cte in &with.cte_tables {
                    // The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
                    // `WITH t AS (SELECT * FROM t) SELECT * FROM t`
                    // Where the first `t` refers to a predefined table. So we are careful here
                    // to visit the CTE first, before putting it in scope.
                    if !with.recursive {
                        // This is a bit hackish as the CTE will be visited again as part of visiting `q`,
                        // but thankfully `insert_relation` is idempotent.
                        //
                        // Every callback of this visitor returns `Continue`, so the
                        // resulting `ControlFlow` carries no information; discard it
                        // explicitly rather than leaving a `#[must_use]` value unused.
                        let _ = cte.visit(self);
                    }
                    self.ctes_in_scope
                        .push(ObjectName(vec![cte.alias.name.clone()]));
                }
            }
            ControlFlow::Continue(())
        }

        fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
            if let Some(with) = &q.with {
                for _ in &with.cte_tables {
                    // Unwrap: We just pushed these in `pre_visit_query`
                    self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
                }
            }
            ControlFlow::Continue(())
        }

        fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
            if let Statement::ShowCreate {
                obj_type: ShowCreateObject::Table | ShowCreateObject::View,
                obj_name,
            } = statement
            {
                self.insert_relation(obj_name);
            }

            // SHOW statements will later be rewritten into a SELECT from the information_schema
            let requires_information_schema = matches!(
                statement,
                Statement::ShowFunctions { .. }
                    | Statement::ShowVariable { .. }
                    | Statement::ShowStatus { .. }
                    | Statement::ShowVariables { .. }
                    | Statement::ShowCreate { .. }
                    | Statement::ShowColumns { .. }
                    | Statement::ShowTables { .. }
                    | Statement::ShowCollation { .. }
            );
            if requires_information_schema {
                for s in INFORMATION_SCHEMA_TABLES {
                    self.relations.insert(ObjectName(vec![
                        Ident::new(INFORMATION_SCHEMA),
                        Ident::new(*s),
                    ]));
                }
            }
            ControlFlow::Continue(())
        }
    }

    let mut visitor = RelationVisitor {
        relations: BTreeSet::new(),
        all_ctes: BTreeSet::new(),
        ctes_in_scope: vec![],
    };

    /// Dispatches on the DataFusion-level statement kind and feeds the
    /// relevant parts to `visitor`.
    fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
        match statement {
            DFStatement::Statement(s) => {
                let _ = s.as_ref().visit(visitor);
            }
            DFStatement::CreateExternalTable(table) => {
                // Inserted directly, bypassing the CTE-scope check in `insert_relation`.
                visitor.relations.insert(table.name.clone());
            }
            DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
                CopyToSource::Relation(table_name) => {
                    visitor.insert_relation(table_name);
                }
                CopyToSource::Query(query) => {
                    // The visitor never breaks, so the `ControlFlow` result is
                    // deliberately discarded (same as for plain statements above).
                    let _ = query.visit(visitor);
                }
            },
            DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
        }
    }

    visit_statement(statement, &mut visitor);

    let table_refs = visitor
        .relations
        .into_iter()
        .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
        .collect::<datafusion_common::Result<_>>()?;
    let ctes = visitor
        .all_ctes
        .into_iter()
        .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
        .collect::<datafusion_common::Result<_>>()?;
    Ok((table_refs, ctes))
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_sql::parser::DFParser;

    /// Renders every item of `items` with its `ToString` implementation.
    fn to_names<T: ToString>(items: &[T]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Parses `sql`, takes the last parsed statement, resolves its references
    /// with identifier normalization enabled, and returns
    /// `(table_names, cte_names)` as plain strings.
    fn resolve_names(sql: &str) -> (Vec<String>, Vec<String>) {
        let mut parsed = DFParser::parse_sql(sql).unwrap();
        let statement = parsed.pop_back().unwrap();
        let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
        (to_names(&table_refs), to_names(&ctes))
    }

    #[test]
    fn resolve_table_references_shadowed_cte() {
        // An interesting edge case where the `t` name is used both as an ordinary table reference
        // and as a CTE reference.
        let (tables, ctes) = resolve_names("WITH t AS (SELECT * FROM t) SELECT * FROM t");
        assert_eq!(ctes, ["t"]);
        assert_eq!(tables, ["t"]);

        // UNION is a special case where the CTE is not in scope for the second branch.
        let (tables, ctes) = resolve_names(
            "(with t as (select 1) select * from t) union (select * from t)",
        );
        assert_eq!(ctes, ["t"]);
        assert_eq!(tables, ["t"]);

        // Nested CTEs are also handled.
        // Here the first `u` is a CTE, but the second `u` is a table reference.
        // While `t` is always a CTE.
        let (tables, ctes) = resolve_names(
            "(with t as (with u as (select 1) select * from u) select * from u cross join t)",
        );
        assert_eq!(ctes, ["t", "u"]);
        assert_eq!(tables, ["u"]);
    }

    #[test]
    fn resolve_table_references_recursive_cte() {
        // A recursive CTE's alias is in scope inside its own definition, so
        // `nodes` must not be reported as a table reference.
        let query = "
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT id + 1 as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes
";
        let (tables, ctes) = resolve_names(query);
        assert!(tables.is_empty());
        assert_eq!(ctes, ["nodes"]);
    }
}
3 changes: 1 addition & 2 deletions datafusion/core/tests/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow_schema::{Fields, SchemaBuilder};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{plan_err, DFSchema, Result, ScalarValue};
use datafusion_common::{plan_err, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::interval_arithmetic::{Interval, NullableInterval};
use datafusion_expr::{
col, lit, AggregateUDF, BinaryExpr, Expr, ExprSchemable, LogicalPlan, Operator,
Expand All @@ -41,7 +41,6 @@ use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_functions::datetime;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, plan_err, Result};
use datafusion_common::{assert_contains, plan_err, Result, TableReference};
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
Expand All @@ -36,7 +36,6 @@ use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

#[cfg(test)]
#[ctor::ctor]
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Field, Schema};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_common::{plan_err, Result, TableReference};
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::WindowUDF;
use datafusion_expr::{
Expand All @@ -32,7 +32,6 @@ use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_sql::{
planner::{ContextProvider, SqlToRel},
sqlparser::{dialect::GenericDialect, parser::Parser},
TableReference,
};

fn main() {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,11 +1073,10 @@ mod tests {
use sqlparser::parser::Parser;

use datafusion_common::config::ConfigOptions;
use datafusion_common::TableReference;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};

use crate::TableReference;

use super::*;

struct TestContextProvider {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod parser;
pub mod planner;
mod query;
mod relation;
pub mod resolve;
mod select;
mod set_expr;
mod stack;
Expand All @@ -49,6 +50,9 @@ mod statement;
pub mod unparser;
pub mod utils;
mod values;

#[deprecated(
since = "46.0.0",
note = "use datafusion_common::{ResolvedTableReference, TableReference}"
)]
pub use datafusion_common::{ResolvedTableReference, TableReference};
pub use sqlparser;
Loading

0 comments on commit a7bb09f

Please sign in to comment.