Skip to content

Commit

Permalink
Serialize record timestamp as msgpack Timestamp (#6)
Browse files Browse the repository at this point in the history
* Serialize record timestamp as fluent EventTime

* Use f64 to represent time

* Revert "Use f64 to represent time"

Float parser is disgusting in fluentd

This reverts commit ca8c12a.

* Add event_time feature
  • Loading branch information
DoumanAsh authored Aug 14, 2024
1 parent 3388dbd commit d0fb7af
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 40 deletions.
50 changes: 16 additions & 34 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,23 @@ name: Rust

on:
push:
paths:
- '.github/workflows/rust.yml'
- 'src/**.rs'
- '**.toml'
paths-ignore:
- 'README.md'
- 'LICENSE'
- '.gitignore'
branches:
- master
- 'master'
pull_request:
paths:
- '.github/workflows/rust.yml'
- 'src/**.rs'
- '**.toml'
branches:
- '**'
types: [opened, synchronize, reopened, ready_for_review]
paths-ignore:
- 'README.md'
- 'LICENSE'
- '.gitignore'

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1

- name: Install Rust Unix
if: runner.os != 'Windows'
run: |
if rustup --version >/dev/null 2>&1; then
rustup update
else
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain stable
echo ::add-path::$HOME/.cargo/bin
fi
- name: Rust version
run: |
cargo --version
rustc --version
- name: Test
run: cargo test
check:
if: github.event.pull_request.draft == false
uses: DoumanAsh/douman-ci/.github/workflows/rust.yml@master
with:
cargo-no-features: true
cargo-features: event_time
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tracing-fluentd"
version = "0.4.0"
version = "0.4.1"
authors = ["Douman <douman@gmx.se>"]
edition = "2018"
description = "Enables forwarding of `tracing` events towards the `fluentd` server."
Expand Down Expand Up @@ -48,3 +48,7 @@ features = ["with-serde"]
version = "0.3.8"
default-features = false
features = ["registry", "fmt"]

[features]
# Specifies to encode timestamp as EventTime instead of default unix timestamp
event_time = []
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

Enables forwarding of `tracing` events towards the `fluentd` server.

Version corresponds to `tracing-subscriber` version.
## Features

- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp

## Example

Expand Down
50 changes: 46 additions & 4 deletions src/fluent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//!Fluentd forward protocol definitions.
use serde::ser::{Serialize, Serializer, SerializeTuple, SerializeMap};

use std::time;
use core::fmt;
use std::borrow::Cow;

Expand Down Expand Up @@ -138,16 +139,16 @@ impl fmt::Debug for Value {
#[derive(Debug)]
///Representation of fluent entry within `Message`
pub struct Record {
time: u64,
time: time::Duration,
entries: Map,
}

impl Record {
#[inline(always)]
///Creates record with current timestamp
pub fn now() -> Self {
let time = match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(time) => time.as_secs(),
let time = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH) {
Ok(time) => time,
Err(_) => panic!("SystemTime is before UNIX!?"),
};

Expand Down Expand Up @@ -286,7 +287,48 @@ impl Serialize for Record {
#[inline]
fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
let mut seq = ser.serialize_tuple(2)?;
seq.serialize_element(&self.time)?;

let seconds = self.time.as_secs();
#[cfg(feature = "event_time")]
{
struct Int8([u8; 8]);

impl Serialize for Int8 {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.0)
}
}

//rmpv derives extension type of bytes size
struct ExtType((i8, Int8));

impl Serialize for ExtType {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use rmp_serde::MSGPACK_EXT_STRUCT_NAME;

serializer.serialize_newtype_struct(MSGPACK_EXT_STRUCT_NAME, &self.0)
}
}

//seq.serialize_element(&self.time.as_secs())?;
//
//Serialize time as EventTime ext
//https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#eventtime-ext-format
//This is valid up to year 2106
let nanos = self.time.subsec_nanos();
let seconds = (seconds as u32).to_be_bytes();
let nanos = nanos.to_be_bytes();
let time = [seconds[0], seconds[1], seconds[2], seconds[3], nanos[0], nanos[1], nanos[2], nanos[3]];
let time = ExtType((0, Int8(time)));
seq.serialize_element(&time)?;
}
#[cfg(not(feature = "event_time"))]
{
seq.serialize_element(&seconds)?;
}

seq.serialize_element(&self.entries)?;
seq.end()
}
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! [tracing](https://github.com/tokio-rs/tracing) for [fluentd](https://www.fluentd.org/).
//!
//!## Features
//!
//!- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp
//!
//!## Example
//!
//!```rust
Expand Down

0 comments on commit d0fb7af

Please sign in to comment.