From 4e205652655bef63c458fee377964c9c1ff48161 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Mon, 3 Mar 2025 16:41:53 -0800 Subject: [PATCH 01/14] Add orchestrator.types module Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator.rs | 1 + src/orchestrator/types.rs | 31 +++ src/orchestrator/types/chat_message.rs | 56 +++++ src/orchestrator/types/chunk.rs | 149 ++++++++++++ src/orchestrator/types/detection.rs | 224 ++++++++++++++++++ .../types/detection_batch_stream.rs | 89 +++++++ src/orchestrator/types/detection_batcher.rs | 26 ++ .../detection_batcher/chat_completion.rs | 40 ++++ .../detection_batcher/max_processed_index.rs | 62 +++++ .../types/detection_batcher/noop.rs | 33 +++ 10 files changed, 711 insertions(+) create mode 100644 src/orchestrator/types.rs create mode 100644 src/orchestrator/types/chat_message.rs create mode 100644 src/orchestrator/types/chunk.rs create mode 100644 src/orchestrator/types/detection.rs create mode 100644 src/orchestrator/types/detection_batch_stream.rs create mode 100644 src/orchestrator/types/detection_batcher.rs create mode 100644 src/orchestrator/types/detection_batcher/chat_completion.rs create mode 100644 src/orchestrator/types/detection_batcher/max_processed_index.rs create mode 100644 src/orchestrator/types/detection_batcher/noop.rs diff --git a/src/orchestrator.rs b/src/orchestrator.rs index 6ef3de20..09972114 100644 --- a/src/orchestrator.rs +++ b/src/orchestrator.rs @@ -23,6 +23,7 @@ pub mod common; pub mod detector_processing; pub mod streaming; pub mod streaming_content_detection; +pub mod types; pub mod unary; use std::{collections::HashMap, pin::Pin, sync::Arc}; diff --git a/src/orchestrator/types.rs b/src/orchestrator/types.rs new file mode 100644 index 00000000..0b895ce6 --- /dev/null +++ b/src/orchestrator/types.rs @@ -0,0 +1,31 @@ +use std::pin::Pin; + +use futures::Stream; +use tokio::sync::mpsc; + +pub mod chat_message; +pub use chat_message::*; +pub mod chunk; +pub mod detection; +pub use chunk::*; +pub use detection::*; +pub mod detection_batcher; +pub use detection_batcher::*; +pub mod detection_batch_stream; +pub use detection_batch_stream::*; + +use super::Error; +use crate::{clients::openai::ChatCompletionChunk, models::ClassifiedGeneratedTextStreamResult}; + +pub type ChunkerId = String; +pub type DetectorId = String; +pub type InputId = u32; + +pub type BoxStream = Pin + Send>>; +pub type ChunkStream = BoxStream>; +pub type InputStream = BoxStream>; +pub type InputSender = mpsc::Sender>; +pub type InputReceiver = mpsc::Receiver>; +pub type DetectionStream = BoxStream>; +pub type GenerationStream = BoxStream<(usize, Result)>; +pub type ChatCompletionStream = BoxStream<(usize, Result, Error>)>; diff --git a/src/orchestrator/types/chat_message.rs b/src/orchestrator/types/chat_message.rs new file mode 100644 index 00000000..553a2476 --- /dev/null +++ b/src/orchestrator/types/chat_message.rs @@ -0,0 +1,56 @@ +use crate::clients::openai; + +/// A chat message. +#[derive(Default, Clone, Debug, PartialEq)] +pub struct ChatMessage<'a> { + /// Message index + /// Corresponds to choice index for chat completions. + pub index: u32, + /// The role of the author of this message. + pub role: Option<&'a openai::Role>, + /// The text contents of the message. + pub text: Option<&'a str>, +} + +/// An iterator over chat messages. +pub trait ChatMessageIterator { + /// Returns an iterator of [`ChatMessage`]s. 
+ fn messages(&self) -> impl Iterator; +} + +impl ChatMessageIterator for openai::ChatCompletionsRequest { + fn messages(&self) -> impl Iterator { + self.messages.iter().enumerate().map(|(index, message)| { + let text = if let Some(openai::Content::Text(text)) = &message.content { + Some(text.as_str()) + } else { + None + }; + ChatMessage { + index: index as u32, + role: Some(&message.role), + text, + } + }) + } +} + +impl ChatMessageIterator for openai::ChatCompletion { + fn messages(&self) -> impl Iterator { + self.choices.iter().map(|choice| ChatMessage { + index: choice.index, + role: Some(&choice.message.role), + text: choice.message.content.as_deref(), + }) + } +} + +impl ChatMessageIterator for openai::ChatCompletionChunk { + fn messages(&self) -> impl Iterator { + self.choices.iter().map(|choice| ChatMessage { + index: choice.index, + role: choice.delta.role.as_ref(), + text: choice.delta.content.as_deref(), + }) + } +} diff --git a/src/orchestrator/types/chunk.rs b/src/orchestrator/types/chunk.rs new file mode 100644 index 00000000..f078b679 --- /dev/null +++ b/src/orchestrator/types/chunk.rs @@ -0,0 +1,149 @@ +use crate::pb::caikit_data_model::nlp as pb; + +/// A chunk. +#[derive(Default, Debug, Clone)] +pub struct Chunk { + /// Index of message where chunk begins + pub input_start_index: usize, + /// Index of message where chunk ends + pub input_end_index: usize, + /// Index of char where chunk begins + pub start: usize, + /// Index of char where chunk ends + pub end: usize, + /// Text + pub text: String, +} + +impl PartialOrd for Chunk { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Chunk { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + ( + self.input_start_index, + self.input_end_index, + self.start, + self.end, + ) + .cmp(&( + other.input_start_index, + other.input_end_index, + other.start, + other.end, + )) + } +} + +impl PartialEq for Chunk { + fn eq(&self, other: &Self) -> bool { + ( + self.input_start_index, + self.input_end_index, + self.start, + self.end, + ) == ( + other.input_start_index, + other.input_end_index, + other.start, + other.end, + ) + } +} + +impl Eq for Chunk {} + +impl std::hash::Hash for Chunk { + fn hash(&self, state: &mut H) { + self.input_start_index.hash(state); + self.input_end_index.hash(state); + self.start.hash(state); + self.end.hash(state); + } +} + +/// An array of chunks. 
+#[derive(Default, Debug, Clone)] +pub struct Chunks(Vec); + +impl Chunks { + pub fn new() -> Self { + Self::default() + } +} + +impl std::ops::Deref for Chunks { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for Chunks { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl IntoIterator for Chunks { + type Item = Chunk; + type IntoIter = as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl FromIterator for Chunks { + fn from_iter>(iter: T) -> Self { + let mut chunks = Chunks::new(); + for value in iter { + chunks.push(value); + } + chunks + } +} + +impl From> for Chunks { + fn from(value: Vec) -> Self { + Self(value) + } +} + +// Conversions + +impl From for Chunk { + fn from(value: pb::ChunkerTokenizationStreamResult) -> Self { + let text = value + .results + .into_iter() + .map(|token| token.text) + .collect::(); + Chunk { + input_start_index: value.input_start_index as usize, + input_end_index: value.input_end_index as usize, + start: value.start_index as usize, + end: value.processed_index as usize, + text, + } + } +} + +impl From for Chunks { + fn from(value: pb::TokenizationResults) -> Self { + value + .results + .into_iter() + .map(|token| Chunk { + start: token.start as usize, + end: token.end as usize, + text: token.text, + ..Default::default() + }) + .collect() + } +} diff --git a/src/orchestrator/types/detection.rs b/src/orchestrator/types/detection.rs new file mode 100644 index 00000000..45905fe5 --- /dev/null +++ b/src/orchestrator/types/detection.rs @@ -0,0 +1,224 @@ +use crate::{clients::detector, models}; + +/// A detection. +#[derive(Default, Debug, Clone, PartialEq)] +pub struct Detection { + pub start: Option, + pub end: Option, + pub text: Option, + pub detector_id: Option, + pub detection_type: String, + pub detection: String, + pub score: f64, + pub evidence: Vec, +} + +/// Detection evidence. +#[derive(Default, Clone, Debug, PartialEq)] +pub struct DetectionEvidence { + pub name: String, + pub value: Option, + pub score: Option, + pub evidence: Option>, +} + +/// Additional detection evidence. +#[derive(Default, Clone, Debug, PartialEq)] +pub struct Evidence { + pub name: String, + pub value: Option, + pub score: Option, +} + +/// An array of detections. 
+#[derive(Default, Debug, Clone)] +pub struct Detections(Vec); + +impl Detections { + pub fn new() -> Self { + Self::default() + } +} + +impl std::ops::Deref for Detections { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for Detections { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl IntoIterator for Detections { + type Item = Detection; + type IntoIter = as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl FromIterator for Detections { + fn from_iter>(iter: T) -> Self { + let mut detections = Detections::new(); + for value in iter { + detections.push(value); + } + detections + } +} + +impl From> for Detections { + fn from(value: Vec) -> Self { + Self(value) + } +} + +// Conversions + +impl From for Detection { + fn from(value: detector::ContentAnalysisResponse) -> Self { + Self { + start: Some(value.start), + end: Some(value.end), + text: Some(value.text), + detector_id: value.detector_id, + detection_type: value.detection_type, + detection: value.detection, + score: value.score, + evidence: value + .evidence + .map(|vs| vs.into_iter().map(Into::into).collect()) + .unwrap_or_default(), + } + } +} + +impl From>> for Detections { + fn from(value: Vec>) -> Self { + value + .into_iter() + .flatten() + .map(|detection| detection.into()) + .collect::() + } +} + +impl From for models::EvidenceObj { + fn from(value: DetectionEvidence) -> Self { + Self { + name: value.name, + value: value.value, + score: value.score, + evidence: value + .evidence + .map(|vs| vs.into_iter().map(Into::into).collect()), + } + } +} + +impl From for models::Evidence { + fn from(value: Evidence) -> Self { + Self { + name: value.name, + value: value.value, + score: value.score, + } + } +} + +impl From for DetectionEvidence { + fn from(value: models::EvidenceObj) -> Self { + Self { + name: value.name, + value: value.value, + score: value.score, + evidence: value + .evidence + .map(|vs| vs.into_iter().map(Into::into).collect()), + } + } +} + +impl From for Evidence { + fn from(value: models::Evidence) -> Self { + Self { + name: value.name, + value: value.value, + score: value.score, + } + } +} + +impl From for Detection { + fn from(value: models::DetectionResult) -> Self { + Self { + start: None, + end: None, + text: None, + detector_id: value.detector_id, + detection_type: value.detection_type, + detection: value.detection, + score: value.score, + evidence: value + .evidence + .map(|vs| vs.into_iter().map(Into::into).collect()) + .unwrap_or_default(), + } + } +} + +impl From> for Detections { + fn from(value: Vec) -> Self { + value.into_iter().map(Into::into).collect() + } +} + +impl From for models::TokenClassificationResult { + fn from(value: Detection) -> Self { + Self { + start: value.start.map(|v| v as u32).unwrap(), + end: value.end.map(|v| v as u32).unwrap(), + word: value.text.unwrap_or_default(), + entity: value.detection, + entity_group: value.detection_type, + detector_id: value.detector_id, + score: value.score, + token_count: None, + } + } +} + +impl From for Vec { + fn from(value: Detections) -> Self { + value.into_iter().map(Into::into).collect() + } +} + +impl From for detector::ContentAnalysisResponse { + fn from(value: Detection) -> Self { + let evidence = (!value.evidence.is_empty()) + .then_some(value.evidence.into_iter().map(Into::into).collect()); + Self { + start: value.start.unwrap(), + end: value.end.unwrap(), + text: value.text.unwrap(), + detection: 
value.detection, + detection_type: value.detection_type, + detector_id: value.detector_id, + score: value.score, + evidence, + } + } +} + +impl From for Vec { + fn from(value: Detections) -> Self { + value.into_iter().map(Into::into).collect() + } +} diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs new file mode 100644 index 00000000..0430179f --- /dev/null +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -0,0 +1,89 @@ +use futures::{stream, Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use super::{BoxStream, Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; +use crate::orchestrator::Error; + +/// Wraps detection streams and produces a stream +/// of batches using a [`DetectionBatcher`]. +pub struct DetectionBatchStream { + inner: BoxStream>, +} + +impl DetectionBatchStream +where + B: DetectionBatcher, +{ + pub fn new(mut batcher: B, streams: Vec) -> Self { + // Create batch channel + let (batch_tx, batch_rx) = mpsc::channel(32); + // Create batcher channel + let (batcher_tx, mut batcher_rx) = + mpsc::channel::>(32); + + // Spawn batcher task + // This task receives new detections, pushes them to a batcher, and sends + // batches to the batch (output) stream as they become ready. + tokio::spawn(async move { + loop { + tokio::select! { + result = batcher_rx.recv() => { + match result { + Some(Ok((input_id, detector_id, chunk, detections))) => { + // Received new detections + // Push detections to batcher + batcher.push(input_id, detector_id, chunk, detections); + + // Check if we have any batches ready + if let Some(batch) = batcher.pop_batch() { + // Send batch to batch channel + let _ = batch_tx.send(Ok(batch)).await; + } + }, + Some(Err(error)) => { + // Received error + // Send error to batch channel + let _ = batch_tx.send(Err(error)).await; + break; + }, + None => { + // Batcher stream closed + // Terminate task + break; + }, + } + }, + } + } + }); + + // Create a stream set (a single stream) from multiple detection streams + let mut stream_set = stream::select_all(streams); + + // Spawn detection consumer task + // This task consumes new detections and sends them to the batcher task. + tokio::spawn(async move { + while let Some(result) = stream_set.next().await { + // Received new detections + // Send to batcher task + let _ = batcher_tx.send(result).await; + } + }); + + Self { + inner: ReceiverStream::new(batch_rx).boxed(), + } + } +} + +impl Stream for DetectionBatchStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.as_mut().poll_next(cx) + } +} diff --git a/src/orchestrator/types/detection_batcher.rs b/src/orchestrator/types/detection_batcher.rs new file mode 100644 index 00000000..c0fea47c --- /dev/null +++ b/src/orchestrator/types/detection_batcher.rs @@ -0,0 +1,26 @@ +pub mod chat_completion; +pub use chat_completion::*; +pub mod noop; +pub use noop::*; +pub mod max_processed_index; +pub use max_processed_index::*; + +use super::{Chunk, Detections, DetectorId, InputId}; + +/// A detection batcher. +/// Implements pluggable batching logic for a [`DetectionBatchStream`]. +pub trait DetectionBatcher: Send + 'static { + type Batch: Send + 'static; + + /// Pushes new detections. 
+ fn push( + &mut self, + input_id: InputId, + detector_id: DetectorId, + chunk: Chunk, + detections: Detections, + ); + + /// Removes the next batch of detections, if ready. + fn pop_batch(&mut self) -> Option; +} diff --git a/src/orchestrator/types/detection_batcher/chat_completion.rs b/src/orchestrator/types/detection_batcher/chat_completion.rs new file mode 100644 index 00000000..82233c4c --- /dev/null +++ b/src/orchestrator/types/detection_batcher/chat_completion.rs @@ -0,0 +1,40 @@ +#![allow(dead_code)] +use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; +use crate::orchestrator::types::Chunks; + +/// A batcher for chat completions. +pub struct ChatCompletionBatcher { + detectors: Vec, + // state: TBD +} + +impl ChatCompletionBatcher { + pub fn new(detectors: Vec) -> Self { + // let state = TBD::new(); + Self { + detectors, + // state, + } + } +} + +impl DetectionBatcher for ChatCompletionBatcher { + type Batch = (u32, Chunks, Detections); // placeholder, actual type TBD + + fn push( + &mut self, + _input_id: InputId, + _detector_id: DetectorId, + _chunk: Chunk, + _detections: Detections, + ) { + // NOTE: input_id maps to choice_index + todo!() + } + + fn pop_batch(&mut self) -> Option { + // TODO: implement batching logic to align with requirements + // ref: https://github.com/foundation-model-stack/fms-guardrails-orchestrator/blob/main/docs/architecture/adrs/005-chat-completion-support.md#streaming-response + todo!() + } +} diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs new file mode 100644 index 00000000..74fb1bc2 --- /dev/null +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -0,0 +1,62 @@ +use std::collections::{btree_map, BTreeMap}; + +use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; + +/// A batcher based on the original "max processed index" aggregator. +/// +/// Each chunk is a batch, returned in-order once detections +/// from all detectors have been received for the chunk. +/// +/// Assumes all detectors are using the same chunker. +pub struct MaxProcessedIndexBatcher { + n: usize, + state: BTreeMap>, +} + +impl MaxProcessedIndexBatcher { + pub fn new(n: usize) -> Self { + Self { + n, + state: BTreeMap::default(), + } + } +} + +impl DetectionBatcher for MaxProcessedIndexBatcher { + type Batch = (Chunk, Detections); + + fn push( + &mut self, + _input_id: InputId, + _detector_id: DetectorId, + chunk: Chunk, + detections: Detections, + ) { + match self.state.entry(chunk) { + btree_map::Entry::Vacant(entry) => { + // New chunk, insert entry + entry.insert(vec![detections]); + } + btree_map::Entry::Occupied(mut entry) => { + // Existing chunk, push detections + entry.get_mut().push(detections); + } + } + } + + fn pop_batch(&mut self) -> Option { + // Check if we have all detections for the next chunk + if self + .state + .first_key_value() + .is_some_and(|(_, detections)| detections.len() == self.n) + { + // We have all detections for the chunk, remove and return it. 
+ if let Some((chunk, detections)) = self.state.pop_first() { + let detections = detections.into_iter().flatten().collect(); + return Some((chunk, detections)); + } + } + None + } +} diff --git a/src/orchestrator/types/detection_batcher/noop.rs b/src/orchestrator/types/detection_batcher/noop.rs new file mode 100644 index 00000000..03ae16a5 --- /dev/null +++ b/src/orchestrator/types/detection_batcher/noop.rs @@ -0,0 +1,33 @@ +use std::collections::VecDeque; + +use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; + +/// A no-op batcher that doesn't actually batch. +#[derive(Default)] +pub struct NoopBatcher { + state: VecDeque<(Chunk, Detections)>, +} + +impl NoopBatcher { + pub fn new() -> Self { + Self::default() + } +} + +impl DetectionBatcher for NoopBatcher { + type Batch = (Chunk, Detections); + + fn push( + &mut self, + _input_id: InputId, + _detector_id: DetectorId, + chunk: Chunk, + detections: Detections, + ) { + self.state.push_back((chunk, detections)); + } + + fn pop_batch(&mut self) -> Option { + self.state.pop_front() + } +} From c24215e7c39e21c34f19d4fbd8e0b98a02deff37 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:40:34 -0800 Subject: [PATCH 02/14] Update DetectionBatchStream to wrap batch_rx directly, update comments Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../types/detection_batch_stream.rs | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index 0430179f..4b9a8a7b 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -1,14 +1,13 @@ use futures::{stream, Stream, StreamExt}; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use super::{BoxStream, Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; +use super::{Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; use crate::orchestrator::Error; /// Wraps detection streams and produces a stream /// of batches using a [`DetectionBatcher`]. pub struct DetectionBatchStream { - inner: BoxStream>, + batch_rx: mpsc::Receiver>, } impl DetectionBatchStream @@ -23,33 +22,30 @@ where mpsc::channel::>(32); // Spawn batcher task - // This task receives new detections, pushes them to a batcher, and sends - // batches to the batch (output) stream as they become ready. + // This task receives new detections, pushes them to the batcher, + // and sends batches to the batch (output) channel as they become ready. tokio::spawn(async move { loop { tokio::select! 
{ result = batcher_rx.recv() => { match result { Some(Ok((input_id, detector_id, chunk, detections))) => { - // Received new detections // Push detections to batcher batcher.push(input_id, detector_id, chunk, detections); - // Check if we have any batches ready + // Check if the next batch is ready if let Some(batch) = batcher.pop_batch() { // Send batch to batch channel let _ = batch_tx.send(Ok(batch)).await; } }, Some(Err(error)) => { - // Received error // Send error to batch channel let _ = batch_tx.send(Err(error)).await; break; }, None => { - // Batcher stream closed - // Terminate task + // Batcher channel closed break; }, } @@ -58,22 +54,19 @@ where } }); - // Create a stream set (a single stream) from multiple detection streams + // Create a stream set (single stream) from multiple detection streams let mut stream_set = stream::select_all(streams); // Spawn detection consumer task // This task consumes new detections and sends them to the batcher task. tokio::spawn(async move { while let Some(result) = stream_set.next().await { - // Received new detections - // Send to batcher task + // Send new detections to batcher task let _ = batcher_tx.send(result).await; } }); - Self { - inner: ReceiverStream::new(batch_rx).boxed(), - } + Self { batch_rx } } } @@ -84,6 +77,6 @@ impl Stream for DetectionBatchStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.inner.as_mut().poll_next(cx) + self.batch_rx.poll_recv(cx) } } From 19cd41e4fbaf6e3707fdce0b21ecd859c8a9ff48 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Mon, 3 Mar 2025 18:03:36 -0800 Subject: [PATCH 03/14] Drop detection consumer task and consume detection stream_set from batcher task Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../types/detection_batch_stream.rs | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index 4b9a8a7b..2bf8f1bf 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -1,7 +1,7 @@ use futures::{stream, Stream, StreamExt}; use tokio::sync::mpsc; -use super::{Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; +use super::{DetectionBatcher, DetectionStream}; use crate::orchestrator::Error; /// Wraps detection streams and produces a stream @@ -17,17 +17,17 @@ where pub fn new(mut batcher: B, streams: Vec) -> Self { // Create batch channel let (batch_tx, batch_rx) = mpsc::channel(32); - // Create batcher channel - let (batcher_tx, mut batcher_rx) = - mpsc::channel::>(32); + + // Create a stream set (single stream) from multiple detection streams + let mut stream_set = stream::select_all(streams); // Spawn batcher task - // This task receives new detections, pushes them to the batcher, - // and sends batches to the batch (output) channel as they become ready. + // This task consumes new detections, pushes them to the batcher, + // and sends batches to the batch channel as they become ready. tokio::spawn(async move { loop { tokio::select! 
{ - result = batcher_rx.recv() => { + result = stream_set.next() => { match result { Some(Ok((input_id, detector_id, chunk, detections))) => { // Push detections to batcher @@ -54,18 +54,6 @@ where } }); - // Create a stream set (single stream) from multiple detection streams - let mut stream_set = stream::select_all(streams); - - // Spawn detection consumer task - // This task consumes new detections and sends them to the batcher task. - tokio::spawn(async move { - while let Some(result) = stream_set.next().await { - // Send new detections to batcher task - let _ = batcher_tx.send(result).await; - } - }); - Self { batch_rx } } } From 76bac51c983b6b09841a76e8ff1c60ab63a24d38 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Tue, 4 Mar 2025 09:31:33 -0800 Subject: [PATCH 04/14] Update MaxProcessedIndexBatcher doc string, add test_single_chunk_multiple_detectors Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../detection_batcher/max_processed_index.rs | 97 ++++++++++++++++++- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs index 74fb1bc2..e8f3f627 100644 --- a/src/orchestrator/types/detection_batcher/max_processed_index.rs +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -2,12 +2,21 @@ use std::collections::{btree_map, BTreeMap}; use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; -/// A batcher based on the original "max processed index" aggregator. +/// A batcher based on the original "max processed index" +/// aggregator. /// -/// Each chunk is a batch, returned in-order once detections -/// from all detectors have been received for the chunk. +/// A batch corresponds to a chunk. Batches are +/// returned in-order as detections from all detectors +/// are received for the chunk. /// -/// Assumes all detectors are using the same chunker. +/// For example, if we have n chunks with 3 detectors +/// applied to each chunk, the first batch for chunk-1 is +/// popped once detections from 3 detectors are received +/// for chunk-1. The next batch for chunk-2 is popped once +/// detections from 3 detectors are received for chunk-2, +/// and so on. +/// +/// This batcher requires that all detectors use the same chunker. 
pub struct MaxProcessedIndexBatcher { n: usize, state: BTreeMap>, @@ -60,3 +69,83 @@ impl DetectionBatcher for MaxProcessedIndexBatcher { None } } + +#[cfg(test)] +mod test { + use super::*; + use crate::orchestrator::types::Detection; + + #[test] + fn test_single_chunk_multiple_detectors() { + let input_id = 0; + let chunk = Chunk { + input_start_index: 0, + input_end_index: 0, + start: 0, + end: 24, + text: "this is a dummy sentence".into(), + }; + + // Create a batcher that will process batches for 2 detectors + let n = 2; + let mut batcher = MaxProcessedIndexBatcher::new(n); + + // Push detections for pii detector + batcher.push( + input_id, + "pii".into(), + chunk.clone(), + vec![Detection { + start: Some(5), + end: Some(10), + detector_id: Some("pii".into()), + detection_type: "pii".into(), + score: 0.4, + ..Default::default() + }] + .into(), + ); + + // We only have detections for 1 detector + // pop_batch() should return None + assert!(batcher.pop_batch().is_none()); + + // Push detections for hap detector + batcher.push( + input_id, + "hap".into(), + chunk.clone(), + vec![ + Detection { + start: Some(5), + end: Some(10), + detector_id: Some("hap".into()), + detection_type: "hap".into(), + score: 0.8, + ..Default::default() + }, + Detection { + start: Some(15), + end: Some(20), + detector_id: Some("hap".into()), + detection_type: "hap".into(), + score: 0.8, + ..Default::default() + }, + ] + .into(), + ); + + // We have detections for 2 detectors + // pop_batch() should return a batch containing 3 detections for the chunk + let batch = batcher.pop_batch(); + assert!( + batch.is_some_and(|(chunk, detections)| { chunk == chunk && detections.len() == 3 }) + ); + } + + #[test] + fn test_out_of_order_chunks() { + todo!() + } +} From e30c3c7306bc7b0357da0308488ef65845d57610 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Tue, 4 Mar 2025 10:04:52 -0800 Subject: [PATCH 05/14] Add test_out_of_order_chunks Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../detection_batcher/max_processed_index.rs | 86 ++++++++++++++++++- 1 file changed, 83 insertions(+), 3 deletions(-) diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs index e8f3f627..823b39dc 100644 --- a/src/orchestrator/types/detection_batcher/max_processed_index.rs +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -90,7 +90,7 @@ mod test { let n = 2; let mut batcher = MaxProcessedIndexBatcher::new(n); - // Push detections for pii detector + // Push chunk detections for pii detector batcher.push( input_id, "pii".into(), @@ -110,7 +110,7 @@ mod test { // pop_batch() should return None assert!(batcher.pop_batch().is_none()); - // Push detections for hap detector + // Push chunk detections for hap detector batcher.push( input_id, "hap".into(), @@ -146,6 +146,86 @@ mod test { #[test] fn test_out_of_order_chunks() { - todo!() + let input_id = 0; + let chunks = [ + Chunk { + input_start_index: 0, + input_end_index: 10, + start: 0, + end: 56, + text: " a powerful tool for the development \ + of complex systems." + .into(), + }, + Chunk { + input_start_index: 11, + input_end_index: 26, + start: 56, + end: 135, + text: " It has been used in many fields, such as \ + computer vision and image processing." 
+ .into(), + }, + ]; + + // Create a batcher that will process batches for 2 detectors + let n = 2; + let mut batcher = MaxProcessedIndexBatcher::new(n); + + // Push chunk-2 detections for pii detector + batcher.push( + input_id, + "pii".into(), + chunks[1].clone(), + Detections::default(), // no detections + ); + // Push chunk-2 detections for hap detector + batcher.push( + input_id, + "hap".into(), + chunks[1].clone(), + Detections::default(), // no detections + ); + // Push chunk-1 detections for hap detector + batcher.push( + input_id, + "hap".into(), + chunks[0].clone(), + Detections::default(), // no detections + ); + + // We have detections for chunk-2, but not chunk-1 + // pop_batch() should return None + assert!(batcher.pop_batch().is_none()); + + // Push chunk-1 detections for pii detector + batcher.push( + input_id, + "pii".into(), + chunks[0].clone(), + vec![Detection { + start: Some(10), + end: Some(20), + detector_id: Some("pii".into()), + detection_type: "pii".into(), + score: 0.4, + ..Default::default() + }] + .into(), + ); + + // We have detections for chunk-1 and chunk-2 + // pop_batch() should return chunk-1 with 1 pii detection + let batch = batcher.pop_batch(); + assert!(batch + .is_some_and(|(chunk, detections)| { chunk == chunks[0] && detections.len() == 1 })); + + // pop_batch() should return chunk-2 with no detections + let batch = batcher.pop_batch(); + assert!(batch + .is_some_and(|(chunk, detections)| { chunk == chunks[1] && detections.is_empty() })); + + // batcher state should be empty as all batches have been returned + assert!(batcher.state.is_empty()); } } From 3fd42d86777db97560e0968853a1e48671b9c11d Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Tue, 4 Mar 2025 11:21:11 -0800 Subject: [PATCH 06/14] Add test_detection_batch_stream Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../detection_batcher/max_processed_index.rs | 142 +++++++++++++++++- 1 file changed, 137 insertions(+), 5 deletions(-) diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs index 823b39dc..4b5457f3 100644 --- a/src/orchestrator/types/detection_batcher/max_processed_index.rs +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -72,11 +72,20 @@ impl DetectionBatcher for MaxProcessedIndexBatcher { #[cfg(test)] mod test { + use std::task::Poll; + + use futures::StreamExt; + use tokio::sync::mpsc; + use tokio_stream::wrappers::ReceiverStream; + use super::*; - use crate::orchestrator::types::Detection; + use crate::orchestrator::{ + types::{Detection, DetectionBatchStream}, + Error, + }; #[test] - fn test_single_chunk_multiple_detectors() { + fn test_batcher_with_single_chunk() { let input_id = 0; let chunk = Chunk { input_start_index: 0, @@ -145,7 +154,7 @@ mod test { } #[test] - fn test_out_of_order_chunks() { + fn test_batcher_with_out_of_order_chunks() { let input_id = 0; let chunks = [ Chunk { @@ -194,7 +203,7 @@ mod test { Detections::default(), // no detections ); - // We have detections for chunk-2, but not chunk-1 + // We have all detections for chunk-2, but not chunk-1 // pop_batch() should return None assert!(batcher.pop_batch().is_none()); @@ -214,7 +223,7 @@ mod test { .into(), ); - // We have detections for chunk-1 and chunk-2 + // We have all detections for chunk-1 and chunk-2 // pop_batch() should return chunk-1 with 1 pii detection let batch = batcher.pop_batch(); assert!(batch @@ -228,4 
+237,127 @@ mod test { // batcher state should be empty as all batches have been returned assert!(batcher.state.is_empty()); } + + #[tokio::test] + async fn test_detection_batch_stream() -> Result<(), Error> { + let input_id = 0; + let chunks = [ + Chunk { + input_start_index: 0, + input_end_index: 10, + start: 0, + end: 56, + text: " a powerful tool for the development \ + of complex systems." + .into(), + }, + Chunk { + input_start_index: 11, + input_end_index: 26, + start: 56, + end: 135, + text: " It has been used in many fields, such as \ + computer vision and image processing." + .into(), + }, + ]; + + // Create detection channels and streams + let (pii_detections_tx, pii_detections_rx) = + mpsc::channel::>(4); + let pii_detections_stream = ReceiverStream::new(pii_detections_rx).boxed(); + let (hap_detections_tx, hap_detections_rx) = + mpsc::channel::>(4); + let hap_detections_stream = ReceiverStream::new(hap_detections_rx).boxed(); + + // Create a batcher that will process batches for 2 detectors + let n = 2; + let batcher = MaxProcessedIndexBatcher::new(n); + + // Create detection batch stream + let streams = vec![pii_detections_stream, hap_detections_stream]; + let mut detection_batch_stream = DetectionBatchStream::new(batcher, streams); + + // Send chunk-2 detections for pii detector + let _ = pii_detections_tx + .send(Ok(( + input_id, + "pii".into(), + chunks[1].clone(), + Detections::default(), // no detections + ))) + .await; + + // Send chunk-1 detections for hap detector + let _ = hap_detections_tx + .send(Ok(( + input_id, + "hap".into(), + chunks[0].clone(), + Detections::default(), // no detections + ))) + .await; + + // Send chunk-2 detections for hap detector + let _ = hap_detections_tx + .send(Ok(( + input_id, + "hap".into(), + chunks[1].clone(), + Detections::default(), // no detections + ))) + .await; + + // We have all detections for chunk-2, but not chunk-1 + // detection_batch_stream.next() future should not be ready + assert!(matches!( + futures::poll!(detection_batch_stream.next()), + Poll::Pending + )); + + // Send chunk-1 detections for pii detector + let _ = pii_detections_tx + .send(Ok(( + input_id, + "pii".into(), + chunks[0].clone(), + vec![Detection { + start: Some(10), + end: Some(20), + detector_id: Some("pii".into()), + detection_type: "pii".into(), + score: 0.4, + ..Default::default() + }] + .into(), + ))) + .await; + + // We have all detections for chunk-1 and chunk-2 + // detection_batch_stream.next() should be ready and return chunk-1 with 1 pii detection + let batch = detection_batch_stream.next().await; + assert!(batch.is_some_and(|result| result + .is_ok_and(|(chunk, detections)| chunk == chunks[0] && detections.len() == 1))); + + // detection_batch_stream.next() should be ready and return chunk-2 with no detections + let batch = detection_batch_stream.next().await; + assert!(batch.is_some_and(|result| result + .is_ok_and(|(chunk, detections)| chunk == chunks[1] && detections.is_empty()))); + + // detection_batch_stream.next() future should not be ready + // as detection senders have not been closed + assert!(matches!( + futures::poll!(detection_batch_stream.next()), + Poll::Pending + )); + + // Drop detection senders + drop(pii_detections_tx); + drop(hap_detections_tx); + + // detection_batch_stream.next() should return None + assert!(detection_batch_stream.next().await.is_none()); + + Ok(()) + } } From 3e6c60bfe6ab907fc25aec476ee894c1de709db4 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Tue, 4 Mar 
2025 12:17:16 -0800 Subject: [PATCH 07/14] Add copyright to new files Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator/common.rs | 16 ++++++++++++++++ src/orchestrator/common/utils.rs | 16 ++++++++++++++++ src/orchestrator/types.rs | 16 ++++++++++++++++ src/orchestrator/types/chat_message.rs | 16 ++++++++++++++++ src/orchestrator/types/chunk.rs | 16 ++++++++++++++++ src/orchestrator/types/detection.rs | 16 ++++++++++++++++ src/orchestrator/types/detection_batch_stream.rs | 16 ++++++++++++++++ src/orchestrator/types/detection_batcher.rs | 16 ++++++++++++++++ .../types/detection_batcher/chat_completion.rs | 16 ++++++++++++++++ .../detection_batcher/max_processed_index.rs | 16 ++++++++++++++++ src/orchestrator/types/detection_batcher/noop.rs | 16 ++++++++++++++++ 11 files changed, 176 insertions(+) diff --git a/src/orchestrator/common.rs b/src/orchestrator/common.rs index a1b758e7..a494b8d6 100644 --- a/src/orchestrator/common.rs +++ b/src/orchestrator/common.rs @@ -1,2 +1,18 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ pub mod utils; pub use utils::*; diff --git a/src/orchestrator/common/utils.rs b/src/orchestrator/common/utils.rs index b3fb15f4..9e402bb4 100644 --- a/src/orchestrator/common/utils.rs +++ b/src/orchestrator/common/utils.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use std::{collections::HashMap, sync::Arc}; use crate::{ diff --git a/src/orchestrator/types.rs b/src/orchestrator/types.rs index 0b895ce6..ea577c1b 100644 --- a/src/orchestrator/types.rs +++ b/src/orchestrator/types.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +*/ use std::pin::Pin; use futures::Stream; diff --git a/src/orchestrator/types/chat_message.rs b/src/orchestrator/types/chat_message.rs index 553a2476..92ab778c 100644 --- a/src/orchestrator/types/chat_message.rs +++ b/src/orchestrator/types/chat_message.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use crate::clients::openai; /// A chat message. diff --git a/src/orchestrator/types/chunk.rs b/src/orchestrator/types/chunk.rs index f078b679..249a696a 100644 --- a/src/orchestrator/types/chunk.rs +++ b/src/orchestrator/types/chunk.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use crate::pb::caikit_data_model::nlp as pb; /// A chunk. diff --git a/src/orchestrator/types/detection.rs b/src/orchestrator/types/detection.rs index 45905fe5..47afe9a8 100644 --- a/src/orchestrator/types/detection.rs +++ b/src/orchestrator/types/detection.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use crate::{clients::detector, models}; /// A detection. diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index 2bf8f1bf..373c0f77 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +*/ use futures::{stream, Stream, StreamExt}; use tokio::sync::mpsc; diff --git a/src/orchestrator/types/detection_batcher.rs b/src/orchestrator/types/detection_batcher.rs index c0fea47c..d089f740 100644 --- a/src/orchestrator/types/detection_batcher.rs +++ b/src/orchestrator/types/detection_batcher.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ pub mod chat_completion; pub use chat_completion::*; pub mod noop; diff --git a/src/orchestrator/types/detection_batcher/chat_completion.rs b/src/orchestrator/types/detection_batcher/chat_completion.rs index 82233c4c..d14585db 100644 --- a/src/orchestrator/types/detection_batcher/chat_completion.rs +++ b/src/orchestrator/types/detection_batcher/chat_completion.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ #![allow(dead_code)] use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; use crate::orchestrator::types::Chunks; diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs index 4b5457f3..da6cc838 100644 --- a/src/orchestrator/types/detection_batcher/max_processed_index.rs +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use std::collections::{btree_map, BTreeMap}; use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; diff --git a/src/orchestrator/types/detection_batcher/noop.rs b/src/orchestrator/types/detection_batcher/noop.rs index 03ae16a5..e626c766 100644 --- a/src/orchestrator/types/detection_batcher/noop.rs +++ b/src/orchestrator/types/detection_batcher/noop.rs @@ -1,3 +1,19 @@ +/* + Copyright FMS Guardrails Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ use std::collections::VecDeque; use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; From e5ac6db5f9d49bbf3a5842cd0f3cc1d221efe747 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Tue, 4 Mar 2025 12:33:37 -0800 Subject: [PATCH 08/14] Update DetectionBatchStream doc string Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator/types/detection_batch_stream.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index 373c0f77..f4ccbd5f 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -20,8 +20,12 @@ use tokio::sync::mpsc; use super::{DetectionBatcher, DetectionStream}; use crate::orchestrator::Error; -/// Wraps detection streams and produces a stream -/// of batches using a [`DetectionBatcher`]. +/// A stream adapter that wraps multiple detection streams and +/// produces a stream of batches using a [`DetectionBatcher`] +/// implementation. +/// +/// The detection batcher enables flexible batching +/// logic and returned batch types for different use cases. pub struct DetectionBatchStream { batch_rx: mpsc::Receiver>, } From 30d95fb8401acfa732b2ac40b199394ed5e61c3f Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Wed, 5 Mar 2025 10:22:18 -0800 Subject: [PATCH 09/14] Add ADR 010: DetectionBatcher & DetectionBatchStream Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../adrs/010-detection-batcher.md | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 docs/architecture/adrs/010-detection-batcher.md diff --git a/docs/architecture/adrs/010-detection-batcher.md b/docs/architecture/adrs/010-detection-batcher.md new file mode 100644 index 00000000..62cc8530 --- /dev/null +++ b/docs/architecture/adrs/010-detection-batcher.md @@ -0,0 +1,49 @@ +# ADR 010: DetectionBatcher & DetectionBatchStream + +This ADR documents the addition of two new abstractions to handle batching (fka "aggregation") of streaming detection results. + +1. `DetectionBatcher` +A trait to implement pluggable batching logic for a `DetectionBatchStream`. It includes an associated `Batch` type, enabling implementations to return different types of batches. + +2. `DetectionBatchStream` +A stream adapter that wraps multiple detection streams and produces a stream of batches using a `DetectionBatcher`. + +## Motivation + +To support initial streaming requirements outlined in ADR 002, we implemented the `Aggregator` and `Tracker` components. + +1. `Aggregator` handles batching detections and building results. 
Internally, it is implemented as 3 actors: + + - `AggregationActor` + Aggregates detections and sends them to the `ResultActor` + + - `GenerationActor` + Consumes generations from the generation stream, accumulates them, and provides them on-demand to the `ResultActor` to build responses + + - `ResultActor` + Builds results from detection batches and generations and sends them to the result channel + +2. `Tracker` wraps a `BTreeMap` and contains batching logic. It is used internally by the `AggregationActor`. + +The primary issue with these components is that they were designed specifically for the *Streaming Classification With Generation* task and lack flexibility to be extended to additional streaming use cases that require batching detections, e.g. +- A use case may require different batching logic +- A use case may need to use different containers to implement its batching logic +- A use case may need to return a different batch type +- A use case may need to build a different result type + +Additionally, actors are not used in other areas of this codebase and they introduce concepts that may be unfamiliar to new contributors, further increasing the learning curve. + +## Decisions + +1. The `DetectionBatcher` trait replaces the `Tracker`, enabling flexible and pluggable batching logic tailored to different use cases. + +2. The `DetectionBatchStream`, a stream adapter, replaces the `Aggregator`, enabling more flexibility as it is generic over `DetectionBatcher`. + +3. The task of building results is decoupled and delegated to the task handler as a post-batching task. Instead of using an actor to accumulate and own generation/chat completion message state, a task handler can use a shared vec instead, e.g. a `Vec` wrapped in an `Arc` and a lock, or another approach per use case requirements. + +## Notes +1. The existing *Streaming Classification With Generation* batching logic has been re-implemented in `MaxProcessedIndexBatcher`, a `DetectionBatcher` implementation.
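+
+2. A minimal usage sketch of the handler-side consumption loop, assuming the task handler has already created `detection_streams`, a `Vec<DetectionStream>` with one stream per configured detector (the variable names here are illustrative, not part of the implementation):
+
+```rust
+use futures::StreamExt;
+
+// One batch per chunk, released once detections from every detector
+// have been received for that chunk.
+let batcher = MaxProcessedIndexBatcher::new(detection_streams.len());
+let mut batch_stream = DetectionBatchStream::new(batcher, detection_streams);
+
+while let Some(result) = batch_stream.next().await {
+    match result {
+        // For MaxProcessedIndexBatcher, a batch is a (Chunk, Detections) pair
+        Ok((chunk, detections)) => {
+            // Build and send the result message for this chunk
+        }
+        Err(error) => {
+            // Propagate the error to the client and terminate
+        }
+    }
+}
+```
+
+The batcher implementation controls what a batch is, so a different `DetectionBatcher` can be dropped in without changing this consumption loop.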
+ +## Status + +Pending \ No newline at end of file From a524a0e61331ec0567975aeed2e37d175e80fa54 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Wed, 5 Mar 2025 11:21:20 -0800 Subject: [PATCH 10/14] Fix comment Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator/types/detection_batch_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index f4ccbd5f..2525e586 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -65,7 +65,7 @@ where break; }, None => { - // Batcher channel closed + // Detection stream set closed break; }, } From dfc0f1375482c270c910e85ef15e2fa5ee0bd418 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:28:23 -0800 Subject: [PATCH 11/14] Rename n to n_detectors in MaxProcessedIndexBatcher Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- .../types/detection_batcher/max_processed_index.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/orchestrator/types/detection_batcher/max_processed_index.rs b/src/orchestrator/types/detection_batcher/max_processed_index.rs index da6cc838..62a09711 100644 --- a/src/orchestrator/types/detection_batcher/max_processed_index.rs +++ b/src/orchestrator/types/detection_batcher/max_processed_index.rs @@ -34,14 +34,14 @@ use super::{Chunk, DetectionBatcher, Detections, DetectorId, InputId}; /// /// This batcher requires that all detectors use the same chunker. pub struct MaxProcessedIndexBatcher { - n: usize, + n_detectors: usize, state: BTreeMap>, } impl MaxProcessedIndexBatcher { - pub fn new(n: usize) -> Self { + pub fn new(n_detectors: usize) -> Self { Self { - n, + n_detectors, state: BTreeMap::default(), } } @@ -74,7 +74,7 @@ impl DetectionBatcher for MaxProcessedIndexBatcher { if self .state .first_key_value() - .is_some_and(|(_, detections)| detections.len() == self.n) + .is_some_and(|(_, detections)| detections.len() == self.n_detectors) { // We have all detections for the chunk, remove and return it. if let Some((chunk, detections)) = self.state.pop_first() { From 140b72c2330d395e0d98241a608c4d5627f84a6b Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:39:14 -0800 Subject: [PATCH 12/14] Add docstrings to detection types and drop Option from evidence in DetectionEvidence for consistency Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator/types/detection.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/orchestrator/types/detection.rs b/src/orchestrator/types/detection.rs index 47afe9a8..8ad46df4 100644 --- a/src/orchestrator/types/detection.rs +++ b/src/orchestrator/types/detection.rs @@ -19,23 +19,35 @@ use crate::{clients::detector, models}; /// A detection. 
#[derive(Default, Debug, Clone, PartialEq)] pub struct Detection { + /// Start index of the detection pub start: Option, + /// End index of the detection pub end: Option, + /// Text corresponding to the detection pub text: Option, + /// ID of the detector pub detector_id: Option, + /// Type of detection pub detection_type: String, + /// Detection class pub detection: String, + /// Confidence level of the detection class pub score: f64, + /// Detection evidence pub evidence: Vec, } /// Detection evidence. #[derive(Default, Clone, Debug, PartialEq)] pub struct DetectionEvidence { + /// Evidence name pub name: String, + /// Evidence value pub value: Option, + /// Evidence score pub score: Option, - pub evidence: Option>, + /// Additional evidence + pub evidence: Vec, } /// Additional detection evidence. @@ -127,13 +139,13 @@ impl From>> for Detections { impl From for models::EvidenceObj { fn from(value: DetectionEvidence) -> Self { + let evidence = (!value.evidence.is_empty()) + .then_some(value.evidence.into_iter().map(Into::into).collect()); Self { name: value.name, value: value.value, score: value.score, - evidence: value - .evidence - .map(|vs| vs.into_iter().map(Into::into).collect()), + evidence, } } } @@ -156,7 +168,8 @@ impl From for DetectionEvidence { score: value.score, evidence: value .evidence - .map(|vs| vs.into_iter().map(Into::into).collect()), + .map(|vs| vs.into_iter().map(Into::into).collect()) + .unwrap_or_default(), } } } From f2e0b9049b19843779fbf15b540ae22683c5d73b Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:09:42 -0800 Subject: [PATCH 13/14] Add refusal to ChatMessage Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- src/orchestrator/types/chat_message.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/orchestrator/types/chat_message.rs b/src/orchestrator/types/chat_message.rs index 92ab778c..025f2f3f 100644 --- a/src/orchestrator/types/chat_message.rs +++ b/src/orchestrator/types/chat_message.rs @@ -26,6 +26,8 @@ pub struct ChatMessage<'a> { pub role: Option<&'a openai::Role>, /// The text contents of the message. pub text: Option<&'a str>, + /// The refusal message. + pub refusal: Option<&'a str>, } /// An iterator over chat messages. 
@@ -46,6 +48,7 @@ impl ChatMessageIterator for openai::ChatCompletionsRequest { index: index as u32, role: Some(&message.role), text, + refusal: message.refusal.as_deref(), } }) } @@ -57,6 +60,7 @@ impl ChatMessageIterator for openai::ChatCompletion { index: choice.index, role: Some(&choice.message.role), text: choice.message.content.as_deref(), + refusal: choice.message.refusal.as_deref(), }) } } @@ -67,6 +71,7 @@ impl ChatMessageIterator for openai::ChatCompletionChunk { index: choice.index, role: choice.delta.role.as_ref(), text: choice.delta.content.as_deref(), + refusal: choice.delta.refusal.as_deref(), }) } } From 8e9d44e8ed10161c3cdafd3bc8beed12c1356631 Mon Sep 17 00:00:00 2001 From: declark1 <44146800+declark1@users.noreply.github.com> Date: Thu, 6 Mar 2025 10:41:10 -0800 Subject: [PATCH 14/14] Update detection batcher ADR Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com> --- docs/architecture/adrs/010-detection-batcher.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/architecture/adrs/010-detection-batcher.md b/docs/architecture/adrs/010-detection-batcher.md index 62cc8530..a4a542d2 100644 --- a/docs/architecture/adrs/010-detection-batcher.md +++ b/docs/architecture/adrs/010-detection-batcher.md @@ -27,7 +27,7 @@ To support initial streaming requirements outlined in ADR 002, we implemented th The primary issue with these components is that they were designed specifically for the *Streaming Classification With Generation* task and lack flexibility to be extended to additional streaming use cases that require batching detections, e.g. - A use case may require different batching logic -- A use case may need to use different containers to implement its batching logic +- A use case may need to use different data structures to implement its batching logic - A use case may need to return a different batch type - A use case may need to build a different result type