
Commit

Merge remote-tracking branch 'upstream/main' into feature/internal_err_9164
Omega359 committed Feb 21, 2024
2 parents 1e7f650 + 6fad5ed commit 86d2498
Showing 52 changed files with 2,281 additions and 943 deletions.
13 changes: 4 additions & 9 deletions .github/workflows/rust.yml
@@ -480,16 +480,11 @@ jobs:
        uses: ./.github/actions/setup-builder
        with:
          rust-version: stable
-      - name: Install cargo-tomlfmt
-        run: which cargo-tomlfmt || cargo install cargo-tomlfmt
+      - name: Install taplo
+        run: cargo +stable install taplo-cli --version ^0.9 --locked
+      # if you encounter an error, try running 'taplo format' to fix the formatting automatically.
       - name: Check Cargo.toml formatting
-        run: |
-          # if you encounter an error, try running 'cargo tomlfmt -p path/to/Cargo.toml' to fix the formatting automatically.
-          # If the error still persists, you need to manually edit the Cargo.toml file, which introduces formatting violation.
-          #
-          # ignore ./Cargo.toml because putting workspaces in multi-line lists make it easy to read
-          ci/scripts/rust_toml_fmt.sh
+        run: taplo format --check

   config-docs-check:
     name: check configs.md is up-to-date
21 changes: 20 additions & 1 deletion Cargo.toml
@@ -17,7 +17,26 @@

[workspace]
exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/functions", "datafusion/functions-array", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks",
+members = [
+    "datafusion/common",
+    "datafusion/core",
+    "datafusion/expr",
+    "datafusion/execution",
+    "datafusion/functions",
+    "datafusion/functions-array",
+    "datafusion/optimizer",
+    "datafusion/physical-expr",
+    "datafusion/physical-plan",
+    "datafusion/proto",
+    "datafusion/proto/gen",
+    "datafusion/sql",
+    "datafusion/sqllogictest",
+    "datafusion/substrait",
+    "datafusion/wasmtest",
+    "datafusion-examples",
+    "docs",
+    "test-utils",
+    "benchmarks",
]
resolver = "2"

8 changes: 3 additions & 5 deletions ci/scripts/rust_toml_fmt.sh
@@ -17,11 +17,9 @@
# specific language governing permissions and limitations
# under the License.

-# Run cargo-tomlfmt with flag `-d` in dry run to check formatting
+# Run `taplo format` with flag `--check` in dry run to check formatting
 # without overwriting the file. If any errors occur, you may want to
-# rerun 'cargo tomlfmt -p path/to/Cargo.toml' without '-d' to fix
-# the formatting automatically.
+# rerun `taplo format` to fix the formatting automatically.
 set -ex
 for toml in $(find . -mindepth 2 -name 'Cargo.toml'); do
-    cargo tomlfmt -d -p $toml
+    taplo format
done
10 changes: 9 additions & 1 deletion datafusion-cli/Cargo.toml
@@ -35,7 +35,15 @@ async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { path = "../datafusion/core", version = "36.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
+datafusion = { path = "../datafusion/core", version = "36.0.0", features = [
+    "avro",
+    "crypto_expressions",
+    "encoding_expressions",
+    "parquet",
+    "regex_expressions",
+    "unicode_expressions",
+    "compression",
+] }
datafusion-common = { path = "../datafusion/common" }
dirs = "4.0.0"
env_logger = "0.9"
74 changes: 49 additions & 25 deletions datafusion/common/src/scalar/mod.rs
@@ -52,7 +52,6 @@ use arrow::{
UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
},
};
-use arrow_array::cast::as_list_array;
use arrow_array::{ArrowNativeTypeOp, Scalar};

pub use struct_builder::ScalarStructBuilder;
@@ -2138,28 +2137,67 @@ impl ScalarValue {

/// Retrieve ScalarValue for each row in `array`
///
-/// Example
+/// Example 1: Array (ScalarValue::Int32)
/// ```
/// use datafusion_common::ScalarValue;
/// use arrow::array::ListArray;
/// use arrow::datatypes::{DataType, Int32Type};
///
+/// // Equivalent to [[1,2,3], [4,5]]
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
-///     None,
/// Some(vec![Some(4), Some(5)])
/// ]);
///
+/// // Convert the array into Scalar Values for each row
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let expected = vec![
-/// vec![
+///   vec![
/// ScalarValue::Int32(Some(1)),
/// ScalarValue::Int32(Some(2)),
/// ScalarValue::Int32(Some(3)),
/// ],
+///   vec![
+///     ScalarValue::Int32(Some(4)),
+///     ScalarValue::Int32(Some(5)),
+///   ],
+/// ];
+///
+/// assert_eq!(scalar_vec, expected);
+/// ```
+///
+/// Example 2: Nested array (ScalarValue::List)
+/// ```
+/// use datafusion_common::ScalarValue;
+/// use arrow::array::ListArray;
+/// use arrow::datatypes::{DataType, Int32Type};
+/// use datafusion_common::utils::array_into_list_array;
+/// use std::sync::Arc;
+///
+/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+///     Some(vec![Some(1), Some(2), Some(3)]),
+///     Some(vec![Some(4), Some(5)])
+/// ]);
+///
+/// // Wrap in another layer of list; we get the nested array [ [[1,2,3], [4,5]] ]
+/// let list_arr = array_into_list_array(Arc::new(list_arr));
+///
+/// // Convert the array into Scalar Values for each row; we get 1-D arrays in this example
+/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
+///
+/// let l1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+///     Some(vec![Some(1), Some(2), Some(3)]),
+/// ]);
+/// let l2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+///     Some(vec![Some(4), Some(5)]),
+/// ]);
+///
+/// let expected = vec![
+///   vec![
+///     ScalarValue::List(Arc::new(l1)),
+///     ScalarValue::List(Arc::new(l2)),
+///   ],
-///   vec![],
-///   vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(5))]
/// ];
///
/// assert_eq!(scalar_vec, expected);
@@ -2168,27 +2206,13 @@ impl ScalarValue {
let mut scalars = Vec::with_capacity(array.len());

for index in 0..array.len() {
-        let scalar_values = match array.data_type() {
-            DataType::List(_) => {
-                let list_array = as_list_array(array);
-                match list_array.is_null(index) {
-                    true => Vec::new(),
-                    false => {
-                        let nested_array = list_array.value(index);
-                        ScalarValue::convert_array_to_scalar_vec(&nested_array)?
-                            .into_iter()
-                            .flatten()
-                            .collect()
-                    }
-                }
-            }
-            _ => {
-                let scalar = ScalarValue::try_from_array(array, index)?;
-                vec![scalar]
-            }
-        };
+        let nested_array = array.as_list::<i32>().value(index);
+        let scalar_values = (0..nested_array.len())
+            .map(|i| ScalarValue::try_from_array(&nested_array, i))
+            .collect::<Result<Vec<_>>>()?;
scalars.push(scalar_values);
}

Ok(scalars)
}

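The rewritten loop above assumes a list-typed input: each row's nested array is pulled out with as_list, then materialized element by element through ScalarValue::try_from_array. As a minimal standalone sketch of that building block (a plain Int32Array rather than the list path; it assumes only the datafusion_common API already visible in this diff):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use datafusion_common::{Result, ScalarValue};

fn main() -> Result<()> {
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![Some(7), None]));
    // One ScalarValue per element, as the new loop body produces per nested array.
    let first = ScalarValue::try_from_array(&arr, 0)?;
    let second = ScalarValue::try_from_array(&arr, 1)?;
    assert_eq!(first, ScalarValue::Int32(Some(7)));
    assert_eq!(second, ScalarValue::Int32(None));
    Ok(())
}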
25 changes: 22 additions & 3 deletions datafusion/core/Cargo.toml
@@ -43,7 +43,15 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
-default = ["array_expressions", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
+default = [
+    "array_expressions",
+    "crypto_expressions",
+    "encoding_expressions",
+    "regex_expressions",
+    "unicode_expressions",
+    "compression",
+    "parquet",
+]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
@@ -52,7 +60,11 @@ parquet = ["datafusion-common/parquet", "dep:parquet"]
pyarrow = ["datafusion-common/pyarrow", "parquet"]
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
serde = ["arrow-schema/serde"]
-unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]
+unicode_expressions = [
+    "datafusion-physical-expr/unicode_expressions",
+    "datafusion-optimizer/unicode_expressions",
+    "datafusion-sql/unicode_expressions",
+]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
@@ -61,7 +73,14 @@ arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
-async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true }
+async-compression = { version = "0.4.0", features = [
+    "bzip2",
+    "gzip",
+    "xz",
+    "zstd",
+    "futures-io",
+    "tokio",
+], optional = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
4 changes: 4 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -1147,6 +1147,7 @@ impl DataFrame {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
+/// # use std::fs;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// use datafusion::dataframe::DataFrameWriteOptions;
@@ -1159,6 +1160,7 @@ impl DataFrame {
/// DataFrameWriteOptions::new(),
/// None, // can also specify CSV writing options here
/// ).await?;
/// # fs::remove_file("output.csv")?;
/// # Ok(())
/// # }
/// ```
@@ -1199,6 +1201,7 @@ impl DataFrame {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
+/// # use std::fs;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// use datafusion::dataframe::DataFrameWriteOptions;
@@ -1210,6 +1213,7 @@ impl DataFrame {
/// "output.json",
/// DataFrameWriteOptions::new(),
/// ).await?;
/// # fs::remove_file("output.json")?;
/// # Ok(())
/// # }
/// ```
2 changes: 2 additions & 0 deletions datafusion/core/src/dataframe/parquet.rs
@@ -32,6 +32,7 @@ impl DataFrame {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
+/// # use std::fs;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// use datafusion::dataframe::DataFrameWriteOptions;
@@ -44,6 +45,7 @@
/// DataFrameWriteOptions::new(),
/// None, // can also specify parquet writing options here
/// ).await?;
/// # fs::remove_file("output.parquet")?;
/// # Ok(())
/// # }
/// ```
28 changes: 15 additions & 13 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -391,21 +391,23 @@ fn remove_partition_by_columns(
parted_batch: &RecordBatch,
partition_by: &[(String, DataType)],
) -> Result<RecordBatch> {
-    let end_idx = parted_batch.num_columns() - partition_by.len();
-    let non_part_cols = &parted_batch.columns()[..end_idx];

let partition_names: Vec<_> = partition_by.iter().map(|(s, _)| s).collect();
-    let non_part_schema = Schema::new(
-        parted_batch
-            .schema()
-            .fields()
-            .iter()
-            .filter(|f| !partition_names.contains(&f.name()))
-            .map(|f| (**f).clone())
-            .collect::<Vec<_>>(),
-    );
+    let (non_part_cols, non_part_fields): (Vec<_>, Vec<_>) = parted_batch
+        .columns()
+        .iter()
+        .zip(parted_batch.schema().fields())
+        .filter_map(|(a, f)| {
+            if !partition_names.contains(&f.name()) {
+                Some((a.clone(), (**f).clone()))
+            } else {
+                None
+            }
+        })
+        .unzip();

+    let non_part_schema = Schema::new(non_part_fields);
let final_batch_to_send =
-        RecordBatch::try_new(Arc::new(non_part_schema), non_part_cols.into())?;
+        RecordBatch::try_new(Arc::new(non_part_schema), non_part_cols)?;

Ok(final_batch_to_send)
}
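The rewrite above drops the index arithmetic (which assumed partition columns always sit at the end of the batch) in favor of a single pass that pairs each column with its schema field and keeps only the non-partition pairs. Below is a self-contained sketch of the same zip/filter_map/unzip pattern written directly against the arrow crate; the schema and its "year" partition column are invented for illustration:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("year", DataType::Utf8, false), // hypothetical partition column
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef,
            Arc::new(StringArray::from(vec!["2024", "2024"])) as ArrayRef,
        ],
    )?;

    // Walk columns and fields together, keep the non-partition pairs, and
    // split the survivors back into parallel column/field vectors.
    let partition_names = ["year".to_string()];
    let (cols, fields): (Vec<ArrayRef>, Vec<Field>) = batch
        .columns()
        .iter()
        .zip(batch.schema().fields())
        .filter_map(|(a, f)| {
            (!partition_names.contains(f.name())).then(|| (a.clone(), (**f).clone()))
        })
        .unzip();

    let pruned = RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?;
    assert_eq!(pruned.num_columns(), 2);
    Ok(())
}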
@@ -132,7 +132,7 @@ impl ParquetExec {
}
}
})
-            .filter(|p| !p.allways_true());
+            .filter(|p| !p.always_true());

let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
@@ -116,7 +116,7 @@ impl PagePruningPredicate {
.filter_map(|predicate| {
match PruningPredicate::try_new(predicate.clone(), schema.clone()) {
Ok(p)
-                    if (!p.allways_true())
+                    if (!p.always_true())
&& (p.required_columns().n_columns() < 2) =>
{
Some(Ok(p))
12 changes: 12 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
@@ -1204,6 +1204,18 @@ impl FunctionRegistry for SessionContext {
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
self.state.read().udwf(name)
}
+    fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
+        self.state.write().register_udf(udf)
+    }
+    fn register_udaf(
+        &mut self,
+        udaf: Arc<AggregateUDF>,
+    ) -> Result<Option<Arc<AggregateUDF>>> {
+        self.state.write().register_udaf(udaf)
+    }
+    fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
+        self.state.write().register_udwf(udwf)
+    }
}

/// A planner used to add extensions to DataFusion logical and physical plans.
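With SessionContext now implementing the mutating half of FunctionRegistry, registration code can be written against the trait instead of a concrete context. A hedged sketch under v36-era assumptions (create_udf re-exported from the prelude, ColumnarValue and Volatility under logical_expr; the "identity" UDF is made up for illustration):

use std::sync::Arc;
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
use datafusion::prelude::*;

// Generic over the registry, so the same helper works for SessionContext,
// SessionState, or any other FunctionRegistry implementation.
fn install_udf<R: FunctionRegistry>(registry: &mut R) -> Result<Option<Arc<ScalarUDF>>> {
    let identity = create_udf(
        "identity",
        vec![DataType::Int32],
        Arc::new(DataType::Int32),
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| Ok(args[0].clone())),
    );
    // Per the new trait method, this returns the previously registered
    // UDF of the same name, if one existed.
    registry.register_udf(Arc::new(identity))
}

fn main() -> Result<()> {
    let mut ctx = SessionContext::new();
    assert!(install_udf(&mut ctx)?.is_none());
    Ok(())
}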
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
@@ -517,7 +517,7 @@ impl PruningPredicate {
///
/// This happens if the predicate is a literal `true` and
/// literal_guarantees is empty.
-    pub fn allways_true(&self) -> bool {
+    pub fn always_true(&self) -> bool {
is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/aggregates.rs
@@ -44,9 +44,9 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
// We should have 1 row containing a list
let column = actual[0].column(0);
assert_eq!(column.len(), 1);
+
let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?;
let mut scalars = scalar_vec[0].clone();
-
// work around the lack of Ord on ScalarValue
let cmp = |a: &ScalarValue, b: &ScalarValue| {
a.partial_cmp(b).expect("Can compare ScalarValues")
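One detail worth noting in this test: ScalarValue implements PartialOrd but not Ord, so sorting has to go through partial_cmp with an expect. A minimal standalone sketch of the same workaround (not the test itself):

use datafusion_common::ScalarValue;

fn main() {
    let mut scalars = vec![
        ScalarValue::Int32(Some(3)),
        ScalarValue::Int32(Some(1)),
        ScalarValue::Int32(Some(2)),
    ];
    // sort() needs Ord; sort_by with partial_cmp (panicking on values that
    // cannot be compared) stands in for it.
    scalars.sort_by(|a, b| a.partial_cmp(b).expect("Can compare ScalarValues"));
    assert_eq!(scalars[0], ScalarValue::Int32(Some(1)));
}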