From 3c2cf0e4ae596a45a3e35d0f11e8051a83b60a4b Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 30 Jan 2025 11:15:37 -0500 Subject: [PATCH] avro: Bump `apache-avro` to `0.17.0` --- Cargo.lock | 78 ++++---------------------------- Cargo.toml | 2 +- crates/avro/src/encode.rs | 17 ++++--- crates/avro/src/schema.rs | 6 +-- crates/parser/src/format/avro.rs | 22 ++++++++- 5 files changed, 44 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e23eb0a7dc5..0d77f8eb4b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,31 +351,6 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" -[[package]] -name = "apache-avro" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" -dependencies = [ - "crc32fast", - "digest", - "lazy_static", - "libflate", - "log", - "num-bigint", - "quad-rand", - "rand 0.8.5", - "regex-lite", - "serde", - "serde_json", - "snap", - "strum 0.25.0", - "strum_macros 0.25.3", - "thiserror", - "typed-builder 0.16.2", - "uuid 1.10.0", -] - [[package]] name = "apache-avro" version = "0.17.0" @@ -383,6 +358,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" dependencies = [ "bigdecimal 0.4.7", + "crc32fast", "digest", "libflate", "log", @@ -393,10 +369,11 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "snap", "strum 0.26.3", "strum_macros 0.26.4", "thiserror", - "typed-builder 0.19.1", + "typed-builder", "uuid 1.10.0", ] @@ -721,7 +698,7 @@ dependencies = [ name = "avro" version = "0.0.0" dependencies = [ - "apache-avro 0.16.0", + "apache-avro", "doc", "hexdump", "insta", @@ -1992,7 +1969,7 @@ dependencies = [ "aes-siv", "allocator", "anyhow", - "apache-avro 0.16.0", + "apache-avro", "async-process", "async-trait", "avro", @@ -4518,7 +4495,7 @@ dependencies = [ name = "parser" version = "0.0.0" dependencies = [ - "apache-avro 0.16.0", + "apache-avro", "assert_cmd", "base64 0.13.1", "bytecount", @@ -5938,7 +5915,7 @@ version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc3cf40651cf503827a34bcd7efbbd4750a7e3adc6768bb8089977e4d07303b" dependencies = [ - "apache-avro 0.17.0", + "apache-avro", "byteorder", "dashmap 6.1.0", "futures", @@ -6620,12 +6597,6 @@ dependencies = [ "strum_macros 0.24.3", ] -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" - [[package]] name = "strum" version = "0.26.3" @@ -6645,19 +6616,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.74", -] - [[package]] name = "strum_macros" version = "0.26.4" @@ -7335,33 +7293,13 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "typed-builder" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" -dependencies = [ - "typed-builder-macro 0.16.2", -] - [[package]] name = "typed-builder" version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06fbd5b8de54c5f7c91f6fe4cebb949be2125d7758e630bb58b1d831dbce600" dependencies = [ - "typed-builder-macro 0.19.1", -] - -[[package]] -name = "typed-builder-macro" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.74", + "typed-builder-macro", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d9e7fbd7fc5..dd437431778 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ async-compression = { version = "0.3", features = [ async-stripe = { version = "0.37", features = ["runtime-tokio-hyper"] } async-trait = "0.1" atty = "0.2" -apache-avro = { version = "0.16.0", features = ["snappy"] } +apache-avro = { version = "0.17.0", features = ["snappy"] } base64 = "0.13" bigdecimal = "0.3.0" diff --git a/crates/avro/src/encode.rs b/crates/avro/src/encode.rs index 08b003516b0..003b6d33097 100644 --- a/crates/avro/src/encode.rs +++ b/crates/avro/src/encode.rs @@ -222,7 +222,7 @@ fn maybe_encode<'s, 'n, N: AsNode>( // Schematized field is not present in this object instance. if schema.name == FLOW_EXTRA_NAME { - let Schema::Map(value_schema) = &schema.schema else { + let Schema::Map(map_schema) = &schema.schema else { return Err(Error::ExtraPropertiesMap); }; // This field is constructed as the last schematized field of the schema. @@ -237,7 +237,7 @@ fn maybe_encode<'s, 'n, N: AsNode>( for (name, value) in extra.iter() { zig_zag(b, name.len() as i64); // Key length. b.extend(name.as_bytes()); // Key content. - encode(loc.push_prop(name), b, &value_schema, *value)?; + encode(loc.push_prop(name), b, &map_schema.types, *value)?; } } zig_zag(b, 0); // Close map. @@ -270,23 +270,28 @@ fn maybe_encode<'s, 'n, N: AsNode>( } Ok(true) } - (Schema::Map(schema), Node::Object(fields)) => { + (Schema::Map(map_schema), Node::Object(fields)) => { if fields.len() != 0 { zig_zag(b, fields.len() as i64); for field in fields.iter() { zig_zag(b, field.property().len() as i64); // Key length. b.extend(field.property().as_bytes()); // Key content. - encode(loc.push_prop(field.property()), b, &schema, field.value())?; + encode( + loc.push_prop(field.property()), + b, + &map_schema.types, + field.value(), + )?; } } zig_zag(b, 0); // Close map. Ok(true) } - (Schema::Array(schema), Node::Array(items)) => { + (Schema::Array(array_schema), Node::Array(items)) => { if !items.is_empty() { zig_zag(b, items.len() as i64); for (index, item) in items.iter().enumerate() { - encode(loc.push_item(index), b, schema, item)?; + encode(loc.push_item(index), b, &array_schema.items, item)?; } } zig_zag(b, 0); // Close array. diff --git a/crates/avro/src/schema.rs b/crates/avro/src/schema.rs index 332e2e5c8a2..8237e15e2f6 100644 --- a/crates/avro/src/schema.rs +++ b/crates/avro/src/schema.rs @@ -135,7 +135,7 @@ fn object_to_avro(loc: json::Location, obj: doc::shape::ObjShape) -> avro::Schem // additional properties, then interpret it as an Avro map. if extra.type_ != types::INVALID && obj.properties.is_empty() { let schema = shape_to_avro(loc, extra, true); - return avro::Schema::Map(Box::new(schema)); + return avro::Schema::map(schema); } // Otherwise, build a Record which may have a placeholder @@ -166,7 +166,7 @@ fn object_to_avro(loc: json::Location, obj: doc::shape::ObjShape) -> avro::Schem if extra.type_ != types::INVALID { let schema = shape_to_avro(loc.push_prop(FLOW_EXTRA_NAME), extra, true); - let schema = avro::Schema::Map(Box::new(schema)); + let schema = avro::Schema::map(schema); fields.push(avro::RecordField { aliases: None, @@ -205,7 +205,7 @@ fn array_to_avro(loc: json::Location, shape: doc::shape::ArrayShape) -> avro::Sc } let items = shape_to_avro(loc.push_prop("_items"), items, true); - avro::Schema::Array(Box::new(items)) + avro::Schema::array(items) } // Map a location into a special Schema holding a string-encoded JSON value. diff --git a/crates/parser/src/format/avro.rs b/crates/parser/src/format/avro.rs index 087537ba8ca..31948abe8e7 100644 --- a/crates/parser/src/format/avro.rs +++ b/crates/parser/src/format/avro.rs @@ -22,6 +22,8 @@ pub enum AvroError { Read(apache_avro::Error), #[error("invalid floating point value '{0}' for column '{1}'")] InvalidFloat(String, String), + #[error("invalid big decimal value '{0}' for column '{1}'")] + InvalidBigDecimal(String, String), /// Date-like values in avro are essentially just type hints on top of numeric primitives, so /// it's distinctly possible for them to be out of range. This error is returned in that case. @@ -104,6 +106,16 @@ fn avro_to_json( Boolean(b) => Ok(Value::Bool(b)), Int(i) => Ok(Value::Number(i.into())), Long(l) => Ok(Value::Number(l.into())), + BigDecimal(d) => { + if allow_string_repr { + Ok(Value::String(d.to_string())) + } else { + Err(AvroError::InvalidBigDecimal( + d.to_string(), + column_name.to_string(), + )) + } + } // Floating point numbers require special handling for NAN and +/-inf because those values // can _only_ be represented as strings in json. If the json types happen to allow strings, // then we'll coerce them into strings, but only for the values that aren't representable @@ -184,9 +196,15 @@ fn avro_to_json( TimestampMicros(t) if allow_string_repr => { timestamp_from_unix_epoch("timestamp-micros", column_name, t, MICROS_PER_SEC) } + TimestampNanos(t) if allow_string_repr => { + timestamp_from_unix_epoch("timestamp-nanos", column_name, t, NANOS_PER_SEC) + } LocalTimestampMicros(t) if allow_string_repr => { timestamp_from_unix_epoch("local-timestamp-micros", column_name, t, MICROS_PER_SEC) } + LocalTimestampNanos(t) if allow_string_repr => { + timestamp_from_unix_epoch("local-timestamp-nanos", column_name, t, NANOS_PER_SEC) + } // If !allow_string_repr, then all the date values will be converted directly to json // numbers. This allows users to handle any conversions themselves, by disallowing string // types in their json schema. @@ -195,7 +213,9 @@ fn avro_to_json( | TimestampMicros(i) | LocalTimestampMicros(i) | TimestampMillis(i) - | LocalTimestampMillis(i) => Ok(Value::Number(i.into())), + | LocalTimestampMillis(i) + | TimestampNanos(i) + | LocalTimestampNanos(i) => Ok(Value::Number(i.into())), Duration(avro_dur) => { // avro durations are really weird. We always convert them to json objects, since