Skip to content

Commit

Permalink
Fix propagation of CSV options through protos (#245)
Browse files Browse the repository at this point in the history
* Fix sink output schema being passed in to `FileSinkExec` instead of sink input schema

* Expose double_quote csv option, and ensure all csv_options are propagated through logical/physical plans

---------

Co-authored-by: svranesevic <svranesevic@users.noreply.github.com>
  • Loading branch information
svranesevic and svranesevic authored Jun 11, 2024
1 parent 71393c5 commit 22e23cf
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 8 deletions.
1 change: 1 addition & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ config_namespace! {
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: bool, default = true
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
Expand Down
7 changes: 6 additions & 1 deletion datafusion/common/src/file_options/csv_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
fn try_from(value: &CsvOptions) -> Result<Self> {
let mut builder = WriterBuilder::default()
.with_header(value.has_header)
.with_delimiter(value.delimiter);
.with_quote(value.quote)
.with_delimiter(value.delimiter)
.with_double_quote(value.double_quote);

if let Some(v) = &value.escape {
builder = builder.with_escape(*v)
}
if let Some(v) = &value.date_format {
builder = builder.with_date_format(v.into())
}
Expand Down
7 changes: 7 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,12 @@ message CsvWriterOptions {
string time_format = 7;
// Optional value to represent null
string null_value = 8;
// Optional quote. Defaults to `b'"'`
string quote = 9;
// Optional escape. Defaults to `'\\'`
string escape = 10;
// Optional flag whether to double quote instead of escaping. Defaults to `true`
bool double_quote = 11;
}

// Options controlling CSV format
Expand All @@ -1271,6 +1277,7 @@ message CsvOptions {
string timestamp_tz_format = 10; // Optional timestamp with timezone format
string time_format = 11; // Optional time format
string null_value = 12; // Optional representation of null value
bool double_quote = 13; // Indicates whether to use double quotes instead of escaping
}

// Options controlling CSV format
Expand Down
70 changes: 70 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,9 @@ pub(crate) fn csv_writer_options_to_proto(
timestamp_format: csv_options.timestamp_format().unwrap_or("").to_owned(),
time_format: csv_options.time_format().unwrap_or("").to_owned(),
null_value: csv_options.null().to_owned(),
quote: (csv_options.quote() as char).to_string(),
escape: (csv_options.escape() as char).to_string(),
double_quote: csv_options.double_quote(),
}
}

Expand All @@ -1644,11 +1647,34 @@ pub(crate) fn csv_writer_options_from_proto(
return Err(proto_error("Error parsing CSV Delimiter"));
}
}
if !writer_options.quote.is_empty() {
if let Some(quote) = writer_options.quote.chars().next() {
if quote.is_ascii() {
builder = builder.with_quote(quote as u8);
} else {
return Err(proto_error("CSV quote is not ASCII"));
}
} else {
return Err(proto_error("Error parsing CSV quote"));
}
}
if !writer_options.escape.is_empty() {
if let Some(escape) = writer_options.escape.chars().next() {
if escape.is_ascii() {
builder = builder.with_escape(escape as u8);
} else {
return Err(proto_error("CSV escape is not ASCII"));
}
} else {
return Err(proto_error("Error parsing CSV escape"));
}
}
Ok(builder
.with_header(writer_options.has_header)
.with_date_format(writer_options.date_format.clone())
.with_datetime_format(writer_options.datetime_format.clone())
.with_timestamp_format(writer_options.timestamp_format.clone())
.with_time_format(writer_options.time_format.clone())
.with_null(writer_options.null_value.clone()))
.with_null(writer_options.null_value.clone())
.with_double_quote(writer_options.double_quote))
}
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
delimiter: proto_opts.delimiter[0],
quote: proto_opts.quote[0],
escape: proto_opts.escape.first().copied(),
double_quote: proto_opts.double_quote,
compression: proto_opts.compression().into(),
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
date_format: (!proto_opts.date_format.is_empty())
Expand Down
12 changes: 6 additions & 6 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.as_ref()
.ok_or_else(|| proto_error("Missing required field in protobuf"))?
.try_into()?;
let sink_schema = convert_required!(sink.sink_schema)?;
let sink_schema = input.schema();
let sort_order = sink
.sort_order
.as_ref()
Expand All @@ -1037,7 +1037,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
Ok(Arc::new(FileSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
sink_schema,
sort_order,
)))
}
Expand All @@ -1050,7 +1050,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.as_ref()
.ok_or_else(|| proto_error("Missing required field in protobuf"))?
.try_into()?;
let sink_schema = convert_required!(sink.sink_schema)?;
let sink_schema = input.schema();
let sort_order = sink
.sort_order
.as_ref()
Expand All @@ -1067,7 +1067,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
Ok(Arc::new(FileSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
sink_schema,
sort_order,
)))
}
Expand All @@ -1080,7 +1080,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.as_ref()
.ok_or_else(|| proto_error("Missing required field in protobuf"))?
.try_into()?;
let sink_schema = convert_required!(sink.sink_schema)?;
let sink_schema = input.schema();
let sort_order = sink
.sort_order
.as_ref()
Expand All @@ -1097,7 +1097,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
Ok(Arc::new(FileSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
sink_schema,
sort_order,
)))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
time_format: opts.time_format.clone().unwrap_or_default(),
null_value: opts.null_value.clone().unwrap_or_default(),
double_quote: opts.double_quote,
})
}
}
Expand Down

0 comments on commit 22e23cf

Please sign in to comment.