Skip to content

Commit

Permalink
fix(codecs): LengthDelimitedEncoder fix last message framing (#22536)
Browse files Browse the repository at this point in the history
* fix(sink encoding): ensure last/only msg encoded

Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com>

* fix: readme newline

Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com>

* chore: add changelog entry

Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com>

* fix: use CARGO_MANIFEST_DIR for test data path

Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com>

---------

Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
  • Loading branch information
callum-ryan and pront authored Mar 5, 2025
1 parent cba85ca commit 7650fa2
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 4 deletions.
3 changes: 3 additions & 0 deletions changelog.d/22536_last_message_length_encoding_framing.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix an issue where using the LengthDelimitedEncoder, the last or only message is not correctly framed.

authors: callum-ryan
135 changes: 131 additions & 4 deletions src/sinks/util/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut bytes = BytesMut::new();
match position {
Position::Last | Position::Only => {
match (position, encoder.framer()) {
(
Position::Last | Position::Only,
Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
) => {
encoder
.serialize(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
Expand Down Expand Up @@ -144,10 +147,14 @@ where
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::env;
use std::path::PathBuf;

use bytes::{BufMut, Bytes};
use vector_lib::codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions};
use vector_lib::codecs::{
CharacterDelimitedEncoder, JsonSerializerConfig, NewlineDelimitedEncoder,
TextSerializerConfig,
CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
NewlineDelimitedEncoder, TextSerializerConfig,
};
use vector_lib::event::LogEvent;
use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};
Expand Down Expand Up @@ -375,4 +382,124 @@ mod tests {
assert_eq!(String::from_utf8(writer).unwrap(), r"value");
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}

fn test_data_dir() -> PathBuf {
PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
}

#[test]
fn test_encode_batch_protobuf_single() {
let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
let input_proto_size = message_raw.len();

// default LengthDelimitedCoderOptions.length_field_length is 4
let mut buf = BytesMut::with_capacity(64);
buf.reserve(4 + input_proto_size);
buf.put_uint(input_proto_size as u64, 4);
buf.extend_from_slice(&message_raw[..]);
let expected_bytes = buf.freeze();

let config = ProtobufSerializerConfig {
protobuf: ProtobufSerializerOptions {
desc_file: test_data_dir().join("test_proto.desc"),
message_type: "test_proto.User".to_string(),
},
};

let encoding = (
Transformer::default(),
crate::codecs::Encoder::<Framer>::new(
LengthDelimitedEncoder::default().into(),
config.build().unwrap().into(),
),
);

let mut writer = Vec::new();
let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
(KeyString::from("id"), Value::from("123")),
(KeyString::from("name"), Value::from("Alice")),
(KeyString::from("age"), Value::from(30)),
(
KeyString::from("emails"),
Value::from(vec!["alice@example.com", "alice@work.com"]),
),
])))];

let input_json_size = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum::<JsonSize>();

let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

assert_eq!(input_proto_size, 49);
assert_eq!(written, input_proto_size + 4);
assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
}

#[test]
fn test_encode_batch_protobuf_multiple() {
let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
let messages = vec![message_raw.clone(), message_raw.clone()];
let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

let mut buf = BytesMut::with_capacity(128);
for message in messages {
// default LengthDelimitedCoderOptions.length_field_length is 4
buf.reserve(4 + message.len());
buf.put_uint(message.len() as u64, 4);
buf.extend_from_slice(&message[..]);
}
let expected_bytes = buf.freeze();

let config = ProtobufSerializerConfig {
protobuf: ProtobufSerializerOptions {
desc_file: test_data_dir().join("test_proto.desc"),
message_type: "test_proto.User".to_string(),
},
};

let encoding = (
Transformer::default(),
crate::codecs::Encoder::<Framer>::new(
LengthDelimitedEncoder::default().into(),
config.build().unwrap().into(),
),
);

let mut writer = Vec::new();
let input = vec![
Event::Log(LogEvent::from(BTreeMap::from([
(KeyString::from("id"), Value::from("123")),
(KeyString::from("name"), Value::from("Alice")),
(KeyString::from("age"), Value::from(30)),
(
KeyString::from("emails"),
Value::from(vec!["alice@example.com", "alice@work.com"]),
),
]))),
Event::Log(LogEvent::from(BTreeMap::from([
(KeyString::from("id"), Value::from("123")),
(KeyString::from("name"), Value::from("Alice")),
(KeyString::from("age"), Value::from(30)),
(
KeyString::from("emails"),
Value::from(vec!["alice@example.com", "alice@work.com"]),
),
]))),
];

let input_json_size: JsonSize = input
.iter()
.map(|event| event.estimated_json_encoded_size_of())
.sum();

let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

assert_eq!(total_input_proto_size, 49 * 2);
assert_eq!(written, total_input_proto_size + 8);
assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
}
}
2 changes: 2 additions & 0 deletions tests/data/protobuf/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
test_proto_pb2.py
12 changes: 12 additions & 0 deletions tests/data/protobuf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
generate-desc:
protoc -I=. -o test_proto.desc test_proto.proto

generate-pb2:
@protoc --python_out=. test_proto.proto

check-protobuf:
@python3 -c "import google.protobuf" 2>/dev/null || (echo 'protobuf is NOT installed in python3 environment' && exit 1)

generate-test-payload: check-protobuf generate-pb2
@python3 serialize.py

11 changes: 11 additions & 0 deletions tests/data/protobuf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Protobuf files for encoding tests

These proto files are used in [src/sinks/util/encoding.rs](../../../src/sinks/util/encoding.rs) tests to confirm framing works as intended

### Regenerate

There is a Makefile to ease the process of compiling the test binary file. It requires `protobuf` to be installed in a python3 environment.

* `make generate-test-payload`
* this script will generate the required *_pb2.py and serialise a test message.

16 changes: 16 additions & 0 deletions tests/data/protobuf/serialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import test_proto_pb2

out_path = "test_proto.pb"

user1 = test_proto_pb2.User(
id="123",
name="Alice",
age=30,
emails=["alice@example.com", "alice@work.com"]
)

single_binary_data = user1.SerializeToString()
with open(out_path, "wb") as f:
f.write(single_binary_data)

print(f"Output: {out_path} size = {len(single_binary_data)} bytes")
9 changes: 9 additions & 0 deletions tests/data/protobuf/test_proto.desc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

|
test_proto.proto
test_proto"T
User
id ( Rid
name ( Rname
age (Rage
emails ( Remailsbproto3
2 changes: 2 additions & 0 deletions tests/data/protobuf/test_proto.pb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

123Alice"alice@example.com"alice@work.com
11 changes: 11 additions & 0 deletions tests/data/protobuf/test_proto.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

package test_proto;

// Define a User message
message User {
string id = 1;
string name = 2;
int32 age = 3;
repeated string emails = 4;
}

0 comments on commit 7650fa2

Please sign in to comment.