From fd73de994e662d414a02e3ea860745e932e69df8 Mon Sep 17 00:00:00 2001 From: swith004 Date: Wed, 5 Mar 2025 16:06:22 -0500 Subject: [PATCH] Use jsonlines as response in stream-content (#329) * wrapped response stream in jsonlines in stream content Signed-off-by: Shonda-Adena-Witherspoon * removed utils json as no longer used Signed-off-by: Shonda-Adena-Witherspoon --------- Signed-off-by: Shonda-Adena-Witherspoon --- src/server.rs | 37 +++++++------------------------------ src/utils.rs | 1 - src/utils/json.rs | 18 ------------------ 3 files changed, 7 insertions(+), 49 deletions(-) delete mode 100644 src/utils/json.rs diff --git a/src/server.rs b/src/server.rs index 54236fc0..96af5c8e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -38,14 +38,14 @@ use axum::{ }; use axum_extra::{extract::WithRejection, json_lines::JsonLines}; use futures::{ - stream::{self, BoxStream}, + stream::{self, BoxStream, TryStreamExt}, Stream, StreamExt, }; use hyper::body::Incoming; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::TraceContextExt; use rustls::{server::WebPkiClientVerifier, RootCertStore, ServerConfig}; -use tokio::{net::TcpListener, signal, sync::mpsc}; +use tokio::{net::TcpListener, signal}; use tokio_rustls::TlsAcceptor; use tokio_stream::wrappers::ReceiverStream; use tower::Service; @@ -464,37 +464,14 @@ async fn stream_content_detection( // Create task and submit to handler let task = StreamingContentDetectionTask::new(trace_id, headers, input_stream); - let mut response_stream = state + let response_stream = state .orchestrator .handle_streaming_content_detection(task) - .await; - - // Create output stream - // This stream returns ND-JSON formatted messages to the client - // StreamingContentDetectionResponse / server::Error - let (output_tx, output_rx) = mpsc::channel::>(32); - let output_stream = ReceiverStream::new(output_rx); - - // Spawn task to consume response stream (typed) and send to output stream (json) - tokio::spawn(async move { - while let Some(result) = response_stream.next().await { - match result { - Ok(msg) => { - let msg = utils::json::to_nd_string(&msg).unwrap(); - let _ = output_tx.send(Ok(msg)).await; - } - Err(error) => { - // Convert orchestrator::Error to server::Error - let error: Error = error.into(); - // server::Error doesn't impl Serialize, so we use to_json() - let error_msg = utils::json::to_nd_string(&error.to_json()).unwrap(); - let _ = output_tx.send(Ok(error_msg)).await; - } - } - } - }); + .await + .map_err(Error::from); - Ok(Response::new(axum::body::Body::from_stream(output_stream))) + // Wrap the response stream in Jsonlines + Ok(JsonLines::new(response_stream).into_response()) } #[instrument(skip_all)] diff --git a/src/utils.rs b/src/utils.rs index 3e74220c..f22a349a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,6 @@ use hyper::Uri; use url::Url; -pub mod json; pub mod tls; pub mod trace; diff --git a/src/utils/json.rs b/src/utils/json.rs deleted file mode 100644 index 53df7845..00000000 --- a/src/utils/json.rs +++ /dev/null @@ -1,18 +0,0 @@ -use serde::Serialize; - -/// Serialize the given data structure as a String of ND-JSON. -/// -/// # Errors -/// -/// Serialization can fail if `T`'s implementation of `Serialize` decides to -/// fail, or if `T` contains a map with non-string keys. -#[inline] -pub fn to_nd_string(value: &T) -> Result -where - T: ?Sized + Serialize, -{ - let mut bytes = serde_json::to_vec(value)?; - bytes.push(b'\n'); - let string = unsafe { String::from_utf8_unchecked(bytes) }; - Ok(string) -}