From 7650fa2c8c548e5ef5f22843b456b8297a60ee08 Mon Sep 17 00:00:00 2001 From: Callum Ryan <19956159+callum-ryan@users.noreply.github.com> Date: Wed, 5 Mar 2025 19:09:18 +0000 Subject: [PATCH] fix(codecs): `LengthDelimitedEncoder` fix last message framing (#22536) * fix(sink encoding): ensure last/only msg encoded Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: readme newline Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * chore: add changelog entry Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: use CARGO_MANIFEST_DIR for test data path Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --------- Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Co-authored-by: Pavlos Rontidis --- ...ast_message_length_encoding_framing.fix.md | 3 + src/sinks/util/encoding.rs | 135 +++++++++++++++++- tests/data/protobuf/.gitignore | 2 + tests/data/protobuf/Makefile | 12 ++ tests/data/protobuf/README.md | 11 ++ tests/data/protobuf/serialize.py | 16 +++ tests/data/protobuf/test_proto.desc | 9 ++ tests/data/protobuf/test_proto.pb | 2 + tests/data/protobuf/test_proto.proto | 11 ++ 9 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 changelog.d/22536_last_message_length_encoding_framing.fix.md create mode 100644 tests/data/protobuf/.gitignore create mode 100644 tests/data/protobuf/Makefile create mode 100644 tests/data/protobuf/README.md create mode 100644 tests/data/protobuf/serialize.py create mode 100644 tests/data/protobuf/test_proto.desc create mode 100644 tests/data/protobuf/test_proto.pb create mode 100644 tests/data/protobuf/test_proto.proto diff --git a/changelog.d/22536_last_message_length_encoding_framing.fix.md b/changelog.d/22536_last_message_length_encoding_framing.fix.md new file mode 100644 index 0000000000000..7f403cf9dbdd1 --- /dev/null +++ b/changelog.d/22536_last_message_length_encoding_framing.fix.md @@ -0,0 +1,3 @@ +Fix an issue where using the LengthDelimitedEncoder, the last or only message is not correctly framed. + +authors: callum-ryan diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 73f5a6c3f8ce5..2958da0f9173e 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -45,8 +45,11 @@ impl Encoder> for (Transformer, crate::codecs::Encoder) { byte_size.add_event(&event, event.estimated_json_encoded_size_of()); let mut bytes = BytesMut::new(); - match position { - Position::Last | Position::Only => { + match (position, encoder.framer()) { + ( + Position::Last | Position::Only, + Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_), + ) => { encoder .serialize(event, &mut bytes) .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; @@ -144,10 +147,14 @@ where #[cfg(test)] mod tests { use std::collections::BTreeMap; + use std::env; + use std::path::PathBuf; + use bytes::{BufMut, Bytes}; + use vector_lib::codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions}; use vector_lib::codecs::{ - CharacterDelimitedEncoder, JsonSerializerConfig, NewlineDelimitedEncoder, - TextSerializerConfig, + CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder, + NewlineDelimitedEncoder, TextSerializerConfig, }; use vector_lib::event::LogEvent; use vector_lib::{internal_event::CountByteSize, json_size::JsonSize}; @@ -375,4 +382,124 @@ mod tests { assert_eq!(String::from_utf8(writer).unwrap(), r"value"); assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap()); } + + fn test_data_dir() -> PathBuf { + PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf") + } + + #[test] + fn test_encode_batch_protobuf_single() { + let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap(); + let input_proto_size = message_raw.len(); + + // default LengthDelimitedCoderOptions.length_field_length is 4 + let mut buf = BytesMut::with_capacity(64); + buf.reserve(4 + input_proto_size); + buf.put_uint(input_proto_size as u64, 4); + buf.extend_from_slice(&message_raw[..]); + let expected_bytes = buf.freeze(); + + let config = ProtobufSerializerConfig { + protobuf: ProtobufSerializerOptions { + desc_file: test_data_dir().join("test_proto.desc"), + message_type: "test_proto.User".to_string(), + }, + }; + + let encoding = ( + Transformer::default(), + crate::codecs::Encoder::::new( + LengthDelimitedEncoder::default().into(), + config.build().unwrap().into(), + ), + ); + + let mut writer = Vec::new(); + let input = vec![Event::Log(LogEvent::from(BTreeMap::from([ + (KeyString::from("id"), Value::from("123")), + (KeyString::from("name"), Value::from("Alice")), + (KeyString::from("age"), Value::from(30)), + ( + KeyString::from("emails"), + Value::from(vec!["alice@example.com", "alice@work.com"]), + ), + ])))]; + + let input_json_size = input + .iter() + .map(|event| event.estimated_json_encoded_size_of()) + .sum::(); + + let (written, size) = encoding.encode_input(input, &mut writer).unwrap(); + + assert_eq!(input_proto_size, 49); + assert_eq!(written, input_proto_size + 4); + assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap()); + assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes); + } + + #[test] + fn test_encode_batch_protobuf_multiple() { + let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap(); + let messages = vec![message_raw.clone(), message_raw.clone()]; + let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum(); + + let mut buf = BytesMut::with_capacity(128); + for message in messages { + // default LengthDelimitedCoderOptions.length_field_length is 4 + buf.reserve(4 + message.len()); + buf.put_uint(message.len() as u64, 4); + buf.extend_from_slice(&message[..]); + } + let expected_bytes = buf.freeze(); + + let config = ProtobufSerializerConfig { + protobuf: ProtobufSerializerOptions { + desc_file: test_data_dir().join("test_proto.desc"), + message_type: "test_proto.User".to_string(), + }, + }; + + let encoding = ( + Transformer::default(), + crate::codecs::Encoder::::new( + LengthDelimitedEncoder::default().into(), + config.build().unwrap().into(), + ), + ); + + let mut writer = Vec::new(); + let input = vec![ + Event::Log(LogEvent::from(BTreeMap::from([ + (KeyString::from("id"), Value::from("123")), + (KeyString::from("name"), Value::from("Alice")), + (KeyString::from("age"), Value::from(30)), + ( + KeyString::from("emails"), + Value::from(vec!["alice@example.com", "alice@work.com"]), + ), + ]))), + Event::Log(LogEvent::from(BTreeMap::from([ + (KeyString::from("id"), Value::from("123")), + (KeyString::from("name"), Value::from("Alice")), + (KeyString::from("age"), Value::from(30)), + ( + KeyString::from("emails"), + Value::from(vec!["alice@example.com", "alice@work.com"]), + ), + ]))), + ]; + + let input_json_size: JsonSize = input + .iter() + .map(|event| event.estimated_json_encoded_size_of()) + .sum(); + + let (written, size) = encoding.encode_input(input, &mut writer).unwrap(); + + assert_eq!(total_input_proto_size, 49 * 2); + assert_eq!(written, total_input_proto_size + 8); + assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap()); + assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes); + } } diff --git a/tests/data/protobuf/.gitignore b/tests/data/protobuf/.gitignore new file mode 100644 index 0000000000000..57f3947d81c62 --- /dev/null +++ b/tests/data/protobuf/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +test_proto_pb2.py diff --git a/tests/data/protobuf/Makefile b/tests/data/protobuf/Makefile new file mode 100644 index 0000000000000..281cd85ce570d --- /dev/null +++ b/tests/data/protobuf/Makefile @@ -0,0 +1,12 @@ +generate-desc: + protoc -I=. -o test_proto.desc test_proto.proto + +generate-pb2: + @protoc --python_out=. test_proto.proto + +check-protobuf: + @python3 -c "import google.protobuf" 2>/dev/null || (echo 'protobuf is NOT installed in python3 environment' && exit 1) + +generate-test-payload: check-protobuf generate-pb2 + @python3 serialize.py + diff --git a/tests/data/protobuf/README.md b/tests/data/protobuf/README.md new file mode 100644 index 0000000000000..680cb84dd99e5 --- /dev/null +++ b/tests/data/protobuf/README.md @@ -0,0 +1,11 @@ +## Protobuf files for encoding tests + +These proto files are used in [src/sinks/util/encoding.rs](../../../src/sinks/util/encoding.rs) tests to confirm framing works as intended + +### Regenerate + +There is a Makefile to ease the process of compiling the test binary file. It requires `protobuf` to be installed in a python3 environment. + +* `make generate-test-payload` + * this script will generate the required *_pb2.py and serialise a test message. + diff --git a/tests/data/protobuf/serialize.py b/tests/data/protobuf/serialize.py new file mode 100644 index 0000000000000..e3eed0a24b5e0 --- /dev/null +++ b/tests/data/protobuf/serialize.py @@ -0,0 +1,16 @@ +import test_proto_pb2 + +out_path = "test_proto.pb" + +user1 = test_proto_pb2.User( + id="123", + name="Alice", + age=30, + emails=["alice@example.com", "alice@work.com"] +) + +single_binary_data = user1.SerializeToString() +with open(out_path, "wb") as f: + f.write(single_binary_data) + +print(f"Output: {out_path} size = {len(single_binary_data)} bytes") diff --git a/tests/data/protobuf/test_proto.desc b/tests/data/protobuf/test_proto.desc new file mode 100644 index 0000000000000..3ad4193007cf4 --- /dev/null +++ b/tests/data/protobuf/test_proto.desc @@ -0,0 +1,9 @@ + +| +test_proto.proto +test_proto"T +User +id ( Rid +name ( Rname +age (Rage +emails ( Remailsbproto3 \ No newline at end of file diff --git a/tests/data/protobuf/test_proto.pb b/tests/data/protobuf/test_proto.pb new file mode 100644 index 0000000000000..7fc938b1dd51d --- /dev/null +++ b/tests/data/protobuf/test_proto.pb @@ -0,0 +1,2 @@ + +123Alice"alice@example.com"alice@work.com \ No newline at end of file diff --git a/tests/data/protobuf/test_proto.proto b/tests/data/protobuf/test_proto.proto new file mode 100644 index 0000000000000..597252b095dc0 --- /dev/null +++ b/tests/data/protobuf/test_proto.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package test_proto; + +// Define a User message +message User { + string id = 1; + string name = 2; + int32 age = 3; + repeated string emails = 4; +}