feat: Add ScalarUDF support in FFI crate (apache#14579)
* Initial commit for scalar UDF support in the FFI crate

* Add utility functions for converting back and forth between Result and RResult

* License text

* There is no need to repeat the trait doc strings

* Resolve clippy warning

* Add unit tests for ffi scalar udfs

* Add license text

* Switch the FFI modules over to the new macros for converting back and forth between Result and RResult (a sketch of these macros follows this list)

* Attempting to fix CI based on recommendation to try running clean, but this shouldn't be necessary

* Revert "Attempting to fix CI based on recommendation to try running clean, but this shouldn't be necessary"

This reverts commit 10248c2.

* arrow_schema was removed during rebase

* Switch from trying to expose the entire type signature to using the user_defined type

* Call function to get valid types for scalar udf

* Add documentation

* Resolve doctest failure
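
The conversion macros referenced above (`rresult!`, `rresult_return!`, and `df_result!`) live in the new `util` module, whose contents are not expanded in this view. Based on how the hunks below use them, a minimal sketch of the pattern might look like this; the real definitions in `util.rs` may differ in detail:

```rust
/// Convert a Rust `Result` into an ABI-stable `RResult`, stringifying the error.
#[macro_export]
macro_rules! rresult {
    ($result:expr) => {
        match $result {
            Ok(v) => abi_stable::std_types::RResult::ROk(v),
            Err(e) => abi_stable::std_types::RResult::RErr(
                abi_stable::std_types::RString::from(e.to_string()),
            ),
        }
    };
}

/// Like `rresult!`, but early-return the error arm, mirroring the `?` operator.
#[macro_export]
macro_rules! rresult_return {
    ($result:expr) => {
        match $result {
            Ok(v) => v,
            Err(e) => {
                return abi_stable::std_types::RResult::RErr(
                    abi_stable::std_types::RString::from(e.to_string()),
                )
            }
        }
    };
}

/// Convert an `RResult` back into a DataFusion `Result` on the consuming side.
#[macro_export]
macro_rules! df_result {
    ($result:expr) => {
        match $result {
            abi_stable::std_types::RResult::ROk(v) => Ok(v),
            abi_stable::std_types::RResult::RErr(e) => {
                Err(datafusion::error::DataFusionError::Execution(e.to_string()))
            }
        }
    };
}
```

This matches the call sites in the diffs below: producers wrap fallible work in `rresult!` or `rresult_return!` before crossing the FFI boundary, and consumers unwrap with `df_result!` followed by ordinary `?` propagation.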
timsaucer authored Feb 19, 2025
1 parent ee2d2a4 commit 8ab0661
Showing 12 changed files with 741 additions and 178 deletions.
13 changes: 12 additions & 1 deletion datafusion/ffi/src/arrow_wrappers.rs
@@ -19,8 +19,9 @@ use std::sync::Arc;
 
 use abi_stable::StableAbi;
 use arrow::{
+    array::{make_array, ArrayRef},
     datatypes::{Schema, SchemaRef},
-    ffi::{FFI_ArrowArray, FFI_ArrowSchema},
+    ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
 };
 use log::error;
 
@@ -68,3 +69,13 @@ pub struct WrappedArray {
 
     pub schema: WrappedSchema,
 }
+
+impl TryFrom<WrappedArray> for ArrayRef {
+    type Error = arrow::error::ArrowError;
+
+    fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
+        let data = unsafe { from_ffi(value.array, &value.schema.0)? };
+
+        Ok(make_array(data))
+    }
+}
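
A round trip through the new `TryFrom` impl might look like the following sketch, assuming `arrow_wrappers` is exported from the crate root like the other modules and that `WrappedArray`'s fields are public, as the diff suggests:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::error::ArrowError;
use arrow::ffi::to_ffi;
use datafusion_ffi::arrow_wrappers::{WrappedArray, WrappedSchema};

fn main() -> Result<(), ArrowError> {
    let original: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Producer side: export the array into the stable FFI wrapper.
    let (array, schema) = to_ffi(&original.to_data())?;
    let wrapped = WrappedArray {
        array,
        schema: WrappedSchema(schema),
    };

    // Consumer side: the new TryFrom impl rebuilds an ArrayRef.
    let round_tripped: ArrayRef = wrapped.try_into()?;
    assert_eq!(original.to_data(), round_tripped.to_data());
    Ok(())
}
```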
25 changes: 8 additions & 17 deletions datafusion/ffi/src/execution_plan.rs
@@ -30,7 +30,8 @@ use datafusion::{
 use tokio::runtime::Handle;
 
 use crate::{
-    plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
+    df_result, plan_properties::FFI_PlanProperties,
+    record_batch_stream::FFI_RecordBatchStream, rresult,
 };
 
 /// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
@@ -112,13 +113,11 @@ unsafe extern "C" fn execute_fn_wrapper(
     let ctx = &(*private_data).context;
     let runtime = (*private_data).runtime.clone();
 
-    match plan.execute(partition, Arc::clone(ctx)) {
-        Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
-        Err(e) => RResult::RErr(
-            format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
-        ),
-    }
+    rresult!(plan
+        .execute(partition, Arc::clone(ctx))
+        .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
 }
 
 unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
     let private_data = plan.private_data as *const ExecutionPlanPrivateData;
     let plan = &(*private_data).plan;
@@ -274,16 +273,8 @@ impl ExecutionPlan for ForeignExecutionPlan {
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
        unsafe {
-            match (self.plan.execute)(&self.plan, partition) {
-                RResult::ROk(stream) => {
-                    let stream = Pin::new(Box::new(stream)) as SendableRecordBatchStream;
-                    Ok(stream)
-                }
-                RResult::RErr(e) => Err(DataFusionError::Execution(format!(
-                    "Error occurred during FFI call to FFI_ExecutionPlan execute. {}",
-                    e
-                ))),
-            }
+            df_result!((self.plan.execute)(&self.plan, partition))
+                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
         }
     }
 }
3 changes: 3 additions & 0 deletions datafusion/ffi/src/lib.rs
@@ -26,6 +26,9 @@ pub mod record_batch_stream;
 pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
+pub mod udf;
+pub mod util;
+pub mod volatility;
 
 #[cfg(feature = "integration-tests")]
 pub mod tests;
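
The headline change is the new `udf` module, which is not expanded in this view. Judging from the crate's existing pattern for `FFI_ExecutionPlan` (a `From` impl into the stable struct on the producer side, and a `Foreign*` wrapper converting back on the consumer side), usage might look roughly like the sketch below; the `FFI_ScalarUDF` and `ForeignScalarUDF` names and conversions are assumptions based on that pattern:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::logical_expr::ScalarUDF;
// Assumed API, mirroring the crate's other FFI wrappers.
use datafusion_ffi::udf::{FFI_ScalarUDF, ForeignScalarUDF};

/// Ship a ScalarUDF across the FFI boundary and reconstruct it.
fn round_trip_udf(udf: Arc<ScalarUDF>) -> Result<Arc<ScalarUDF>> {
    // Producer (library) side: wrap the UDF in the ABI-stable struct.
    let ffi_udf: FFI_ScalarUDF = udf.into();

    // Consumer side: rebuild a ScalarUDF that proxies calls back
    // through the FFI vtable.
    let foreign = ForeignScalarUDF::try_from(&ffi_udf)?;
    Ok(Arc::new(ScalarUDF::new_from_impl(foreign)))
}
```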
116 changes: 46 additions & 70 deletions datafusion/ffi/src/plan_properties.rs
@@ -19,8 +19,8 @@ use std::{ffi::c_void, sync::Arc};
 
 use abi_stable::{
     std_types::{
-        RResult::{self, RErr, ROk},
-        RStr, RVec,
+        RResult::{self, ROk},
+        RString, RVec,
     },
     StableAbi,
 };
@@ -44,7 +44,7 @@ use datafusion_proto::{
 };
 use prost::Message;
 
-use crate::arrow_wrappers::WrappedSchema;
+use crate::{arrow_wrappers::WrappedSchema, df_result, rresult_return};
 
 /// A stable struct for sharing [`PlanProperties`] across FFI boundaries.
 #[repr(C)]
@@ -54,7 +54,7 @@ pub struct FFI_PlanProperties {
     /// The output partitioning is a [`Partitioning`] protobuf message serialized
     /// into bytes to pass across the FFI boundary.
     pub output_partitioning:
-        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
+        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
 
     /// Return the emission type of the plan.
     pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
@@ -64,8 +64,7 @@ pub struct FFI_PlanProperties {
 
     /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
     /// serialized into bytes to pass across the FFI boundary.
-    pub output_ordering:
-        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
+    pub output_ordering: unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
 
     /// Return the schema of the plan.
     pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema,
@@ -84,21 +83,13 @@ struct PlanPropertiesPrivateData {
 
 unsafe extern "C" fn output_partitioning_fn_wrapper(
     properties: &FFI_PlanProperties,
-) -> RResult<RVec<u8>, RStr<'static>> {
+) -> RResult<RVec<u8>, RString> {
     let private_data = properties.private_data as *const PlanPropertiesPrivateData;
     let props = &(*private_data).props;
 
     let codec = DefaultPhysicalExtensionCodec {};
     let partitioning_data =
-        match serialize_partitioning(props.output_partitioning(), &codec) {
-            Ok(p) => p,
-            Err(_) => {
-                return RErr(
-                    "unable to serialize output_partitioning in FFI_PlanProperties"
-                        .into(),
-                )
-            }
-        };
+        rresult_return!(serialize_partitioning(props.output_partitioning(), &codec));
     let output_partitioning = partitioning_data.encode_to_vec();
 
     ROk(output_partitioning.into())
@@ -122,31 +113,24 @@ unsafe extern "C" fn boundedness_fn_wrapper(
 
 unsafe extern "C" fn output_ordering_fn_wrapper(
     properties: &FFI_PlanProperties,
-) -> RResult<RVec<u8>, RStr<'static>> {
+) -> RResult<RVec<u8>, RString> {
     let private_data = properties.private_data as *const PlanPropertiesPrivateData;
     let props = &(*private_data).props;
 
     let codec = DefaultPhysicalExtensionCodec {};
-    let output_ordering =
-        match props.output_ordering() {
-            Some(ordering) => {
-                let physical_sort_expr_nodes =
-                    match serialize_physical_sort_exprs(ordering.to_owned(), &codec) {
-                        Ok(v) => v,
-                        Err(_) => return RErr(
-                            "unable to serialize output_ordering in FFI_PlanProperties"
-                                .into(),
-                        ),
-                    };
-
-                let ordering_data = PhysicalSortExprNodeCollection {
-                    physical_sort_expr_nodes,
-                };
-
-                ordering_data.encode_to_vec()
-            }
-            None => Vec::default(),
-        };
+    let output_ordering = match props.output_ordering() {
+        Some(ordering) => {
+            let physical_sort_expr_nodes = rresult_return!(
+                serialize_physical_sort_exprs(ordering.to_owned(), &codec)
+            );
+            let ordering_data = PhysicalSortExprNodeCollection {
+                physical_sort_expr_nodes,
+            };
+
+            ordering_data.encode_to_vec()
+        }
+        None => Vec::default(),
+    };
     ROk(output_ordering.into())
 }
 
@@ -200,40 +184,32 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
         let codex = DefaultPhysicalExtensionCodec {};
 
         let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
-        let orderings = match ffi_orderings {
-            ROk(ordering_vec) => {
-                let proto_output_ordering =
-                    PhysicalSortExprNodeCollection::decode(ordering_vec.as_ref())
-                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                Some(parse_physical_sort_exprs(
-                    &proto_output_ordering.physical_sort_expr_nodes,
-                    &default_ctx,
-                    &schema,
-                    &codex,
-                )?)
-            }
-            RErr(e) => return Err(DataFusionError::Plan(e.to_string())),
-        };
-
-        let ffi_partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) };
-        let partitioning = match ffi_partitioning {
-            ROk(partitioning_vec) => {
-                let proto_output_partitioning =
-                    Partitioning::decode(partitioning_vec.as_ref())
-                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                parse_protobuf_partitioning(
-                    Some(&proto_output_partitioning),
-                    &default_ctx,
-                    &schema,
-                    &codex,
-                )?
-                .ok_or(DataFusionError::Plan(
-                    "Unable to deserialize partitioning protobuf in FFI_PlanProperties"
-                        .to_string(),
-                ))
-            }
-            RErr(e) => Err(DataFusionError::Plan(e.to_string())),
-        }?;
+        let proto_output_ordering =
+            PhysicalSortExprNodeCollection::decode(df_result!(ffi_orderings)?.as_ref())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let orderings = Some(parse_physical_sort_exprs(
+            &proto_output_ordering.physical_sort_expr_nodes,
+            &default_ctx,
+            &schema,
+            &codex,
+        )?);
+
+        let partitioning_vec =
+            unsafe { df_result!((ffi_props.output_partitioning)(&ffi_props))? };
+        let proto_output_partitioning =
+            Partitioning::decode(partitioning_vec.as_ref())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let partitioning = parse_protobuf_partitioning(
+            Some(&proto_output_partitioning),
+            &default_ctx,
+            &schema,
+            &codex,
+        )?
+        .ok_or(DataFusionError::Plan(
+            "Unable to deserialize partitioning protobuf in FFI_PlanProperties"
+                .to_string(),
+        ))?;
 
         let eq_properties = match orderings {
             Some(ordering) => {
16 changes: 9 additions & 7 deletions datafusion/ffi/src/record_batch_stream.rs
@@ -35,7 +35,10 @@ use datafusion::{
 use futures::{Stream, TryStreamExt};
 use tokio::runtime::Handle;
 
-use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    rresult,
+};
 
 /// A stable struct for sharing [`RecordBatchStream`] across FFI boundaries.
 /// We use the async-ffi crate for handling async calls across libraries.
@@ -97,13 +100,12 @@ fn record_batch_to_wrapped_array(
     record_batch: RecordBatch,
 ) -> RResult<WrappedArray, RString> {
     let struct_array = StructArray::from(record_batch);
-    match to_ffi(&struct_array.to_data()) {
-        Ok((array, schema)) => RResult::ROk(WrappedArray {
+    rresult!(
+        to_ffi(&struct_array.to_data()).map(|(array, schema)| WrappedArray {
             array,
-            schema: WrappedSchema(schema),
-        }),
-        Err(e) => RResult::RErr(e.to_string().into()),
-    }
+            schema: WrappedSchema(schema)
+        })
+    )
 }
 
 // probably want to use pub unsafe fn from_ffi(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result<ArrayData> {
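
The trailing comment above hints at the `from_ffi` path on the receiving end. A self-contained sketch of the full round trip that `record_batch_to_wrapped_array` enables (batch → `StructArray` → FFI structs → batch), using only public arrow APIs:

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, StructArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi::{from_ffi, to_ffi};
use arrow::record_batch::RecordBatch;

fn round_trip(batch: RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Producer side: a RecordBatch crosses the boundary as a StructArray.
    let struct_array = StructArray::from(batch);
    let (array, schema) = to_ffi(&struct_array.to_data())?;

    // Consumer side: rebuild the StructArray, then the RecordBatch.
    let data = unsafe { from_ffi(array, &schema)? };
    Ok(RecordBatch::from(StructArray::from(data)))
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
    let back = round_trip(batch.clone())?;
    assert_eq!(batch, back);
    Ok(())
}
```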