Skip to content

Commit

Permalink
Update DetectionBatchStream to wrap batch_rx directly
Browse files Browse the repository at this point in the history
Signed-off-by: declark1 <44146800+declark1@users.noreply.github.com>
  • Loading branch information
declark1 committed Mar 4, 2025
1 parent 4e20565 commit cbdfe89
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions src/orchestrator/types/detection_batch_stream.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use futures::{stream, Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use super::{BoxStream, Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId};
use super::{Chunk, DetectionBatcher, DetectionStream, Detections, DetectorId, InputId};
use crate::orchestrator::Error;

/// Wraps detection streams and produces a stream
/// of batches using a [`DetectionBatcher`].
pub struct DetectionBatchStream<B: DetectionBatcher> {
inner: BoxStream<Result<B::Batch, Error>>,
batch_rx: mpsc::Receiver<Result<B::Batch, Error>>,
}

impl<B> DetectionBatchStream<B>
Expand Down Expand Up @@ -71,9 +70,7 @@ where
}
});

Self {
inner: ReceiverStream::new(batch_rx).boxed(),
}
Self { batch_rx }
}
}

Expand All @@ -84,6 +81,6 @@ impl<T: DetectionBatcher> Stream for DetectionBatchStream<T> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
self.batch_rx.poll_recv(cx)
}
}

0 comments on commit cbdfe89

Please sign in to comment.