diff --git a/src/orchestrator/types/detection_batch_stream.rs b/src/orchestrator/types/detection_batch_stream.rs index 0430179f..a1ac920b 100644 --- a/src/orchestrator/types/detection_batch_stream.rs +++ b/src/orchestrator/types/detection_batch_stream.rs @@ -1,14 +1,13 @@ use futures::{stream, Stream, StreamExt}; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use super::{BoxStream, Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; +use super::{Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId}; use crate::orchestrator::Error; /// Wraps detection streams and produces a stream /// of batches using a [`DetectionBatcher`]. pub struct DetectionBatchStream { - inner: BoxStream>, + batch_rx: mpsc::Receiver>, } impl DetectionBatchStream @@ -71,9 +70,7 @@ where } }); - Self { - inner: ReceiverStream::new(batch_rx).boxed(), - } + Self { batch_rx } } } @@ -84,6 +81,6 @@ impl Stream for DetectionBatchStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.inner.as_mut().poll_next(cx) + self.batch_rx.poll_recv(cx) } }