Skip to content

Commit

Permalink
Use jsonlines as response in stream-content (#329)
Browse files Browse the repository at this point in the history
* wrapped response stream in jsonlines in stream content

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>

* removed utils json as no longer used

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>

---------

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>
  • Loading branch information
swith004 authored Mar 5, 2025
1 parent 05f512d commit fd73de9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 49 deletions.
37 changes: 7 additions & 30 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ use axum::{
};
use axum_extra::{extract::WithRejection, json_lines::JsonLines};
use futures::{
stream::{self, BoxStream},
stream::{self, BoxStream, TryStreamExt},
Stream, StreamExt,
};
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::trace::TraceContextExt;
use rustls::{server::WebPkiClientVerifier, RootCertStore, ServerConfig};
use tokio::{net::TcpListener, signal, sync::mpsc};
use tokio::{net::TcpListener, signal};
use tokio_rustls::TlsAcceptor;
use tokio_stream::wrappers::ReceiverStream;
use tower::Service;
Expand Down Expand Up @@ -464,37 +464,14 @@ async fn stream_content_detection(

// Create task and submit to handler
let task = StreamingContentDetectionTask::new(trace_id, headers, input_stream);
let mut response_stream = state
let response_stream = state
.orchestrator
.handle_streaming_content_detection(task)
.await;

// Create output stream
// This stream returns ND-JSON formatted messages to the client
// StreamingContentDetectionResponse / server::Error
let (output_tx, output_rx) = mpsc::channel::<Result<String, Infallible>>(32);
let output_stream = ReceiverStream::new(output_rx);

// Spawn task to consume response stream (typed) and send to output stream (json)
tokio::spawn(async move {
while let Some(result) = response_stream.next().await {
match result {
Ok(msg) => {
let msg = utils::json::to_nd_string(&msg).unwrap();
let _ = output_tx.send(Ok(msg)).await;
}
Err(error) => {
// Convert orchestrator::Error to server::Error
let error: Error = error.into();
// server::Error doesn't impl Serialize, so we use to_json()
let error_msg = utils::json::to_nd_string(&error.to_json()).unwrap();
let _ = output_tx.send(Ok(error_msg)).await;
}
}
}
});
.await
.map_err(Error::from);

Ok(Response::new(axum::body::Body::from_stream(output_stream)))
// Wrap the response stream in Jsonlines
Ok(JsonLines::new(response_stream).into_response())
}

#[instrument(skip_all)]
Expand Down
1 change: 0 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use hyper::Uri;
use url::Url;

pub mod json;
pub mod tls;
pub mod trace;

Expand Down
18 changes: 0 additions & 18 deletions src/utils/json.rs

This file was deleted.

0 comments on commit fd73de9

Please sign in to comment.