Skip to content

Commit

Permalink
process stream-content requests as ndjson (w/jsonlines) (#327)
Browse files Browse the repository at this point in the history
* added jsonlines to handler for stream_content_detection

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>

* added content-type validation to stream-content and new Error

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>

---------

Signed-off-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>
Co-authored-by: Shonda-Adena-Witherspoon <shonda.adena.witherspoon@ibm.com>
  • Loading branch information
swith004 and Shonda-Adena-Witherspoon authored Mar 4, 2025
1 parent 39a1378 commit 05f512d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ path = "src/main.rs"
anyhow = "1.0.95"
async-trait = "0.1.85"
axum = { version = "0.8.1", features = ["json"] }
axum-extra = "0.10.0"
axum-extra = {version = "0.10.0", features = ["json-lines"]}
clap = { version = "4.5.26", features = ["derive", "env"] }
eventsource-stream = "0.2.3"
futures = "0.3.31"
Expand Down
36 changes: 26 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use axum::{
routing::{get, post},
Json, Router,
};
use axum_extra::extract::WithRejection;
use axum_extra::{extract::WithRejection, json_lines::JsonLines};
use futures::{
stream::{self, BoxStream},
Stream, StreamExt,
Expand Down Expand Up @@ -433,23 +433,35 @@ async fn stream_classification_with_gen(
async fn stream_content_detection(
State(state): State<Arc<ServerState>>,
headers: HeaderMap,
request: Request,
) -> Response {
json_lines: JsonLines<StreamingContentDetectionRequest>,
) -> Result<impl IntoResponse, Error> {
let trace_id = Span::current().context().span().span_context().trace_id();
let headers = filter_headers(&state.orchestrator.config().passthrough_headers, headers);
info!(?trace_id, "handling content detection streaming request");

// Validate the content-type from the header and ensure it is application/x-ndjson
// If it's not, return a UnsupportedContentType error with the appropriate message
let content_type = headers
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok());
match content_type {
Some(content_type) if content_type.starts_with("application/x-ndjson") => (),
_ => {
return Err(Error::UnsupportedContentType(
"expected application/x-ndjson".into(),
))
}
};
let headers = filter_headers(&state.orchestrator.config().passthrough_headers, headers);

// Create input stream
let input_stream = request
.into_body()
.into_data_stream()
let input_stream = json_lines
.map(|result| {
let message =
serde_json::from_slice::<StreamingContentDetectionRequest>(&result.unwrap())?;
let message = result.unwrap();
message.validate()?;
Ok(message)
})
.boxed();

// Create task and submit to handler
let task = StreamingContentDetectionTask::new(trace_id, headers, input_stream);
let mut response_stream = state
Expand Down Expand Up @@ -482,7 +494,7 @@ async fn stream_content_detection(
}
});

Response::new(axum::body::Body::from_stream(output_stream))
Ok(Response::new(axum::body::Body::from_stream(output_stream)))
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -684,6 +696,8 @@ pub enum Error {
JsonExtractorRejection(#[from] JsonRejection),
#[error("{0}")]
JsonError(String),
#[error("unsupported content type: {0}")]
UnsupportedContentType(String),
}

impl From<orchestrator::Error> for Error {
Expand Down Expand Up @@ -717,6 +731,7 @@ impl Error {
Validation(_) => (StatusCode::UNPROCESSABLE_ENTITY, self.to_string()),
NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
ServiceUnavailable(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
UnsupportedContentType(_) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, self.to_string()),
Unexpected => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
JsonExtractorRejection(json_rejection) => match json_rejection {
JsonRejection::JsonDataError(e) => {
Expand All @@ -742,6 +757,7 @@ impl IntoResponse for Error {
Validation(_) => (StatusCode::UNPROCESSABLE_ENTITY, self.to_string()),
NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
ServiceUnavailable(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
UnsupportedContentType(_) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, self.to_string()),
Unexpected => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
JsonExtractorRejection(json_rejection) => match json_rejection {
JsonRejection::JsonDataError(e) => {
Expand Down

0 comments on commit 05f512d

Please sign in to comment.