From 03b40f42bb1cc41d5d6601893584516983738e47 Mon Sep 17 00:00:00 2001 From: Cheng-Yuan-Lai Date: Thu, 6 Mar 2025 23:47:53 +0800 Subject: [PATCH] support binaryView --- datafusion/functions/src/crypto/basic.rs | 123 ++++++++++++++++------ datafusion/functions/src/crypto/sha256.rs | 12 +-- 2 files changed, 95 insertions(+), 40 deletions(-) diff --git a/datafusion/functions/src/crypto/basic.rs b/datafusion/functions/src/crypto/basic.rs index 191154b8f8ff..3375addbd7cd 100644 --- a/datafusion/functions/src/crypto/basic.rs +++ b/datafusion/functions/src/crypto/basic.rs @@ -17,7 +17,10 @@ //! "crypto" DataFusion functions -use arrow::array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray, + OffsetSizeTrait, +}; use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray}; use arrow::datatypes::DataType; use blake2::{Blake2b512, Blake2s256, Digest}; @@ -198,11 +201,13 @@ pub fn utf8_or_binary_to_binary_type( arg_type: &DataType, name: &str, ) -> Result { + dbg!(arg_type); Ok(match arg_type { DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 | DataType::Binary + | DataType::BinaryView | DataType::LargeBinary => DataType::Binary, DataType::Null => DataType::Null, _ => { @@ -251,30 +256,44 @@ impl DigestAlgorithm { where T: OffsetSizeTrait, { - let input_value = as_generic_binary_array::(value)?; - let array: ArrayRef = match self { - Self::Md5 => digest_to_array!(Md5, input_value), - Self::Sha224 => digest_to_array!(Sha224, input_value), - Self::Sha256 => digest_to_array!(Sha256, input_value), - Self::Sha384 => digest_to_array!(Sha384, input_value), - Self::Sha512 => digest_to_array!(Sha512, input_value), - Self::Blake2b => digest_to_array!(Blake2b512, input_value), - Self::Blake2s => digest_to_array!(Blake2s256, input_value), - Self::Blake3 => { - let binary_array: BinaryArray = input_value - .iter() - .map(|opt| { - opt.map(|x| { - let mut digest = Blake3::default(); - digest.update(x); - Blake3::finalize(&digest).as_bytes().to_vec() - }) - }) - .collect(); - Arc::new(binary_array) + let array = match value.data_type() { + DataType::Binary | DataType::LargeBinary => { + let v = value.as_binary::(); + self.digest_binary_array_impl::<&GenericBinaryArray>(v) + } + DataType::BinaryView => { + let v = value.as_binary_view(); + self.digest_binary_array_impl::<&BinaryViewArray>(v) + } + other => { + return exec_err!("unsupported type for digest_utf_array: {other:?}") } }; Ok(ColumnarValue::Array(array)) + // let input_value = as_generic_binary_array::(value)?; + // let array: ArrayRef = match self { + // Self::Md5 => digest_to_array!(Md5, input_value), + // Self::Sha224 => digest_to_array!(Sha224, input_value), + // Self::Sha256 => digest_to_array!(Sha256, input_value), + // Self::Sha384 => digest_to_array!(Sha384, input_value), + // Self::Sha512 => digest_to_array!(Sha512, input_value), + // Self::Blake2b => digest_to_array!(Blake2b512, input_value), + // Self::Blake2s => digest_to_array!(Blake2s256, input_value), + // Self::Blake3 => { + // let binary_array: BinaryArray = input_value + // .iter() + // .map(|opt| { + // opt.map(|x| { + // let mut digest = Blake3::default(); + // digest.update(x); + // Blake3::finalize(&digest).as_bytes().to_vec() + // }) + // }) + // .collect(); + // Arc::new(binary_array) + // } + // }; + // Ok(ColumnarValue::Array(array)) } /// digest a string array to their hash values @@ -328,6 +347,37 @@ impl DigestAlgorithm { } } } + + pub fn digest_binary_array_impl<'a, BinaryArrType>( + self, + input_value: BinaryArrType, + ) -> ArrayRef + where + BinaryArrType: BinaryArrayType<'a>, + { + match self { + Self::Md5 => digest_to_array!(Md5, input_value), + Self::Sha224 => digest_to_array!(Sha224, input_value), + Self::Sha256 => digest_to_array!(Sha256, input_value), + Self::Sha384 => digest_to_array!(Sha384, input_value), + Self::Sha512 => digest_to_array!(Sha512, input_value), + Self::Blake2b => digest_to_array!(Blake2b512, input_value), + Self::Blake2s => digest_to_array!(Blake2s256, input_value), + Self::Blake3 => { + let binary_array: BinaryArray = input_value + .iter() + .map(|opt| { + opt.map(|x| { + let mut digest = Blake3::default(); + digest.update(x); + Blake3::finalize(&digest).as_bytes().to_vec() + }) + }) + .collect(); + Arc::new(binary_array) + } + } + } } pub fn digest_process( value: &ColumnarValue, @@ -342,22 +392,27 @@ pub fn digest_process( DataType::LargeBinary => { digest_algorithm.digest_binary_array::(a.as_ref()) } - other => exec_err!( - "Unsupported data type {other:?} for function {digest_algorithm}" - ), - }, - ColumnarValue::Scalar(scalar) => match scalar { - ScalarValue::Utf8View(a) - | ScalarValue::Utf8(a) - | ScalarValue::LargeUtf8(a) => { - Ok(digest_algorithm - .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))) + DataType::BinaryView => { + digest_algorithm.digest_binary_array::(a.as_ref()) } - ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm - .digest_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), other => exec_err!( "Unsupported data type {other:?} for function {digest_algorithm}" ), }, + ColumnarValue::Scalar(scalar) => { + match scalar { + ScalarValue::Utf8View(a) + | ScalarValue::Utf8(a) + | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm + .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))), + ScalarValue::Binary(a) + | ScalarValue::LargeBinary(a) + | ScalarValue::BinaryView(a) => Ok(digest_algorithm + .digest_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), + other => exec_err!( + "Unsupported data type {other:?} for function {digest_algorithm}" + ), + } + } } } diff --git a/datafusion/functions/src/crypto/sha256.rs b/datafusion/functions/src/crypto/sha256.rs index cd63b846fcc6..7a25ac96e639 100644 --- a/datafusion/functions/src/crypto/sha256.rs +++ b/datafusion/functions/src/crypto/sha256.rs @@ -24,7 +24,7 @@ use datafusion_common::{ }; use datafusion_expr::{ ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - TypeSignature, Volatility, + Volatility, }; use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; use datafusion_macros::user_doc; @@ -57,12 +57,12 @@ impl Default for SHA256Func { impl SHA256Func { pub fn new() -> Self { Self { - signature: Signature::new( - TypeSignature::Coercible(vec![Coercion::new_implicit( - TypeSignatureClass::Native(logical_string()), - vec![TypeSignatureClass::Native(logical_binary())], + signature: Signature::coercible( + vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_binary()), + vec![TypeSignatureClass::Native(logical_string())], NativeType::Binary, - )]), + )], Volatility::Immutable, ), }