Skip to content

Commit

Permalink
refactor: rework {Async,}Source
Browse files Browse the repository at this point in the history
  • Loading branch information
threadexio committed Sep 2, 2024
1 parent f9b1d7a commit 2d3db09
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 346 deletions.
10 changes: 6 additions & 4 deletions channels-io/src/framed/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use alloc::vec::Vec;

/// TODO: docs
/// Decode an item from a buffer of bytes.
pub trait Decoder {
/// TODO: docs
/// The type of items the decoder accepts.
type Output;

/// TODO: docs
/// Error type returned by the decoder.
type Error;

/// TODO: docs
/// Decode an item from `buf`.
///
/// Implementations should remove data from `buf` as each item is decoded.
fn decode(
&mut self,
buf: &mut Vec<u8>,
Expand Down
226 changes: 120 additions & 106 deletions channels-io/src/framed/framed_read.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

Expand All @@ -6,13 +7,25 @@ use alloc::vec::Vec;
use pin_project::pin_project;

use crate::buf::Cursor;
use crate::error::ReadError;
use crate::framed::Decoder;
use crate::source::{AsyncSource, Source};
use crate::util::{slice_uninit_assume_init_mut, PollExt};
use crate::util::slice_uninit_assume_init_mut;
use crate::{AsyncRead, AsyncReadExt, Read, ReadExt};

/// TODO: docs
/// Convert a [`Read`] to a [`Source`] or an [`AsyncRead`] to an [`AsyncSource`].
///
/// This abstraction converts unstructured input streams to structured typed streams.
/// It reads raw bytes from a reader and processes them into structured data with the
/// help of a [`Decoder`]. The [`Decoder`] decides how the raw bytes are converted back
/// to items. At the other end of the stream, a [`FramedWrite`] will have produced the bytes
/// with a matching [`Encoder`].
///
/// The [`Decoder`] will read bytes from the reader via an intermediary buffer owned by the
/// [`FramedRead`] instance, the "read buffer". The [`Decoder`] will remove bytes from it
/// as each item is decoded.
///
/// [`FramedWrite`]: crate::framed::FramedWrite
/// [`Encoder`]: crate::framed::Encoder
#[pin_project]
#[derive(Debug)]
pub struct FramedRead<R, D> {
Expand All @@ -23,14 +36,15 @@ pub struct FramedRead<R, D> {
}

impl<R, D> FramedRead<R, D> {
/// TODO: docs
/// Create a new [`FramedRead`].
#[inline]
#[must_use]
pub const fn new(reader: R, decoder: D) -> Self {
Self { reader, decoder, buf: Vec::new() }
}

/// TODO: docs
/// Create a new [`FramedRead`] that can hold `capacity` bytes in its read buffer
/// before allocating.
#[inline]
#[must_use]
pub fn with_capacity(
Expand All @@ -41,49 +55,63 @@ impl<R, D> FramedRead<R, D> {
Self { reader, decoder, buf: Vec::with_capacity(capacity) }
}

/// TODO: docs
/// Get a reference to the underlying reader.
#[inline]
#[must_use]
pub fn reader(&self) -> &R {
&self.reader
}

/// TODO: docs
/// Get a mutable reference to the underlying reader.
#[inline]
#[must_use]
pub fn reader_mut(&mut self) -> &mut R {
&mut self.reader
}

/// TODO: docs
/// Get a pinned reference to the underlying reader.
#[inline]
#[must_use]
pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

/// TODO: docs
/// Get a reference to the underlying decoder.
#[inline]
#[must_use]
pub fn decoder(&self) -> &D {
&self.decoder
}

/// TODO: docs
/// Get a mutable reference to the decoder.
#[inline]
#[must_use]
pub fn decoder_mut(&mut self) -> &mut D {
&mut self.decoder
}

/// TODO: docs
/// Get a reference to the decoder from a pinned reference of the [`FramedRead`].
#[inline]
#[must_use]
pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
self.project().decoder
}

/// TODO: docs
/// Get a reference to the read buffer.
#[inline]
#[must_use]
pub fn read_buffer(&self) -> &Vec<u8> {
&self.buf
}

/// Get a mutable reference to the read buffer.
#[inline]
#[must_use]
pub fn read_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}

/// Get a reference to the read buffer from a pinned reference of the [`FramedRead`].
#[inline]
#[must_use]
pub fn map_decoder<T, F>(self, f: F) -> FramedRead<R, T>
Expand All @@ -98,95 +126,56 @@ impl<R, D> FramedRead<R, D> {
}
}

/// TODO: docs
#[inline]
#[must_use]
pub fn read_buffer(&self) -> &Vec<u8> {
&self.buf
}

/// TODO: docs
#[inline]
#[must_use]
pub fn read_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}

/// TODO: docs
/// Destruct this [`FramedRead`] and get back the reader, dropping the decoder.
#[inline]
#[must_use]
pub fn into_reader(self) -> R {
self.reader
}

/// TODO: docs
/// Destruct this [`FramedRead`] and get back the decoder, dropping the reader.
#[inline]
#[must_use]
pub fn into_decoder(self) -> D {
self.decoder
}

/// TODO: docs
/// Destruct this [`FramedRead`] and get back both the decoder and the reader.
#[inline]
#[must_use]
pub fn into_inner(self) -> (R, D) {
(self.reader, self.decoder)
}
}

/// TODO: docs
// TODO: error impl
/// Errors when receiving an item over a [`FramedRead`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FramedReadError<T, Io> {
/// TODO: docs
/// The decoder returned an error.
Decode(T),
/// TODO: docs
/// There was an I/O error.
Io(Io),
}

