Skip to content

Commit

Permalink
avro: Bump apache-avro to 0.17.0
Browse files (browse the repository at this point in the history)
  • Loading branch information
jshearer committed Feb 6, 2025
1 parent 764b5e0 commit 3c2cf0e
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 81 deletions.
78 changes: 8 additions & 70 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async-compression = { version = "0.3", features = [
async-stripe = { version = "0.37", features = ["runtime-tokio-hyper"] }
async-trait = "0.1"
atty = "0.2"
apache-avro = { version = "0.16.0", features = ["snappy"] }
apache-avro = { version = "0.17.0", features = ["snappy"] }

base64 = "0.13"
bigdecimal = "0.3.0"
Expand Down
17 changes: 11 additions & 6 deletions crates/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn maybe_encode<'s, 'n, N: AsNode>(
// Schematized field is not present in this object instance.

if schema.name == FLOW_EXTRA_NAME {
let Schema::Map(value_schema) = &schema.schema else {
let Schema::Map(map_schema) = &schema.schema else {
return Err(Error::ExtraPropertiesMap);
};
// This field is constructed as the last schematized field of the schema.
Expand All @@ -237,7 +237,7 @@ fn maybe_encode<'s, 'n, N: AsNode>(
for (name, value) in extra.iter() {
zig_zag(b, name.len() as i64); // Key length.
b.extend(name.as_bytes()); // Key content.
encode(loc.push_prop(name), b, &value_schema, *value)?;
encode(loc.push_prop(name), b, &map_schema.types, *value)?;
}
}
zig_zag(b, 0); // Close map.
Expand Down Expand Up @@ -270,23 +270,28 @@ fn maybe_encode<'s, 'n, N: AsNode>(
}
Ok(true)
}
(Schema::Map(schema), Node::Object(fields)) => {
(Schema::Map(map_schema), Node::Object(fields)) => {
if fields.len() != 0 {
zig_zag(b, fields.len() as i64);
for field in fields.iter() {
zig_zag(b, field.property().len() as i64); // Key length.
b.extend(field.property().as_bytes()); // Key content.
encode(loc.push_prop(field.property()), b, &schema, field.value())?;
encode(
loc.push_prop(field.property()),
b,
&map_schema.types,
field.value(),
)?;
}
}
zig_zag(b, 0); // Close map.
Ok(true)
}
(Schema::Array(schema), Node::Array(items)) => {
(Schema::Array(array_schema), Node::Array(items)) => {
if !items.is_empty() {
zig_zag(b, items.len() as i64);
for (index, item) in items.iter().enumerate() {
encode(loc.push_item(index), b, schema, item)?;
encode(loc.push_item(index), b, &array_schema.items, item)?;
}
}
zig_zag(b, 0); // Close array.
Expand Down
6 changes: 3 additions & 3 deletions crates/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn object_to_avro(loc: json::Location, obj: doc::shape::ObjShape) -> avro::Schem
// additional properties, then interpret it as an Avro map.
if extra.type_ != types::INVALID && obj.properties.is_empty() {
let schema = shape_to_avro(loc, extra, true);
return avro::Schema::Map(Box::new(schema));
return avro::Schema::map(schema);
}

// Otherwise, build a Record which may have a placeholder
Expand Down Expand Up @@ -166,7 +166,7 @@ fn object_to_avro(loc: json::Location, obj: doc::shape::ObjShape) -> avro::Schem

if extra.type_ != types::INVALID {
let schema = shape_to_avro(loc.push_prop(FLOW_EXTRA_NAME), extra, true);
let schema = avro::Schema::Map(Box::new(schema));
let schema = avro::Schema::map(schema);

fields.push(avro::RecordField {
aliases: None,
Expand Down Expand Up @@ -205,7 +205,7 @@ fn array_to_avro(loc: json::Location, shape: doc::shape::ArrayShape) -> avro::Sc
}

let items = shape_to_avro(loc.push_prop("_items"), items, true);
avro::Schema::Array(Box::new(items))
avro::Schema::array(items)
}

// Map a location into a special Schema holding a string-encoded JSON value.
Expand Down
22 changes: 21 additions & 1 deletion crates/parser/src/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum AvroError {
Read(apache_avro::Error),
#[error("invalid floating point value '{0}' for column '{1}'")]
InvalidFloat(String, String),
#[error("invalid big decimal value '{0}' for column '{1}'")]
InvalidBigDecimal(String, String),

/// Date-like values in avro are essentially just type hints on top of numeric primitives, so
/// it's distinctly possible for them to be out of range. This error is returned in that case.
Expand Down Expand Up @@ -104,6 +106,16 @@ fn avro_to_json(
Boolean(b) => Ok(Value::Bool(b)),
Int(i) => Ok(Value::Number(i.into())),
Long(l) => Ok(Value::Number(l.into())),
BigDecimal(d) => {
if allow_string_repr {
Ok(Value::String(d.to_string()))
} else {
Err(AvroError::InvalidBigDecimal(
d.to_string(),
column_name.to_string(),
))
}
}
// Floating point numbers require special handling for NAN and +/-inf because those values
// can _only_ be represented as strings in json. If the json types happen to allow strings,
// then we'll coerce them into strings, but only for the values that aren't representable
Expand Down Expand Up @@ -184,9 +196,15 @@ fn avro_to_json(
TimestampMicros(t) if allow_string_repr => {
timestamp_from_unix_epoch("timestamp-micros", column_name, t, MICROS_PER_SEC)
}
TimestampNanos(t) if allow_string_repr => {
timestamp_from_unix_epoch("timestamp-nanos", column_name, t, NANOS_PER_SEC)
}
LocalTimestampMicros(t) if allow_string_repr => {
timestamp_from_unix_epoch("local-timestamp-micros", column_name, t, MICROS_PER_SEC)
}
LocalTimestampNanos(t) if allow_string_repr => {
timestamp_from_unix_epoch("local-timestamp-nanos", column_name, t, NANOS_PER_SEC)
}
// If !allow_string_repr, then all the date values will be converted directly to json
// numbers. This allows users to handle any conversions themselves, by disallowing string
// types in their json schema.
Expand All @@ -195,7 +213,9 @@ fn avro_to_json(
| TimestampMicros(i)
| LocalTimestampMicros(i)
| TimestampMillis(i)
| LocalTimestampMillis(i) => Ok(Value::Number(i.into())),
| LocalTimestampMillis(i)
| TimestampNanos(i)
| LocalTimestampNanos(i) => Ok(Value::Number(i.into())),

Duration(avro_dur) => {
// avro durations are really weird. We always convert them to json objects, since
Expand Down

0 comments on commit 3c2cf0e

Please sign in to comment.