Skip to content

Commit

Permalink
feature flag avro
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Mar 1, 2025
1 parent a0ad2bc commit 022d100
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ name = "datafusion_proto"
default = ["parquet"]
json = ["pbjson", "serde", "serde_json"]
parquet = ["datafusion/parquet", "datafusion-common/parquet"]
avro = ["datafusion/avro", "datafusion-common/avro"]

[dependencies]
arrow = { workspace = true }
Expand Down
16 changes: 13 additions & 3 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::{
use crate::protobuf::{proto_error, ToProtoError};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::datasource::cte_worktable::CteWorkTable;
#[cfg(feature = "avro")]
use datafusion::datasource::file_format::avro::AvroFormat;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::{
Expand All @@ -43,8 +45,7 @@ use datafusion::datasource::file_format::{
use datafusion::{
datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat,
FileFormat,
csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
view::ViewTable,
Expand Down Expand Up @@ -440,7 +441,15 @@ impl AsLogicalPlan for LogicalPlanNode {
}
Arc::new(json)
}
FileFormatType::Avro(..) => Arc::new(AvroFormat),
#[cfg_attr(not(feature = "avro"), allow(unused_variables))]
FileFormatType::Avro(..) => {
#[cfg(feature = "avro")]
{
Arc::new(AvroFormat)
}
#[cfg(not(feature = "avro"))]
panic!("Unable to process avro file since `avro` feature is not enabled");
}
};

let table_paths = &scan
Expand Down Expand Up @@ -1072,6 +1081,7 @@ impl AsLogicalPlan for LogicalPlanNode {
}))
}

#[cfg(feature = "avro")]
if any.is::<AvroFormat>() {
maybe_some_type =
Some(FileFormatType::Avro(protobuf::AvroFormat {}))
Expand Down
25 changes: 17 additions & 8 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ use datafusion::datasource::file_format::file_compression_type::FileCompressionT
use datafusion::datasource::file_format::json::JsonSink;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
#[cfg(feature = "avro")]
use datafusion::datasource::physical_plan::AvroSource;
#[cfg(feature = "parquet")]
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{AvroSource, CsvSource, FileScanConfig};
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
Expand Down Expand Up @@ -285,14 +287,20 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
#[cfg(not(feature = "parquet"))]
panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
}
#[cfg_attr(not(feature = "avro"), allow(unused_variables))]
PhysicalPlanType::AvroScan(scan) => {
let conf = parse_protobuf_file_scan_config(
scan.base_conf.as_ref().unwrap(),
registry,
extension_codec,
Arc::new(AvroSource::new()),
)?;
Ok(conf.build())
#[cfg(feature = "avro")]
{
let conf = parse_protobuf_file_scan_config(
scan.base_conf.as_ref().unwrap(),
registry,
extension_codec,
Arc::new(AvroSource::new()),
)?;
Ok(conf.build())
}
#[cfg(not(feature = "avro"))]
panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan(
Expand Down Expand Up @@ -1706,6 +1714,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
}

#[cfg(feature = "avro")]
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(maybe_avro) =
Expand Down

0 comments on commit 022d100

Please sign in to comment.