Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize record timestamp as msgpack Timestamp #6

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 16 additions & 34 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,23 @@ name: Rust

on:
push:
paths:
- '.github/workflows/rust.yml'
- 'src/**.rs'
- '**.toml'
paths-ignore:
- 'README.md'
- 'LICENSE'
- '.gitignore'
branches:
- master
- 'master'
pull_request:
paths:
- '.github/workflows/rust.yml'
- 'src/**.rs'
- '**.toml'
branches:
- '**'
types: [opened, synchronize, reopened, ready_for_review]
paths-ignore:
- 'README.md'
- 'LICENSE'
- '.gitignore'

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1

- name: Install Rust Unix
if: runner.os != 'Windows'
run: |
if rustup --version >/dev/null 2>&1; then
rustup update
else
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain stable
echo ::add-path::$HOME/.cargo/bin
fi

- name: Rust version
run: |
cargo --version
rustc --version

- name: Test
run: cargo test
check:
if: github.event.pull_request.draft == false
uses: DoumanAsh/douman-ci/.github/workflows/rust.yml@master
with:
cargo-no-features: true
cargo-features: event_time
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tracing-fluentd"
version = "0.4.0"
version = "0.4.1"
authors = ["Douman <douman@gmx.se>"]
edition = "2018"
description = "Enables forwarding of `tracing` events towards the `fluentd` server."
Expand Down Expand Up @@ -48,3 +48,7 @@ features = ["with-serde"]
version = "0.3.8"
default-features = false
features = ["registry", "fmt"]

[features]
# Specifies to encode timestamp as EventTime instead of default unix timestamp
event_time = []
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

Enables forwarding of `tracing` events towards the `fluentd` server.

Version corresponds to `tracing-subscriber` version.
## Features

- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp

## Example

Expand Down
50 changes: 46 additions & 4 deletions src/fluent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//!Fluentd forward protocol definitions.
use serde::ser::{Serialize, Serializer, SerializeTuple, SerializeMap};

use std::time;
use core::fmt;
use std::borrow::Cow;

Expand Down Expand Up @@ -138,16 +139,16 @@ impl fmt::Debug for Value {
#[derive(Debug)]
///Representation of fluent entry within `Message`
pub struct Record {
time: u64,
time: time::Duration,
entries: Map,
}

impl Record {
#[inline(always)]
///Creates record with current timestamp
pub fn now() -> Self {
let time = match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(time) => time.as_secs(),
let time = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH) {
Ok(time) => time,
Err(_) => panic!("SystemTime is before UNIX!?"),
};

Expand Down Expand Up @@ -286,7 +287,48 @@ impl Serialize for Record {
#[inline]
fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
let mut seq = ser.serialize_tuple(2)?;
seq.serialize_element(&self.time)?;

let seconds = self.time.as_secs();
#[cfg(feature = "event_time")]
{
struct Int8([u8; 8]);

impl Serialize for Int8 {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.0)
}
}

//rmpv derives extension type of bytes size
struct ExtType((i8, Int8));

impl Serialize for ExtType {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use rmp_serde::MSGPACK_EXT_STRUCT_NAME;

serializer.serialize_newtype_struct(MSGPACK_EXT_STRUCT_NAME, &self.0)
}
}

//seq.serialize_element(&self.time.as_secs())?;
//
//Serialize time as EventTime ext
//https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#eventtime-ext-format
//This is valid up to year 2106
let nanos = self.time.subsec_nanos();
let seconds = (seconds as u32).to_be_bytes();
let nanos = nanos.to_be_bytes();
let time = [seconds[0], seconds[1], seconds[2], seconds[3], nanos[0], nanos[1], nanos[2], nanos[3]];
let time = ExtType((0, Int8(time)));
seq.serialize_element(&time)?;
}
#[cfg(not(feature = "event_time"))]
{
seq.serialize_element(&seconds)?;
}

seq.serialize_element(&self.entries)?;
seq.end()
}
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! [tracing](https://github.com/tokio-rs/tracing) for [fluentd](https://www.fluentd.org/).
//!
//!## Features
//!
//!- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp
//!
//!## Example
//!
//!```rust
Expand Down
Loading