diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index b44127d6a1b7..3208f2dd169f 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -113,6 +113,11 @@ harness = false name = "chr" required-features = ["string_expressions"] +[[bench]] +harness = false +name = "gcd" +required-features = ["math_expressions"] + [[bench]] harness = false name = "uuid" diff --git a/datafusion/functions/benches/gcd.rs b/datafusion/functions/benches/gcd.rs new file mode 100644 index 000000000000..f8c855c82ad4 --- /dev/null +++ b/datafusion/functions/benches/gcd.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::{ + array::{ArrayRef, Int64Array}, + datatypes::DataType, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions::math::gcd; +use rand::Rng; +use std::sync::Arc; + +fn generate_i64_array(n_rows: usize) -> ArrayRef { + let mut rng = rand::thread_rng(); + let values = (0..n_rows) + .map(|_| rng.gen_range(0..1000)) + .collect::>(); + Arc::new(Int64Array::from(values)) as ArrayRef +} + +fn criterion_benchmark(c: &mut Criterion) { + let n_rows = 100000; + let array_a = ColumnarValue::Array(generate_i64_array(n_rows)); + let array_b = ColumnarValue::Array(generate_i64_array(n_rows)); + let udf = gcd(); + + c.bench_function("gcd both array", |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![array_a.clone(), array_b.clone()], + number_rows: 0, + return_type: &DataType::Int64, + }) + .expect("date_bin should work on valid values"), + ) + }) + }); + + // 10! = 3628800 + let scalar_b = ColumnarValue::Scalar(ScalarValue::Int64(Some(3628800))); + + c.bench_function("gcd array and scalar", |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![array_a.clone(), scalar_b.clone()], + number_rows: 0, + return_type: &DataType::Int64, + }) + .expect("date_bin should work on valid values"), + ) + }) + }); + + // scalar and scalar + let scalar_a = ColumnarValue::Scalar(ScalarValue::Int64(Some(3628800))); + + c.bench_function("gcd both scalar", |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![scalar_a.clone(), scalar_b.clone()], + number_rows: 0, + return_type: &DataType::Int64, + }) + .expect("date_bin should work on valid values"), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/math/gcd.rs b/datafusion/functions/src/math/gcd.rs index 911e00308ab7..7fe253b4afbc 100644 --- a/datafusion/functions/src/math/gcd.rs +++ b/datafusion/functions/src/math/gcd.rs @@ -15,19 +15,15 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ArrayRef, Int64Array}; +use arrow::array::{new_null_array, ArrayRef, AsArray, Int64Array, PrimitiveArray}; +use arrow::compute::try_binary; +use arrow::datatypes::{DataType, Int64Type}; use arrow::error::ArrowError; use std::any::Any; use std::mem::swap; use std::sync::Arc; -use arrow::datatypes::DataType; -use arrow::datatypes::DataType::Int64; - -use crate::utils::make_scalar_function; -use datafusion_common::{ - arrow_datafusion_err, exec_err, internal_datafusion_err, DataFusionError, Result, -}; +use datafusion_common::{exec_err, internal_datafusion_err, Result, ScalarValue}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, @@ -54,9 +50,12 @@ impl Default for GcdFunc { impl GcdFunc { pub fn new() -> Self { - use DataType::*; Self { - signature: Signature::uniform(2, vec![Int64], Volatility::Immutable), + signature: Signature::uniform( + 2, + vec![DataType::Int64], + Volatility::Immutable, + ), } } } @@ -75,11 +74,34 @@ impl ScalarUDFImpl for GcdFunc { } fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(Int64) + Ok(DataType::Int64) } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(gcd, vec![])(&args.args) + let args: [ColumnarValue; 2] = args.args.try_into().map_err(|_| { + internal_datafusion_err!("Expected 2 arguments for function gcd") + })?; + + match args { + [ColumnarValue::Array(a), ColumnarValue::Array(b)] => { + compute_gcd_for_arrays(&a, &b) + } + [ColumnarValue::Scalar(ScalarValue::Int64(a)), ColumnarValue::Scalar(ScalarValue::Int64(b))] => { + match (a, b) { + (Some(a), Some(b)) => Ok(ColumnarValue::Scalar(ScalarValue::Int64( + Some(compute_gcd(a, b)?), + ))), + _ => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))), + } + } + [ColumnarValue::Array(a), ColumnarValue::Scalar(ScalarValue::Int64(b))] => { + compute_gcd_with_scalar(&a, b) + } + [ColumnarValue::Scalar(ScalarValue::Int64(a)), ColumnarValue::Array(b)] => { + compute_gcd_with_scalar(&b, a) + } + _ => exec_err!("Unsupported argument types for function gcd"), + } } fn documentation(&self) -> Option<&Documentation> { @@ -87,24 +109,34 @@ impl ScalarUDFImpl for GcdFunc { } } -/// Gcd SQL function -fn gcd(args: &[ArrayRef]) -> Result { - match args[0].data_type() { - Int64 => { - let arg1 = downcast_named_arg!(&args[0], "x", Int64Array); - let arg2 = downcast_named_arg!(&args[1], "y", Int64Array); +fn compute_gcd_for_arrays(a: &ArrayRef, b: &ArrayRef) -> Result { + let a = a.as_primitive::(); + let b = b.as_primitive::(); + try_binary(a, b, compute_gcd) + .map(|arr: PrimitiveArray| { + ColumnarValue::Array(Arc::new(arr) as ArrayRef) + }) + .map_err(Into::into) // convert ArrowError to DataFusionError +} - Ok(arg1 +fn compute_gcd_with_scalar(arr: &ArrayRef, scalar: Option) -> Result { + match scalar { + Some(scalar_value) => { + let result: Result = arr + .as_primitive::() .iter() - .zip(arg2.iter()) - .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Ok(Some(compute_gcd(a1, a2)?)), + .map(|val| match val { + Some(val) => Ok(Some(compute_gcd(val, scalar_value)?)), _ => Ok(None), }) - .collect::>() - .map(Arc::new)? as ArrayRef) + .collect(); + + result.map(|arr| ColumnarValue::Array(Arc::new(arr) as ArrayRef)) } - other => exec_err!("Unsupported data type {other:?} for function gcd"), + None => Ok(ColumnarValue::Array(new_null_array( + &DataType::Int64, + arr.len(), + ))), } } @@ -132,61 +164,12 @@ pub(super) fn unsigned_gcd(mut a: u64, mut b: u64) -> u64 { } /// Computes greatest common divisor using Binary GCD algorithm. -pub fn compute_gcd(x: i64, y: i64) -> Result { +pub fn compute_gcd(x: i64, y: i64) -> Result { let a = x.unsigned_abs(); let b = y.unsigned_abs(); let r = unsigned_gcd(a, b); // gcd(i64::MIN, i64::MIN) = i64::MIN.unsigned_abs() cannot fit into i64 r.try_into().map_err(|_| { - arrow_datafusion_err!(ArrowError::ComputeError(format!( - "Signed integer overflow in GCD({x}, {y})" - ))) + ArrowError::ComputeError(format!("Signed integer overflow in GCD({x}, {y})")) }) } - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use arrow::{ - array::{ArrayRef, Int64Array}, - error::ArrowError, - }; - - use crate::math::gcd::gcd; - use datafusion_common::{cast::as_int64_array, DataFusionError}; - - #[test] - fn test_gcd_i64() { - let args: Vec = vec![ - Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x - Arc::new(Int64Array::from(vec![0, -2, 15, 8])), // y - ]; - - let result = gcd(&args).expect("failed to initialize function gcd"); - let ints = as_int64_array(&result).expect("failed to initialize function gcd"); - - assert_eq!(ints.len(), 4); - assert_eq!(ints.value(0), 0); - assert_eq!(ints.value(1), 1); - assert_eq!(ints.value(2), 5); - assert_eq!(ints.value(3), 8); - } - - #[test] - fn overflow_on_both_param_i64_min() { - let args: Vec = vec![ - Arc::new(Int64Array::from(vec![i64::MIN])), // x - Arc::new(Int64Array::from(vec![i64::MIN])), // y - ]; - - match gcd(&args) { - // we expect a overflow - Err(DataFusionError::ArrowError(ArrowError::ComputeError(_), _)) => {} - Err(_) => { - panic!("failed to initialize function gcd") - } - Ok(_) => panic!("GCD({0}, {0}) should have overflown", i64::MIN), - }; - } -} diff --git a/datafusion/sqllogictest/test_files/math.slt b/datafusion/sqllogictest/test_files/math.slt index a3cf1a4e573f..a49e0a642106 100644 --- a/datafusion/sqllogictest/test_files/math.slt +++ b/datafusion/sqllogictest/test_files/math.slt @@ -623,12 +623,12 @@ select 1 1 1 # gcd with columns and expresions -query II rowsort +query II select gcd(a, b), gcd(c*d + 1, abs(e)) + f from signed_integers; ---- 1 11 -1 13 2 -10 +1 13 NULL NULL # gcd(i64::MIN, i64::MIN)