Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/postgres-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Mar 6, 2025
2 parents 8a20469 + 07f7562 commit a251125
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 28 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/cross.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ jobs:
${{ runner.os }}-cargo-
- run: echo "::add-matcher::.github/matchers/rust.json"
- run: 'cargo install cross --version 0.2.4 --force --locked'
- run: |
rustup target add x86_64-unknown-linux-gnu
cargo install cross --version 0.2.4 --force --locked
# Why is this build, not check? Because we need to make sure the linking phase works.
# aarch64 and musl in particular are notoriously hard to link.
# While it may be tempting to slot a `check` in here for quickness, please don't.
Expand Down
26 changes: 8 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ tokio = { version = "1.43.0", default-features = false, features = ["full"] }
toml = { version = "0.8.20", default-features = false, features = ["display", "parse"] }
tonic = { version = "0.11", default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] }
tonic-build = { version = "0.11", default-features = false, features = ["transport", "prost"] }
uuid = { version = "1.12.0", features = ["v4", "v7", "serde"] }
uuid = { version = "1.15.1", features = ["v4", "v7", "serde"] }
vector-lib = { path = "lib/vector-lib", default-features = false, features = ["vrl"] }
vector-config = { path = "lib/vector-config" }
vector-config-common = { path = "lib/vector-config-common" }
Expand Down
4 changes: 1 addition & 3 deletions PRIVACY.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ checks, capturing diagnostic information, and sharing crash reports.
## Vector Website & Docs

The Vector website does collect various analytics. Aggregated analytics data is
derived from backend server logs which are anonymized. Vector uses
[Netlify analytics][netlify_analytics] for this.
derived from backend server logs which are anonymized.

## Vector Community

Expand All @@ -54,5 +53,4 @@ privacy policy [here][discord_pp].

[github_pp]: https://help.github.com/en/github/site-policy/github-privacy-statement
[discord_pp]: https://discord.com/privacy/
[netlify_analytics]: https://www.netlify.com/products/analytics/
[vero_pp]: https://www.getvero.com/privacy/
3 changes: 3 additions & 0 deletions changelog.d/22536_last_message_length_encoding_framing.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix an issue where, when using the `LengthDelimitedEncoder`, the last (or only) message in a batch was not correctly framed.

authors: callum-ryan
3 changes: 2 additions & 1 deletion lib/vector-vrl/web-playground/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ crate-type = ["cdylib"]
wasm-bindgen = "0.2"
vrl.workspace = true
serde.workspace = true
serde-wasm-bindgen = "0.6"
gloo-utils = { version = "0.2", features = ["serde"] }
vector-vrl-functions = { path = "../functions" }
enrichment = { path = "../../enrichment" }
# Required per https://docs.rs/getrandom/latest/getrandom/#webassembly-support
getrandom = { version = "0.2.15", features = ["js"] }