impl<R, D> FramedRead<R, D>
impl<T, Io> fmt::Display for FramedReadError<T, Io>
where
D: Decoder,
T: fmt::Display,
Io: fmt::Display,
{
#[allow(clippy::type_complexity)]
fn poll_next_internal<F, E>(
self: Pin<&mut Self>,
mut read_buf: F,
) -> Poll<Result<D::Output, FramedReadError<D::Error, E>>>
where
E: ReadError,
F: FnMut(
Pin<&mut R>,
&mut Cursor<&mut [u8]>,
) -> Poll<Result<(), E>>,
{
let mut this = self.project();

loop {
match this.decoder.decode(this.buf) {
Ok(Some(x)) => return Poll::Ready(Ok(x)),
Ok(None) => {
if this.buf.spare_capacity_mut().is_empty() {
this.buf.reserve(1024);
}

ready!(poll_read_vec(
this.reader.as_mut(),
this.buf,
&mut read_buf
))
.map_err(FramedReadError::Io)?;
},
Err(e) => {
return Poll::Ready(Err(FramedReadError::Decode(
e,
)))
},
}
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Decode(e) => e.fmt(f),
Self::Io(e) => e.fmt(f),
}
}
}

#[cfg(feature = "std")]
impl<T, Io> std::error::Error for FramedReadError<T, Io> where
Self: fmt::Debug + fmt::Display
{
}

impl<R, D> AsyncSource for FramedRead<R, D>
where
R: AsyncRead,
Expand All @@ -199,7 +188,40 @@ where
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Self::Item> {
self.poll_next_internal(|r, buf| r.poll_read_buf(cx, buf))
let mut this = self.project();

loop {
match this.decoder.decode(this.buf) {
Ok(Some(x)) => return Poll::Ready(Ok(x)),
Ok(None) => {
if this.buf.spare_capacity_mut().is_empty() {
this.buf.reserve(1);
}

let n = {
let mut buf =
Cursor::new(get_vec_spare_cap(this.buf));

ready!(this
.reader
.as_mut()
.poll_read_buf(cx, &mut buf))
.map_err(FramedReadError::Io)?;

buf.pos()
};

unsafe {
this.buf.set_len(this.buf.len() + n);
}
},
Err(e) => {
return Poll::Ready(Err(
FramedReadError::Decode(e),
));
},
}
}
}
}

Expand All @@ -212,48 +234,40 @@ where
Result<D::Output, FramedReadError<D::Error, R::Error>>;

fn next(&mut self) -> Self::Item {
let pinned = unsafe { Pin::new_unchecked(self) };
loop {
match self.decoder.decode(&mut self.buf) {
Ok(Some(x)) => return Ok(x),
Ok(None) => {
if self.buf.spare_capacity_mut().is_empty() {
self.buf.reserve(1);
}

pinned
.poll_next_internal(|r, buf| unsafe {
Poll::Ready(r.get_unchecked_mut().read_buf(buf))
})
.unwrap()
}
}
let n = {
let mut buf = Cursor::new(get_vec_spare_cap(
&mut self.buf,
));

fn poll_read_vec<R, E, F>(
r: Pin<&mut R>,
buf: &mut Vec<u8>,
mut read_buf: F,
) -> Poll<Result<usize, E>>
where
E: ReadError,
F: FnMut(
Pin<&mut R>,
&mut Cursor<&mut [u8]>,
) -> Poll<Result<(), E>>,
{
unsafe {
let spare_cap =
slice_uninit_assume_init_mut(buf.spare_capacity_mut());
if spare_cap.is_empty() {
return Poll::Ready(Ok(0));
}

let n = {
let mut buf = Cursor::new(spare_cap);
ready!(read_buf(r, &mut buf))?;
buf.pos()
};
self.reader
.read_buf(&mut buf)
.map_err(FramedReadError::Io)?;

let new_len = buf.len() + n;
buf.set_len(new_len);
buf.pos()
};

Poll::Ready(Ok(n))
unsafe {
self.buf.set_len(self.buf.len() + n);
}
},
Err(e) => return Err(FramedReadError::Decode(e)),
}
}
}
}

fn get_vec_spare_cap(vec: &mut Vec<u8>) -> &mut [u8] {
unsafe { slice_uninit_assume_init_mut(vec.spare_capacity_mut()) }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 4 additions & 4 deletions channels-io/src/framed/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt};

/// Convert a [`Write`] to a [`Sink`] or an [`AsyncWrite`] to an [`AsyncSink`].
///
/// This abstraction converts unstructured byte to structured typed output streams. It
/// accepts "items" that are then processed by an [`Encoder`] and sent to the writer. The
/// [`Encoder`] decides how the provided items are converted to bytes. At the other end
/// This abstraction converts unstructured byte streams to structured typed output streams.
/// It accepts "items" that are then processed by an [`Encoder`] and sent to the writer.
/// The [`Encoder`] decides how the provided items are converted to bytes. At the other end
/// of the stream, a [`FramedRead`] will read the output of the encoder and using a
/// matching [`Decoder`] will decode the byte stream back into the items given to this
/// [`FramedWrite`].
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<W, E> FramedWrite<W, E> {
}
}

/// Errors when sending an iter over a [`FramedWrite`].
/// Errors when sending an item over a [`FramedWrite`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FramedWriteError<T, Io> {
/// The encoder returned an error.
Expand Down
Loading

0 comments on commit 2d3db09

Please sign in to comment.