Skip to content

Commit

Permalink
Serialize record timestamp as fluent EventTime
Browse files Browse the repository at this point in the history
  • Loading branch information
DoumanAsh committed Aug 13, 2024
1 parent 3388dbd commit db8dd49
Showing 1 changed file with 39 additions and 4 deletions.
43 changes: 39 additions & 4 deletions src/fluent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//!Fluentd forward protocol definitions.
use serde::ser::{Serialize, Serializer, SerializeTuple, SerializeMap};

use std::time;
use core::fmt;
use std::borrow::Cow;

Expand Down Expand Up @@ -135,19 +136,40 @@ impl fmt::Debug for Value {
}
}

struct Int8([u8; 8]);

impl Serialize for Int8 {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.0)
}
}

//rmpv derives extension type of bytes size
struct ExtType((i8, Int8));

impl Serialize for ExtType {
#[inline]
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use rmp_serde::MSGPACK_EXT_STRUCT_NAME;

serializer.serialize_newtype_struct(MSGPACK_EXT_STRUCT_NAME, &self.0)
}
}

#[derive(Debug)]
///Representation of fluent entry within `Message`
pub struct Record {
time: u64,
time: time::Duration,
entries: Map,
}

impl Record {
#[inline(always)]
///Creates record with current timestamp
pub fn now() -> Self {
let time = match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(time) => time.as_secs(),
let time = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH) {
Ok(time) => time,
Err(_) => panic!("SystemTime is before UNIX!?"),
};

Expand Down Expand Up @@ -286,7 +308,20 @@ impl Serialize for Record {
#[inline]
fn serialize<SER: Serializer>(&self, ser: SER) -> Result<SER::Ok, SER::Error> {
let mut seq = ser.serialize_tuple(2)?;
seq.serialize_element(&self.time)?;

//seq.serialize_element(&self.time.as_secs())?;
//
//Serialize time as EventTime ext
//https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#eventtime-ext-format
//This is valid up to year 2106
let seconds = self.time.as_secs() as u32;
let nanos = self.time.subsec_nanos();
let seconds = seconds.to_be_bytes();
let nanos = nanos.to_be_bytes();
let time = [seconds[0], seconds[1], seconds[2], seconds[3], nanos[0], nanos[1], nanos[2], nanos[3]];
let time = ExtType((0, Int8(time)));
seq.serialize_element(&time)?;

seq.serialize_element(&self.entries)?;
seq.end()
}
Expand Down

0 comments on commit db8dd49

Please sign in to comment.