[build-dependencies]
cargo-lock = "10.1.0"
135 changes: 131 additions & 4 deletions src/sinks/util/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut bytes = BytesMut::new();
match position {
Position::Last | Position::Only => {
match (position, encoder.framer()) {
(
Position::Last | Position::Only,
Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
) => {
encoder
.serialize(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
Expand Down Expand Up @@ -144,10 +147,14 @@ where
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::env;
use std::path::PathBuf;

use bytes::{BufMut, Bytes};
use vector_lib::codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions};
use vector_lib::codecs::{
CharacterDelimitedEncoder, JsonSerializerConfig, NewlineDelimitedEncoder,
TextSerializerConfig,
CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
NewlineDelimitedEncoder, TextSerializerConfig,
};
use vector_lib::event::LogEvent;
use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};
Expand Down Expand Up @@ -375,4 +382,124 @@ mod tests {
assert_eq!(String::from_utf8(writer).unwrap(), r"value");
assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
}

/// Absolute path to the protobuf test fixtures (`tests/data/protobuf`),
/// resolved relative to the crate root via `CARGO_MANIFEST_DIR`.
fn test_data_dir() -> PathBuf {
    // `expect` instead of a bare `unwrap`: if this ever fails, the message
    // states the invariant (cargo always sets CARGO_MANIFEST_DIR for tests).
    PathBuf::from(
        env::var_os("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is set by cargo for tests"),
    )
    .join("tests/data/protobuf")
}

#[test]
fn test_encode_batch_protobuf_single() {
    // Payload produced ahead of time by tests/data/protobuf/serialize.py:
    // a single serialized `test_proto.User` message.
    let payload = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
    let payload_len = payload.len();

    // Expected wire format: a 4-byte big-endian length prefix (the default
    // LengthDelimitedCoderOptions.length_field_length is 4) followed by the
    // raw protobuf bytes.
    let mut expected = BytesMut::with_capacity(64);
    expected.reserve(4 + payload_len);
    expected.put_uint(payload_len as u64, 4);
    expected.extend_from_slice(&payload[..]);
    let expected_bytes = expected.freeze();

    let config = ProtobufSerializerConfig {
        protobuf: ProtobufSerializerOptions {
            desc_file: test_data_dir().join("test_proto.desc"),
            message_type: "test_proto.User".to_string(),
        },
    };

    let encoding = (
        Transformer::default(),
        crate::codecs::Encoder::<Framer>::new(
            LengthDelimitedEncoder::default().into(),
            config.build().unwrap().into(),
        ),
    );

    // One log event whose fields mirror the pre-serialized payload.
    let event = LogEvent::from(BTreeMap::from([
        (KeyString::from("id"), Value::from("123")),
        (KeyString::from("name"), Value::from("Alice")),
        (KeyString::from("age"), Value::from(30)),
        (
            KeyString::from("emails"),
            Value::from(vec!["alice@example.com", "alice@work.com"]),
        ),
    ]));
    let input = vec![Event::Log(event)];

    let input_json_size: JsonSize = input
        .iter()
        .map(|e| e.estimated_json_encoded_size_of())
        .sum();

    let mut writer = Vec::new();
    let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

    // 49 bytes of protobuf plus the 4-byte length prefix must be written.
    assert_eq!(payload_len, 49);
    assert_eq!(written, payload_len + 4);
    assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
    assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
}

#[test]
fn test_encode_batch_protobuf_multiple() {
    // Two copies of the payload produced by tests/data/protobuf/serialize.py.
    let payload = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
    let messages = vec![payload.clone(), payload.clone()];
    let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

    // Each message is framed independently: a 4-byte big-endian length prefix
    // (the default LengthDelimitedCoderOptions.length_field_length is 4)
    // followed by the raw protobuf bytes.
    let mut buf = BytesMut::with_capacity(128);
    for message in &messages {
        buf.reserve(4 + message.len());
        buf.put_uint(message.len() as u64, 4);
        buf.extend_from_slice(&message[..]);
    }
    let expected_bytes = buf.freeze();

    let config = ProtobufSerializerConfig {
        protobuf: ProtobufSerializerOptions {
            desc_file: test_data_dir().join("test_proto.desc"),
            message_type: "test_proto.User".to_string(),
        },
    };

    let encoding = (
        Transformer::default(),
        crate::codecs::Encoder::<Framer>::new(
            LengthDelimitedEncoder::default().into(),
            config.build().unwrap().into(),
        ),
    );

    // Both input events describe the same user as the serialized payload;
    // a local builder avoids duplicating the literal.
    let make_user_event = || {
        Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))
    };
    let input = vec![make_user_event(), make_user_event()];

    let input_json_size: JsonSize = input
        .iter()
        .map(|e| e.estimated_json_encoded_size_of())
        .sum();

    let mut writer = Vec::new();
    let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

    // Two 49-byte messages, each with its own 4-byte length prefix.
    assert_eq!(total_input_proto_size, 49 * 2);
    assert_eq!(written, total_input_proto_size + 8);
    assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
    assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
}
}
2 changes: 2 additions & 0 deletions tests/data/protobuf/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
test_proto_pb2.py
12 changes: 12 additions & 0 deletions tests/data/protobuf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Compile the protobuf file descriptor set consumed by the Rust encoding tests.
generate-desc:
	protoc -I=. -o test_proto.desc test_proto.proto

# Generate the Python bindings (test_proto_pb2.py) from the .proto definition.
generate-pb2:
	@protoc --python_out=. test_proto.proto

# Fail fast with a clear message if `protobuf` is missing from the python3 environment.
check-protobuf:
	@python3 -c "import google.protobuf" 2>/dev/null || (echo 'protobuf is NOT installed in python3 environment' && exit 1)

# Produce test_proto.pb, the serialized test payload read by the Rust tests.
generate-test-payload: check-protobuf generate-pb2
	@python3 serialize.py

11 changes: 11 additions & 0 deletions tests/data/protobuf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Protobuf files for encoding tests

These proto files are used in [src/sinks/util/encoding.rs](../../../src/sinks/util/encoding.rs) tests to confirm that framing works as intended.

### Regenerate

There is a Makefile to ease the process of compiling the test binary file. It requires `protobuf` to be installed in a python3 environment.

* `make generate-test-payload`
  * this target generates the required `*_pb2.py` module and serializes a test message to `test_proto.pb`.

16 changes: 16 additions & 0 deletions tests/data/protobuf/serialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Serialize a single test_proto.User message to test_proto.pb.

Requires test_proto_pb2.py (generated via `make generate-pb2`) to be
importable from the current directory.
"""
import test_proto_pb2

out_path = "test_proto.pb"

# Field values deliberately match the log event built in the Rust tests.
user = test_proto_pb2.User(
    id="123",
    name="Alice",
    age=30,
    emails=["alice@example.com", "alice@work.com"],
)

payload = user.SerializeToString()
with open(out_path, "wb") as fh:
    fh.write(payload)

print(f"Output: {out_path} size = {len(payload)} bytes")
9 changes: 9 additions & 0 deletions tests/data/protobuf/test_proto.desc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

|
test_proto.proto
test_proto"T
User
id ( Rid
name ( Rname
age (Rage
emails ( Remailsbproto3
2 changes: 2 additions & 0 deletions tests/data/protobuf/test_proto.pb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

123Alice"alice@example.com"alice@work.com
11 changes: 11 additions & 0 deletions tests/data/protobuf/test_proto.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Schema for the test payload used by the sink encoding tests
// (src/sinks/util/encoding.rs); serialize.py produces a matching binary.
syntax = "proto3";

package test_proto;

// Define a User message. Field values in the tests and in serialize.py
// must stay in sync with this schema.
message User {
  string id = 1;
  string name = 2;
  int32 age = 3;
  repeated string emails = 4;
}

0 comments on commit a251125

Please sign in to comment.