From db8dd494551d4373a66c10c25ed3ab1c2f6efd41 Mon Sep 17 00:00:00 2001 From: DoumanAsh Date: Tue, 13 Aug 2024 08:20:25 +0900 Subject: [PATCH] Serialize record timestamp as fluent EventTime --- src/fluent.rs | 43 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/fluent.rs b/src/fluent.rs index b26f1ed..b445507 100644 --- a/src/fluent.rs +++ b/src/fluent.rs @@ -1,6 +1,7 @@ //!Fluentd forward protocol definitions. use serde::ser::{Serialize, Serializer, SerializeTuple, SerializeMap}; +use std::time; use core::fmt; use std::borrow::Cow; @@ -135,10 +136,31 @@ impl fmt::Debug for Value { } } +struct Int8([u8; 8]); + +impl Serialize for Int8 { + #[inline] + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_bytes(&self.0) + } +} + +//rmpv derives extension type of bytes size +struct ExtType((i8, Int8)); + +impl Serialize for ExtType { + #[inline] + fn serialize(&self, serializer: S) -> Result { + use rmp_serde::MSGPACK_EXT_STRUCT_NAME; + + serializer.serialize_newtype_struct(MSGPACK_EXT_STRUCT_NAME, &self.0) + } +} + #[derive(Debug)] ///Representation of fluent entry within `Message` pub struct Record { - time: u64, + time: time::Duration, entries: Map, } @@ -146,8 +168,8 @@ impl Record { #[inline(always)] ///Creates record with current timestamp pub fn now() -> Self { - let time = match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) { - Ok(time) => time.as_secs(), + let time = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH) { + Ok(time) => time, Err(_) => panic!("SystemTime is before UNIX!?"), }; @@ -286,7 +308,20 @@ impl Serialize for Record { #[inline] fn serialize(&self, ser: SER) -> Result { let mut seq = ser.serialize_tuple(2)?; - seq.serialize_element(&self.time)?; + + //seq.serialize_element(&self.time.as_secs())?; + // + //Serialize time as EventTime ext + //https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#eventtime-ext-format + //This is valid up to year 2106 + let seconds = self.time.as_secs() as u32; + let nanos = self.time.subsec_nanos(); + let seconds = seconds.to_be_bytes(); + let nanos = nanos.to_be_bytes(); + let time = [seconds[0], seconds[1], seconds[2], seconds[3], nanos[0], nanos[1], nanos[2], nanos[3]]; + let time = ExtType((0, Int8(time))); + seq.serialize_element(&time)?; + seq.serialize_element(&self.entries)?; seq.end() }