Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
danila-b authored Mar 8, 2025
2 parents 5533efe + f47ea73 commit dccbe5d
Show file tree
Hide file tree
Showing 29 changed files with 645 additions and 230 deletions.
6 changes: 3 additions & 3 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ metadata (number of cores, DataFusion version, etc.).
$ git checkout main
# write the benchmark results for main into /tmp/output_main
$ mkdir -p /tmp/output_main
$ cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./data --format parquet -o /tmp/output_main
$ cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./data --format parquet -o /tmp/output_main/tpch.json
# write the benchmark results for the branch into /tmp/output_branch
$ mkdir -p /tmp/output_branch
$ git checkout my_branch
$ cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./data --format parquet -o /tmp/output_branch
$ cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./data --format parquet -o /tmp/output_branch/tpch.json
# compare the results:
./compare.py /tmp/output_main/tpch-summary--1679330119.json /tmp/output_branch/tpch-summary--1679328405.json
./compare.py /tmp/output_main/tpch.json /tmp/output_branch/tpch.json
```

This will produce output like:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,12 +1228,8 @@ impl ScalarUDFImpl for MyRegexUdf {
}
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
match args {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
match args.args.as_slice() {
[ColumnarValue::Scalar(ScalarValue::Utf8(value))] => {
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
self.matches(value.as_deref()),
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ impl ScalarUDF {
self.inner.is_nullable(args, schema)
}

/// Invokes the function by forwarding `args` and `number_rows`, unchanged,
/// to the wrapped implementation's `invoke_batch`.
///
/// Deprecated since 46.0.0: use `invoke_with_args` instead.
#[deprecated(since = "46.0.0", note = "Use `invoke_with_args` instead")]
pub fn invoke_batch(
&self,
args: &[ColumnarValue],
number_rows: usize,
) -> Result<ColumnarValue> {
// The call below targets a method that is deprecated as well, so the lint
// is silenced for this single forwarding expression only.
#[allow(deprecated)]
self.inner.invoke_batch(args, number_rows)
}

Expand All @@ -244,15 +246,15 @@ impl ScalarUDF {
///
/// Note: This method is deprecated and will be removed in future releases.
/// User defined functions should implement [`Self::invoke_with_args`] instead.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
#[deprecated(since = "42.1.0", note = "Use `invoke_with_args` instead")]
pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
}

/// Returns a `ScalarFunctionImplementation` that can invoke the function
/// during execution
#[deprecated(since = "42.0.0", note = "Use `invoke_batch` instead")]
#[deprecated(since = "42.0.0", note = "Use `invoke_with_args` instead")]
pub fn fun(&self) -> ScalarFunctionImplementation {
let captured = Arc::clone(&self.inner);
#[allow(deprecated)]
Expand Down Expand Up @@ -613,6 +615,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// User defined functions should implement [`Self::invoke_with_args`] instead.
///
/// See <https://github.com/apache/datafusion/issues/13515> for more details.
#[deprecated(since = "46.0.0", note = "Use `invoke_with_args` instead")]
fn invoke_batch(
&self,
args: &[ColumnarValue],
Expand Down Expand Up @@ -643,6 +646,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// Default implementation: fall back to the deprecated `invoke_batch`,
// passing along only `args.args` and `args.number_rows`. The lint is
// silenced because this fallback call is intentional.
#[allow(deprecated)]
self.invoke_batch(&args.args, args.number_rows)
}

Expand Down
13 changes: 10 additions & 3 deletions datafusion/functions-nested/benches/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{ColumnarValue, Expr};
use datafusion_expr::{ColumnarValue, Expr, ScalarFunctionArgs};
use datafusion_functions_nested::map::map_udf;
use datafusion_functions_nested::planner::NestedFunctionPlanner;

Expand Down Expand Up @@ -94,11 +94,18 @@ fn criterion_benchmark(c: &mut Criterion) {
let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));

let return_type = &map_udf()
.return_type(&[DataType::Utf8, DataType::Int32])
.expect("should get return type");

b.iter(|| {
black_box(
// TODO use invoke_with_args
map_udf()
.invoke_batch(&[keys.clone(), values.clone()], 1)
.invoke_with_args(ScalarFunctionArgs {
args: vec![keys.clone(), values.clone()],
number_rows: 1,
return_type,
})
.expect("map should work on valid values"),
);
});
Expand Down
36 changes: 24 additions & 12 deletions datafusion/functions/benches/character_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

extern crate criterion;

use arrow::datatypes::DataType;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::ScalarFunctionArgs;
use helper::gen_string_array;

mod helper;
Expand All @@ -26,6 +28,8 @@ fn criterion_benchmark(c: &mut Criterion) {
// All benches are single batch run with 8192 rows
let character_length = datafusion_functions::unicode::character_length();

let return_type = DataType::Utf8;

let n_rows = 8192;
for str_len in [8, 32, 128, 4096] {
// StringArray ASCII only
Expand All @@ -34,8 +38,11 @@ fn criterion_benchmark(c: &mut Criterion) {
&format!("character_length_StringArray_ascii_str_len_{}", str_len),
|b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(character_length.invoke_batch(&args_string_ascii, n_rows))
black_box(character_length.invoke_with_args(ScalarFunctionArgs {
args: args_string_ascii.clone(),
number_rows: n_rows,
return_type: &return_type,
}))
})
},
);
Expand All @@ -46,8 +53,11 @@ fn criterion_benchmark(c: &mut Criterion) {
&format!("character_length_StringArray_utf8_str_len_{}", str_len),
|b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(character_length.invoke_batch(&args_string_utf8, n_rows))
black_box(character_length.invoke_with_args(ScalarFunctionArgs {
args: args_string_utf8.clone(),
number_rows: n_rows,
return_type: &return_type,
}))
})
},
);
Expand All @@ -58,10 +68,11 @@ fn criterion_benchmark(c: &mut Criterion) {
&format!("character_length_StringViewArray_ascii_str_len_{}", str_len),
|b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(
character_length.invoke_batch(&args_string_view_ascii, n_rows),
)
black_box(character_length.invoke_with_args(ScalarFunctionArgs {
args: args_string_view_ascii.clone(),
number_rows: n_rows,
return_type: &return_type,
}))
})
},
);
Expand All @@ -72,10 +83,11 @@ fn criterion_benchmark(c: &mut Criterion) {
&format!("character_length_StringViewArray_utf8_str_len_{}", str_len),
|b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(
character_length.invoke_batch(&args_string_view_utf8, n_rows),
)
black_box(character_length.invoke_with_args(ScalarFunctionArgs {
args: args_string_view_utf8.clone(),
number_rows: n_rows,
return_type: &return_type,
}))
})
},
);
Expand Down
15 changes: 13 additions & 2 deletions datafusion/functions/benches/chr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ extern crate criterion;

use arrow::{array::PrimitiveArray, datatypes::Int64Type, util::test_util::seedable_rng};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::ColumnarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_functions::string::chr;
use rand::Rng;

use arrow::datatypes::DataType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -44,7 +45,17 @@ fn criterion_benchmark(c: &mut Criterion) {
let input = Arc::new(input);
let args = vec![ColumnarValue::Array(input)];
c.bench_function("chr", |b| {
b.iter(|| black_box(cot_fn.invoke_batch(&args, size).unwrap()))
b.iter(|| {
black_box(
cot_fn
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
number_rows: size,
return_type: &DataType::Utf8,
})
.unwrap(),
)
})
});
}

Expand Down
25 changes: 20 additions & 5 deletions datafusion/functions/benches/cot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use arrow::{
util::bench_util::create_primitive_array,
};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::ColumnarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_functions::math::cot;

use arrow::datatypes::DataType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -34,16 +35,30 @@ fn criterion_benchmark(c: &mut Criterion) {
let f32_args = vec![ColumnarValue::Array(f32_array)];
c.bench_function(&format!("cot f32 array: {}", size), |b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(cot_fn.invoke_batch(&f32_args, size).unwrap())
black_box(
cot_fn
.invoke_with_args(ScalarFunctionArgs {
args: f32_args.clone(),
number_rows: size,
return_type: &DataType::Float32,
})
.unwrap(),
)
})
});
let f64_array = Arc::new(create_primitive_array::<Float64Type>(size, 0.2));
let f64_args = vec![ColumnarValue::Array(f64_array)];
c.bench_function(&format!("cot f64 array: {}", size), |b| {
b.iter(|| {
// TODO use invoke_with_args
black_box(cot_fn.invoke_batch(&f64_args, size).unwrap())
black_box(
cot_fn
.invoke_with_args(ScalarFunctionArgs {
args: f64_args.clone(),
number_rows: size,
return_type: &DataType::Float64,
})
.unwrap(),
)
})
});
}
Expand Down
14 changes: 10 additions & 4 deletions datafusion/functions/benches/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_common::ScalarValue;
use rand::rngs::ThreadRng;
use rand::Rng;

use datafusion_expr::ColumnarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_functions::datetime::date_bin;

fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray {
Expand All @@ -45,12 +45,18 @@ fn criterion_benchmark(c: &mut Criterion) {
let interval = ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1_000_000));
let timestamps = ColumnarValue::Array(timestamps_array);
let udf = date_bin();
let return_type = udf
.return_type(&[interval.data_type(), timestamps.data_type()])
.unwrap();

b.iter(|| {
// TODO use invoke_with_args
black_box(
udf.invoke_batch(&[interval.clone(), timestamps.clone()], batch_len)
.expect("date_bin should work on valid values"),
udf.invoke_with_args(ScalarFunctionArgs {
args: vec![interval.clone(), timestamps.clone()],
number_rows: batch_len,
return_type: &return_type,
})
.expect("date_bin should work on valid values"),
)
})
});
Expand Down
12 changes: 8 additions & 4 deletions datafusion/functions/benches/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_common::ScalarValue;
use rand::rngs::ThreadRng;
use rand::Rng;

use datafusion_expr::ColumnarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_functions::datetime::date_trunc;

fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray {
Expand All @@ -46,11 +46,15 @@ fn criterion_benchmark(c: &mut Criterion) {
ColumnarValue::Scalar(ScalarValue::Utf8(Some("minute".to_string())));
let timestamps = ColumnarValue::Array(timestamps_array);
let udf = date_trunc();

let return_type = &udf.return_type(&[timestamps.data_type()]).unwrap();
b.iter(|| {
black_box(
udf.invoke_batch(&[precision.clone(), timestamps.clone()], batch_len)
.expect("date_trunc should work on valid values"),
udf.invoke_with_args(ScalarFunctionArgs {
args: vec![precision.clone(), timestamps.clone()],
number_rows: batch_len,
return_type,
})
.expect("date_trunc should work on valid values"),
)
})
});
Expand Down
Loading

0 comments on commit dccbe5d

Please sign in to comment.