Skip to content

Commit

Permalink
support binaryView
Browse files Browse the repository at this point in the history
  • Loading branch information
Cheng-Yuan-Lai committed Mar 6, 2025
1 parent 1bba47c commit 03b40f4
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 40 deletions.
123 changes: 89 additions & 34 deletions datafusion/functions/src/crypto/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! "crypto" DataFusion functions
use arrow::array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait};
use arrow::array::{
Array, ArrayRef, BinaryArray, BinaryArrayType, BinaryViewArray, GenericBinaryArray,
OffsetSizeTrait,
};
use arrow::array::{AsArray, GenericStringArray, StringArray, StringViewArray};
use arrow::datatypes::DataType;
use blake2::{Blake2b512, Blake2s256, Digest};
Expand Down Expand Up @@ -198,11 +201,13 @@ pub fn utf8_or_binary_to_binary_type(
arg_type: &DataType,
name: &str,
) -> Result<DataType> {
dbg!(arg_type);
Ok(match arg_type {
DataType::Utf8View
| DataType::LargeUtf8
| DataType::Utf8
| DataType::Binary
| DataType::BinaryView
| DataType::LargeBinary => DataType::Binary,
DataType::Null => DataType::Null,
_ => {
Expand Down Expand Up @@ -251,30 +256,44 @@ impl DigestAlgorithm {
where
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Self::Md5 => digest_to_array!(Md5, input_value),
Self::Sha224 => digest_to_array!(Sha224, input_value),
Self::Sha256 => digest_to_array!(Sha256, input_value),
Self::Sha384 => digest_to_array!(Sha384, input_value),
Self::Sha512 => digest_to_array!(Sha512, input_value),
Self::Blake2b => digest_to_array!(Blake2b512, input_value),
Self::Blake2s => digest_to_array!(Blake2s256, input_value),
Self::Blake3 => {
let binary_array: BinaryArray = input_value
.iter()
.map(|opt| {
opt.map(|x| {
let mut digest = Blake3::default();
digest.update(x);
Blake3::finalize(&digest).as_bytes().to_vec()
})
})
.collect();
Arc::new(binary_array)
let array = match value.data_type() {
DataType::Binary | DataType::LargeBinary => {
let v = value.as_binary::<T>();
self.digest_binary_array_impl::<&GenericBinaryArray<T>>(v)
}
DataType::BinaryView => {
let v = value.as_binary_view();
self.digest_binary_array_impl::<&BinaryViewArray>(v)
}
other => {
return exec_err!("unsupported type for digest_utf_array: {other:?}")
}
};
Ok(ColumnarValue::Array(array))
// let input_value = as_generic_binary_array::<T>(value)?;
// let array: ArrayRef = match self {
// Self::Md5 => digest_to_array!(Md5, input_value),
// Self::Sha224 => digest_to_array!(Sha224, input_value),
// Self::Sha256 => digest_to_array!(Sha256, input_value),
// Self::Sha384 => digest_to_array!(Sha384, input_value),
// Self::Sha512 => digest_to_array!(Sha512, input_value),
// Self::Blake2b => digest_to_array!(Blake2b512, input_value),
// Self::Blake2s => digest_to_array!(Blake2s256, input_value),
// Self::Blake3 => {
// let binary_array: BinaryArray = input_value
// .iter()
// .map(|opt| {
// opt.map(|x| {
// let mut digest = Blake3::default();
// digest.update(x);
// Blake3::finalize(&digest).as_bytes().to_vec()
// })
// })
// .collect();
// Arc::new(binary_array)
// }
// };
// Ok(ColumnarValue::Array(array))
}

/// digest a string array to their hash values
Expand Down Expand Up @@ -328,6 +347,37 @@ impl DigestAlgorithm {
}
}
}

/// Hash every element of a binary-like array with the algorithm selected by `self`.
///
/// `input_value` may be any array type implementing [`BinaryArrayType`], so the
/// same code path can be shared by the different binary array layouts.
///
/// For the Blake3 branch, null input elements stay null in the output and each
/// non-null element is replaced by its digest bytes, collected into a
/// [`BinaryArray`]. The remaining algorithms are delegated to the
/// `digest_to_array!` macro (defined elsewhere in this file; presumably it
/// applies the same per-element treatment — confirm against the macro).
pub fn digest_binary_array_impl<'a, BinaryArrType>(
    self,
    input_value: BinaryArrType,
) -> ArrayRef
where
    BinaryArrType: BinaryArrayType<'a>,
{
    match self {
        Self::Md5 => digest_to_array!(Md5, input_value),
        Self::Sha224 => digest_to_array!(Sha224, input_value),
        Self::Sha256 => digest_to_array!(Sha256, input_value),
        Self::Sha384 => digest_to_array!(Sha384, input_value),
        Self::Sha512 => digest_to_array!(Sha512, input_value),
        Self::Blake2b => digest_to_array!(Blake2b512, input_value),
        Self::Blake2s => digest_to_array!(Blake2s256, input_value),
        Self::Blake3 => {
            // Blake3 is handled inline rather than through the macro.
            let hashed: BinaryArray = input_value
                .iter()
                .map(|maybe_bytes| {
                    // Propagate nulls: a missing input yields a missing output.
                    let bytes = maybe_bytes?;
                    let mut hasher = Blake3::default();
                    hasher.update(bytes);
                    // Path-qualified call keeps resolution pinned to the
                    // hasher's own by-reference `finalize`.
                    let hash = Blake3::finalize(&hasher);
                    Some(hash.as_bytes().to_vec())
                })
                .collect();
            Arc::new(hashed)
        }
    }
}
}
pub fn digest_process(
value: &ColumnarValue,
Expand All @@ -342,22 +392,27 @@ pub fn digest_process(
DataType::LargeBinary => {
digest_algorithm.digest_binary_array::<i64>(a.as_ref())
}
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
ColumnarValue::Scalar(scalar) => match scalar {
ScalarValue::Utf8View(a)
| ScalarValue::Utf8(a)
| ScalarValue::LargeUtf8(a) => {
Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
DataType::BinaryView => {
digest_algorithm.digest_binary_array::<i32>(a.as_ref())
}
ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
ColumnarValue::Scalar(scalar) => {
match scalar {
ScalarValue::Utf8View(a)
| ScalarValue::Utf8(a)
| ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
ScalarValue::Binary(a)
| ScalarValue::LargeBinary(a)
| ScalarValue::BinaryView(a) => Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
}
}
}
}
12 changes: 6 additions & 6 deletions datafusion/functions/src/crypto/sha256.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_common::{
};
use datafusion_expr::{
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
Volatility,
};
use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
use datafusion_macros::user_doc;
Expand Down Expand Up @@ -57,12 +57,12 @@ impl Default for SHA256Func {
impl SHA256Func {
pub fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Coercible(vec![Coercion::new_implicit(
TypeSignatureClass::Native(logical_string()),
vec![TypeSignatureClass::Native(logical_binary())],
signature: Signature::coercible(
vec![Coercion::new_implicit(
TypeSignatureClass::Native(logical_binary()),
vec![TypeSignatureClass::Native(logical_string())],
NativeType::Binary,
)]),
)],
Volatility::Immutable,
),
}
Expand Down

0 comments on commit 03b40f4

Please sign in to comment.