Skip to content

Commit

Permalink
feat(chronicle-sink): add support for compression to Google Chronicle…
Browse files Browse the repository at this point in the history
… sink (vectordotdev#21272)

* feat(chronicle): add support for compression to Google Chronicle sink

* docs(chronicle-compression): add changelog documentation

* feat(chronicle_compression): add configurable gzip compression level

* docs(chronicle_compression): update documentation for Chronicle compression with configurable compression levels

* docs(chronicle-compression): implement configurable schema generation for the compression level config

* docs(chronicle_compression): write custom configuration implementation for Chronicle Compression

* style(chronicle_compression): tidy up chronicle sink code

* fix(chronicle_compression): fix compression configuration

* refactor(chronicle-compression): remove some of the duplication of the chronicle compression type

re

* style(chronicle-compression): fix formatting error

* Update changelog.d/chronicle_compression_support.enhancement.md

---------

Co-authored-by: Matt Searle <matt.searle@10xbanking.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
  • Loading branch information
3 people authored Oct 24, 2024
1 parent 4d1f3fd commit 17466c6
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 21 deletions.
3 changes: 3 additions & 0 deletions changelog.d/chronicle_compression_support.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add Gzip compression support to the `gcp_chronicle_unstructured` sink. See official documentation [here](https://cloud.google.com/chronicle/docs/reference/ingestion-api#frequently_asked_questions).

authors: chocpanda
17 changes: 15 additions & 2 deletions src/sinks/gcp_chronicle/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! See <https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries>
//! for more information.
use bytes::{Bytes, BytesMut};

use futures_util::{future::BoxFuture, task::Poll};
use goauth::scopes::Scope;
use http::{header::HeaderValue, Request, StatusCode, Uri};
Expand Down Expand Up @@ -33,6 +34,7 @@ use crate::{
schema,
sinks::{
gcp_chronicle::{
compression::ChronicleCompression,
partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
sink::ChronicleSink,
},
Expand Down Expand Up @@ -108,6 +110,7 @@ pub struct ChronicleUnstructuredTowerRequestConfigDefaults;
// Request defaults for this sink: the only constant set explicitly in this impl is
// `RATE_LIMIT_NUM` (1_000).
// NOTE(review): any other defaults presumably come from the trait itself — confirm.
impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
const RATE_LIMIT_NUM: u64 = 1_000;
}

/// Configuration for the `gcp_chronicle_unstructured` sink.
#[configurable_component(sink(
"gcp_chronicle_unstructured",
Expand Down Expand Up @@ -155,6 +158,10 @@ pub struct ChronicleUnstructuredConfig {
#[configurable(derived)]
pub encoding: EncodingConfig,

#[serde(default)]
#[configurable(derived)]
pub compression: ChronicleCompression,

#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,
Expand Down Expand Up @@ -193,6 +200,7 @@ impl GenerateConfig for ChronicleUnstructuredConfig {
credentials_path = "/path/to/credentials.json"
customer_id = "customer_id"
namespace = "namespace"
compression = "gzip"
log_type = "log_type"
encoding.codec = "text"
"#})
Expand Down Expand Up @@ -411,6 +419,7 @@ impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
/// Request builder state for the Chronicle unstructured sink: the encoder together with the
/// compression setting derived from the sink configuration.
#[derive(Clone, Debug)]
struct ChronicleRequestBuilder {
// Encoder handed out through `RequestBuilder::encoder`.
encoder: ChronicleEncoder,
// Compression returned from `RequestBuilder::compression`; built from the config's
// `compression` field in `ChronicleRequestBuilder::new`.
compression: Compression,
}

struct ChronicleRequestPayload {
Expand Down Expand Up @@ -438,7 +447,7 @@ impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBui
type Error = io::Error;

fn compression(&self) -> Compression {
Compression::None
self.compression
}

fn encoder(&self) -> &Self::Encoder {
Expand Down Expand Up @@ -480,6 +489,7 @@ impl ChronicleRequestBuilder {
fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.config().build()?;
let compression = Compression::from(config.compression);
let encoder = crate::codecs::Encoder::<()>::new(serializer);
let encoder = ChronicleEncoder {
customer_id: config.customer_id.clone(),
Expand All @@ -494,7 +504,10 @@ impl ChronicleRequestBuilder {
encoder,
transformer,
};
Ok(Self { encoder })
Ok(Self {
encoder,
compression,
})
}
}

Expand Down
131 changes: 131 additions & 0 deletions src/sinks/gcp_chronicle/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use serde::{de, ser};
use serde_json::Value;
use std::{cell::RefCell, collections::BTreeSet};
use vector_lib::configurable::ToValue;

use indexmap::IndexMap;
use vector_lib::configurable::attributes::CustomAttribute;
use vector_lib::configurable::{
schema::{
apply_base_metadata, generate_one_of_schema, generate_struct_schema,
get_or_generate_schema, SchemaGenerator, SchemaObject,
},
Configurable, GenerateError, Metadata,
};

use crate::sinks::util::buffer::compression::{
generate_string_schema, CompressionLevel, ALGORITHM_NAME, ENUM_TAGGING_MODE, LEVEL_NAME,
};
use crate::sinks::util::Compression;

/// Compression configuration.
///
/// Covers only the `None` and `Gzip` cases of the general-purpose [`Compression`] type; the
/// `TryFrom<Compression>` conversion in this module rejects every other algorithm.
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
pub enum ChronicleCompression {
/// No compression.
// Marked as the default variant.
#[derivative(Default)]
None,

/// [Gzip][gzip] compression.
///
/// [gzip]: https://www.gzip.org/
Gzip(CompressionLevel),
}

impl From<ChronicleCompression> for Compression {
    /// Widens the Chronicle-specific setting into the general-purpose [`Compression`] type.
    ///
    /// Every `ChronicleCompression` variant has a direct counterpart, so this never fails.
    fn from(compression: ChronicleCompression) -> Self {
        match compression {
            ChronicleCompression::Gzip(level) => Self::Gzip(level),
            ChronicleCompression::None => Self::None,
        }
    }
}

impl TryFrom<Compression> for ChronicleCompression {
    type Error = String;

    /// Narrows a general-purpose [`Compression`] into the Chronicle-specific setting.
    ///
    /// # Errors
    ///
    /// Returns an error message for any algorithm other than `None` or `Gzip`.
    fn try_from(compression: Compression) -> Result<Self, Self::Error> {
        let narrowed = match compression {
            Compression::Gzip(level) => Self::Gzip(level),
            Compression::None => Self::None,
            _ => return Err("Compression type is not supported by Chronicle".to_string()),
        };
        Ok(narrowed)
    }
}

// Schema generation largely copied from `src/sinks/util/buffer/compression`
impl Configurable for ChronicleCompression {
    /// Reuses the metadata of the general-purpose [`Compression`] type.
    fn metadata() -> Metadata {
        Compression::metadata()
    }

    /// Builds a one-of schema with two alternatives: a bare algorithm string (`none` / `gzip`),
    /// or an object holding the algorithm plus a compression level.
    fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
        // Alternative 1: the algorithm given as a plain string, one const schema per algorithm.
        let algorithm_variants = [
            generate_string_schema("None", None, "No compression."),
            generate_string_schema(
                "Gzip",
                Some("[Gzip][gzip] compression."),
                "[gzip]: https://www.gzip.org/",
            ),
        ];
        let mut algorithm_schema = generate_one_of_schema(&algorithm_variants);
        let mut algorithm_metadata = Metadata::with_description("Compression algorithm.");
        algorithm_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
        apply_base_metadata(&mut algorithm_schema, algorithm_metadata);

        // Alternative 2: an object with a required algorithm and an optional level.
        let level_schema =
            get_or_generate_schema(&CompressionLevel::as_configurable_ref(), gen, None)?;

        let required = BTreeSet::from([ALGORITHM_NAME.to_string()]);

        let mut properties = IndexMap::new();
        properties.insert(ALGORITHM_NAME.to_string(), algorithm_schema.clone());
        properties.insert(LEVEL_NAME.to_string(), level_schema);

        let mut object_schema = generate_struct_schema(properties, required, None);
        let mut object_metadata =
            Metadata::with_description("Compression algorithm and compression level.");
        object_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
        apply_base_metadata(&mut object_schema, object_metadata);

        Ok(generate_one_of_schema(&[algorithm_schema, object_schema]))
    }
}

impl ToValue for ChronicleCompression {
fn to_value(&self) -> Value {
serde_json::to_value(Compression::from(*self))
.expect("Could not convert compression settings to JSON")
}
}

impl<'de> de::Deserialize<'de> for ChronicleCompression {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
Compression::deserialize(deserializer)
.and_then(|x| ChronicleCompression::try_from(x).map_err(de::Error::custom))
}
}

impl ser::Serialize for ChronicleCompression {
    /// Serializes exactly as the equivalent general-purpose [`Compression`] value would.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: ser::Serializer,
    {
        let general = Compression::from(*self);
        ser::Serialize::serialize(&general, serializer)
    }
}
1 change: 1 addition & 0 deletions src/sinks/gcp_chronicle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#[cfg(feature = "sinks-gcp-chronicle")]
pub mod chronicle_unstructured;
pub mod compression;
pub mod partitioner;
pub mod sink;
39 changes: 20 additions & 19 deletions src/sinks/util/buffer/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,26 @@ impl ser::Serialize for Compression {
}
}

/// Property name for the compression algorithm in the object form of a compression schema.
pub const ALGORITHM_NAME: &str = "algorithm";
/// Property name for the compression level in the object form of a compression schema.
pub const LEVEL_NAME: &str = "level";
/// Custom-attribute key under which `generate_string_schema` records a variant's logical name.
pub const LOGICAL_NAME: &str = "logical_name";
/// Custom-attribute key used to declare the enum tagging mode on a compression schema.
pub const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";

/// Builds the const-string schema for a single compression algorithm.
///
/// The schema's constant value is `logical_name` lowercased. Its metadata carries
/// `description`, the optional `title`, and a `LOGICAL_NAME` custom attribute holding
/// `logical_name` as given.
pub fn generate_string_schema(
    logical_name: &str,
    title: Option<&'static str>,
    description: &'static str,
) -> SchemaObject {
    // Assemble the metadata first; it does not depend on the schema object.
    let mut metadata = Metadata::with_description(description);
    if let Some(title) = title {
        metadata.set_title(title);
    }
    metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));

    let mut schema = generate_const_string_schema(logical_name.to_lowercase());
    apply_base_metadata(&mut schema, metadata);
    schema
}

// TODO: Consider an approach for generating schema of "string or object" structure used by this type.
impl Configurable for Compression {
fn referenceable_name() -> Option<&'static str> {
Expand All @@ -288,25 +308,6 @@ impl Configurable for Compression {
}

fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
const ALGORITHM_NAME: &str = "algorithm";
const LEVEL_NAME: &str = "level";
const LOGICAL_NAME: &str = "logical_name";
const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";

let generate_string_schema = |logical_name: &str,
title: Option<&'static str>,
description: &'static str|
-> SchemaObject {
let mut const_schema = generate_const_string_schema(logical_name.to_lowercase());
let mut const_metadata = Metadata::with_description(description);
if let Some(title) = title {
const_metadata.set_title(title);
}
const_metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));
apply_base_metadata(&mut const_schema, const_metadata);
const_schema
};

// First, we'll create the string-only subschemas for each algorithm, and wrap those up
// within a one-of schema.
let mut string_metadata = Metadata::with_description("Compression algorithm.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: {
}
}
}
compression: {
description: """
Compression configuration.
All compression algorithms use the default compression level unless otherwise specified.
"""
required: false
type: string: {
default: "none"
enum: {
gzip: """
[Gzip][gzip] compression.
[gzip]: https://www.gzip.org/
"""
none: "No compression."
}
}
}
credentials_path: {
description: """
Path to a [service account][gcp_service_account_credentials] credentials JSON file.
Expand Down

0 comments on commit 17466c6

Please sign in to comment.