Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated DF (patched) to Dec-4 #52

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,9 @@ jobs:
#
# To reproduce:
# 1. Install the version of Rust that is failing. Example:
# rustup install 1.80.0
# rustup install 1.80.1
# 2. Run the command that failed with that version. Example:
# cargo +1.80.0 check -p datafusion
# cargo +1.80.1 check -p datafusion
#
# To resolve, either:
# 1. Change your code to use older Rust features,
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.80"
rust-version = "1.80.1"
version = "43.0.0"

[workspace.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.80"
rust-version = "1.80.1"
readme = "README.md"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ config_namespace! {
///
/// This is used to workaround bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false
pub skip_physical_aggregate_schema_check: bool, default = true

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.80"
rust-version = "1.80.1"

[lints]
workspace = true
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use itertools::izip;

/// The SanityCheckPlan rule rejects the following query plans:
Expand Down Expand Up @@ -126,6 +128,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering().iter(),
plan.required_input_distribution().iter()
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ impl DefaultPhysicalPlanner {
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
}
}

log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());

return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
.iter()
.map(|s| format!("\n\t- {}", s))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.80"
rust-version = "1.80.1"

[lints]
workspace = true
Expand Down
7 changes: 0 additions & 7 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ impl PhysicalSortExpr {
}
}

/// Access the PhysicalSortExpr as a PhysicalExpr
impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
self.expr.as_ref()
}
}

impl PartialEq for PhysicalSortExpr {
fn eq(&self, other: &PhysicalSortExpr) -> bool {
self.options == other.options && self.expr.eq(&other.expr)
Expand Down
49 changes: 4 additions & 45 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{IndexSet, JoinType};
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

#[derive(Debug, Clone)]
/// A structure representing a expression known to be constant in a physical execution plan.
///
/// The `ConstExpr` struct encapsulates an expression that is constant during the execution
Expand All @@ -37,10 +38,9 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
///
/// - `expr`: Constant expression for a node in the physical plan.
///
/// - `across_partitions`: A boolean flag indicating whether the constant
/// expression is the same across partitions. If set to `true`, the constant
/// expression has same value for all partitions. If set to `false`, the
/// constant expression may have different values for different partitions.
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
///
/// # Example
///
Expand All @@ -53,21 +53,11 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
/// // create a constant expression from a physical expression
/// let const_expr = ConstExpr::from(col);
/// ```
#[derive(Debug, Clone)]
pub struct ConstExpr {
/// The expression that is known to be constant (e.g. a `Column`)
expr: Arc<dyn PhysicalExpr>,
/// Does the constant have the same value across all partitions? See
/// struct docs for more details
across_partitions: bool,
}

impl PartialEq for ConstExpr {
fn eq(&self, other: &Self) -> bool {
self.across_partitions == other.across_partitions && self.expr.eq(&other.expr)
}
}

impl ConstExpr {
/// Create a new constant expression from a physical expression.
///
Expand All @@ -81,17 +71,11 @@ impl ConstExpr {
}
}

/// Set the `across_partitions` flag
///
/// See struct docs for more details
pub fn with_across_partitions(mut self, across_partitions: bool) -> Self {
self.across_partitions = across_partitions;
self
}

/// Is the expression the same across all partitions?
///
/// See struct docs for more details
pub fn across_partitions(&self) -> bool {
self.across_partitions
}
Expand All @@ -114,31 +98,6 @@ impl ConstExpr {
across_partitions: self.across_partitions,
})
}

/// Returns true if this constant expression is equal to the given expression
pub fn eq_expr(&self, other: impl AsRef<dyn PhysicalExpr>) -> bool {
self.expr.as_ref() == other.as_ref()
}

/// Returns a [`Display`]able list of `ConstExpr`.
pub fn format_list(input: &[ConstExpr]) -> impl Display + '_ {
struct DisplayableList<'a>(&'a [ConstExpr]);
impl Display for DisplayableList<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let mut first = true;
for const_expr in self.0 {
if first {
first = false;
} else {
write!(f, ",")?;
}
write!(f, "{}", const_expr)?;
}
Ok(())
}
}
DisplayableList(input)
}
}

/// Display implementation for `ConstExpr`
Expand Down
Loading
Loading