Skip to content

Commit

Permalink
feat(opentelemetry source): add support for enriching logs with HTTP …
Browse files Browse the repository at this point in the history
…headers (#21674)

* feat(opentelemetry): add support for enriching logs with HTTP headers

* Add sources::util::http::headers module

* http_server: Use sources::util::http::add_headers

* opentelemetry: Use sources::util::http::add_headers

* http_server: Correct description of header injection behavior with conflicting fields
  • Loading branch information
jblazquez authored Nov 5, 2024
1 parent da0680c commit 872d0b5
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 69 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf
sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "sources-http_server"]
sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"]
sources-http_client = ["sources-utils-http-client"]
sources-http_server = ["sources-utils-http", "sources-utils-http-query"]
sources-http_server = ["sources-utils-http", "sources-utils-http-headers", "sources-utils-http-query"]
sources-internal_logs = []
sources-internal_metrics = []
sources-static_metrics = []
Expand All @@ -594,7 +594,7 @@ sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"]
sources-mongodb_metrics = ["dep:mongodb"]
sources-nats = ["dep:async-nats", "dep:nkeys"]
sources-nginx_metrics = ["dep:nom"]
sources-opentelemetry = ["dep:hex", "vector-lib/opentelemetry", "dep:prost", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-vector"]
sources-opentelemetry = ["dep:hex", "vector-lib/opentelemetry", "dep:prost", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-utils-http-headers", "sources-vector"]
sources-postgresql_metrics = ["dep:postgres-openssl", "dep:tokio-postgres"]
sources-prometheus = ["sources-prometheus-scrape", "sources-prometheus-remote-write", "sources-prometheus-pushgateway"]
sources-prometheus-scrape = ["sinks-prometheus", "sources-utils-http-client", "vector-lib/prometheus"]
Expand All @@ -611,6 +611,7 @@ sources-utils-http = ["sources-utils-http-auth", "sources-utils-http-encoding",
sources-utils-http-auth = ["sources-utils-http-error"]
sources-utils-http-encoding = ["sources-utils-http-error"]
sources-utils-http-error = []
sources-utils-http-headers = []
sources-utils-http-prelude = ["sources-utils-http", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error"]
sources-utils-http-query = []
sources-utils-http-client = ["sources-utils-http", "sources-http_server"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `opentelemetry` source can now be configured to enrich log events with HTTP headers received in the OTLP/HTTP request.

authors: jblazquez
73 changes: 22 additions & 51 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use http::StatusCode;
use http_serde;
use tokio_util::codec::Decoder as _;
use vrl::value::{kind::Collection, Kind};
use warp::http::{HeaderMap, HeaderValue};
use warp::http::HeaderMap;

use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
Expand All @@ -26,11 +26,11 @@ use crate::{
GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
SourceOutput,
},
event::{Event, Value},
event::Event,
http::KeepaliveConfig,
serde::{bool_or_struct, default_decoding},
sources::util::{
http::{add_query_parameters, HttpMethod},
http::{add_headers, add_query_parameters, HttpMethod},
Encoding, ErrorMessage, HttpSource, HttpSourceAuthConfig,
},
tls::TlsEnableableConfig,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub struct SimpleHttpConfig {
///
/// Specifying "*" results in all headers included in the log event.
///
/// These override any values included in the JSON payload with conflicting names.
/// These headers are not included in the JSON payload if a field with a conflicting name exists.
#[serde(default)]
#[configurable(metadata(docs::examples = "User-Agent"))]
#[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
Expand Down Expand Up @@ -429,7 +429,7 @@ impl HttpSource for SimpleHttpSource {
&self,
events: &mut [Event],
request_path: &str,
headers_config: &HeaderMap,
headers: &HeaderMap,
query_parameters: &HashMap<String, String>,
source_ip: Option<&SocketAddr>,
) {
Expand All @@ -446,50 +446,6 @@ impl HttpSource for SimpleHttpSource {
request_path.to_owned(),
);

for h in &self.headers {
match h {
// Add each non-wildcard containing header that was specified
// in the `headers` config option to the event if an exact match
// is found.
HttpConfigParamKind::Exact(header_name) => {
let value =
headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);
}
// Add all headers that match against wildcard pattens specified
// in the `headers` config option to the event.
HttpConfigParamKind::Glob(header_pattern) => {
for header_name in headers_config.keys() {
if header_pattern.matches_with(
header_name.as_str(),
glob::MatchOptions::default(),
) {
let value = headers_config
.get(header_name)
.map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(
header_name.as_str()
))),
path!("headers", header_name.as_str()),
Value::from(value.map(Bytes::copy_from_slice)),
);
}
}
}
};
}

self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
Expand All @@ -512,6 +468,14 @@ impl HttpSource for SimpleHttpSource {
}
}

add_headers(
events,
&self.headers,
headers,
self.log_namespace,
SimpleHttpConfig::NAME,
);

add_query_parameters(
events,
&self.query_parameters,
Expand Down Expand Up @@ -1120,6 +1084,8 @@ mod tests {
let mut headers = HeaderMap::new();
headers.insert("User-Agent", "test_client".parse().unwrap());
headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
// Header that conflicts with an existing field.
headers.insert("key1", "value_from_header".parse().unwrap());

let (rx, addr) = source(
vec!["*".to_string()],
Expand Down Expand Up @@ -1219,7 +1185,11 @@ mod tests {
.await;

spawn_ok_collect_n(
send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
send_with_query(
addr,
"{\"key1\":\"value1\",\"key2\":\"value2\"}",
"source=staging&region=gb&key1=value_from_query",
),
rx,
1,
)
Expand All @@ -1230,7 +1200,8 @@ mod tests {
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["key1"], "value_from_query".into());
assert_eq!(log["key2"], "value2".into());
assert_eq!(log["source"], "staging".into());
assert_eq!(log["region"], "gb".into());
assert_event_metadata(log).await;
Expand Down
59 changes: 45 additions & 14 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ use vector_lib::{
event::{BatchNotifier, BatchStatus},
EstimatedJsonEncodedSizeOf,
};
use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply};
use warp::{
filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response, Filter, Reply,
};

use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
use crate::sources::http_server::HttpConfigParamKind;
use crate::sources::util::add_headers;
use crate::{
event::Event,
http::build_http_trace_layer,
Expand All @@ -36,6 +40,7 @@ use crate::{
SourceSender,
};

use super::OpentelemetryConfig;
use super::{reply::protobuf, status::Status};

#[derive(Clone, Copy, Debug, Snafu)]
Expand Down Expand Up @@ -86,13 +91,15 @@ pub(crate) fn build_warp_filter(
out: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers: Vec<HttpConfigParamKind>,
) -> BoxedFilter<(Response,)> {
let log_filters = build_warp_log_filter(
acknowledgements,
log_namespace,
out.clone(),
bytes_received.clone(),
events_received.clone(),
headers,
);
let trace_filters = build_warp_trace_filter(
acknowledgements,
Expand All @@ -103,12 +110,28 @@ pub(crate) fn build_warp_filter(
log_filters.or(trace_filters).unify().boxed()
}

fn enrich_events(
events: &mut [Event],
headers_config: &[HttpConfigParamKind],
headers: &HeaderMap,
log_namespace: LogNamespace,
) {
add_headers(
events,
headers_config,
headers,
log_namespace,
OpentelemetryConfig::NAME,
);
}

fn build_warp_log_filter(
acknowledgements: bool,
log_namespace: LogNamespace,
out: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers: Vec<HttpConfigParamKind>,
) -> BoxedFilter<(Response,)> {
warp::post()
.and(warp::path!("v1" / "logs"))
Expand All @@ -117,21 +140,29 @@ fn build_warp_log_filter(
"application/x-protobuf",
))
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and_then(move |encoding_header: Option<String>, body: Bytes| {
let events = decode(encoding_header.as_deref(), body).and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_log_body(body, log_namespace, &events_received)
});
.and_then(
move |encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| {
let events = decode(encoding_header.as_deref(), body)
.and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_log_body(body, log_namespace, &events_received)
})
.map(|mut events| {
enrich_events(&mut events, &headers, &headers_config, log_namespace);
events
});

handle_request(
events,
acknowledgements,
out.clone(),
super::LOGS,
ExportLogsServiceResponse::default(),
)
})
handle_request(
events,
acknowledgements,
out.clone(),
super::LOGS,
ExportLogsServiceResponse::default(),
)
},
)
.boxed()
}

Expand Down
2 changes: 2 additions & 0 deletions src/sources/opentelemetry/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn receive_logs_legacy_namespace() {
address: source_http_address().parse().unwrap(),
tls: Default::default(),
keepalive: Default::default(),
headers: vec![],
},
acknowledgements: Default::default(),
log_namespace: Default::default(),
Expand Down Expand Up @@ -149,6 +150,7 @@ async fn receive_trace() {
address: source_http_address().parse().unwrap(),
tls: Default::default(),
keepalive: Default::default(),
headers: vec![],
},
acknowledgements: Default::default(),
log_namespace: Default::default(),
Expand Down
20 changes: 20 additions & 0 deletions src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use crate::{
tls::{MaybeTlsSettings, TlsEnableableConfig},
};

use super::http_server::{build_param_matcher, remove_duplicates};

pub const LOGS: &str = "logs";
pub const TRACES: &str = "traces";

Expand Down Expand Up @@ -113,13 +115,28 @@ struct HttpConfig {
#[configurable(derived)]
#[serde(default)]
keepalive: KeepaliveConfig,

/// A list of HTTP headers to include in the log event.
///
/// Accepts the wildcard (`*`) character for headers matching a specified pattern.
///
/// Specifying "*" results in all headers included in the log event.
///
/// These headers are not included in the JSON payload if a field with a conflicting name exists.
#[serde(default)]
#[configurable(metadata(docs::examples = "User-Agent"))]
#[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
#[configurable(metadata(docs::examples = "X-*"))]
#[configurable(metadata(docs::examples = "*"))]
headers: Vec<String>,
}

fn example_http_config() -> HttpConfig {
HttpConfig {
address: "0.0.0.0:4318".parse().unwrap(),
tls: None,
keepalive: KeepaliveConfig::default(),
headers: vec![],
}
}

Expand Down Expand Up @@ -178,12 +195,15 @@ impl SourceConfig for OpentelemetryConfig {
let http_tls_settings = MaybeTlsSettings::from_config(&self.http.tls, true)?;
let protocol = http_tls_settings.http_protocol_name();
let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
let headers =
build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
let filters = build_warp_filter(
acknowledgements,
log_namespace,
cx.out,
bytes_received,
events_received,
headers,
);
let http_source = run_http_server(
self.http.address,
Expand Down
Loading

0 comments on commit 872d0b5

Please sign in to comment.