From 3e6e503b9c02012c3f3b780d2a4752d6ee06843c Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Feb 2020 23:33:28 +0100 Subject: [PATCH] Implement VarIntAsyncReader For #4. --- src/reader.rs | 91 +++++++++++++++++++++++++++++++++------------ src/varint_tests.rs | 37 +++++++++++++++++- 2 files changed, 103 insertions(+), 25 deletions(-) diff --git a/src/reader.rs b/src/reader.rs index b0bfaf6..e4fc009 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,8 +1,11 @@ use std::io; use std::io::{Read, Result}; -use fixed::FixedInt; -use varint::{VarInt, MSB}; +use crate::fixed::FixedInt; +use crate::varint::{VarInt, MSB}; + +use futures::io::AsyncReadExt; +use futures::prelude::*; /// A trait for reading VarInts from any other `Reader`. /// @@ -17,37 +20,77 @@ pub trait VarIntReader { fn read_varint(&mut self) -> Result; } -impl VarIntReader for R { - fn read_varint(&mut self) -> Result { - const BUFLEN: usize = 10; - let mut buf = [0 as u8; BUFLEN]; - let mut i = 0; - - loop { - if i >= BUFLEN { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Unterminated varint", - )); - } +/// Like a VarIntReader, but returns a future. +#[async_trait::async_trait] +pub trait VarIntAsyncReader { + async fn read_varint_async(&mut self) -> Result; +} + +/// VarIntProcessor encapsulates the logic for decoding a VarInt byte-by-byte. +#[derive(Default)] +pub struct VarIntProcessor { + buf: [u8; 10], + i: usize, +} + +impl VarIntProcessor { + fn push(&mut self, b: u8) -> Result<()> { + if self.i >= 10 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Unterminated varint", + )); + } + self.buf[self.i] = b; + self.i += 1; + Ok(()) + } + fn finished(&self) -> bool { + (self.i > 0 && (self.buf[self.i - 1] & MSB == 0)) + } + fn decode(&self) -> VI { + VI::decode_var(&self.buf[0..self.i]).0 + } +} - let read = try!(self.read(&mut buf[i..i + 1])); +#[async_trait::async_trait] +impl VarIntAsyncReader for AR { + async fn read_varint_async(&mut self) -> Result { + let mut buf = [0 as u8; 1]; + let mut p = VarIntProcessor::default(); + + while !p.finished() { + let read = self.read(&mut buf).await?; // EOF - if read == 0 && i == 0 { + if read == 0 && p.i == 0 { return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")); } - if buf[i] & MSB == 0 { - break; + p.push(buf[0])?; + } + + Ok(p.decode()) + } +} + +impl VarIntReader for R { + fn read_varint(&mut self) -> Result { + let mut buf = [0 as u8; 1]; + let mut p = VarIntProcessor::default(); + + while !p.finished() { + let read = self.read(&mut buf)?; + + // EOF + if read == 0 && p.i == 0 { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")); } - i += 1; + p.push(buf[0])?; } - let (result, _) = VI::decode_var(&buf[0..i + 1]); - - Ok(result) + Ok(p.decode()) } } @@ -63,7 +106,7 @@ impl FixedIntReader for R { fn read_fixedint(&mut self) -> Result { let mut buf = [0 as u8; 8]; - let read = try!(self.read(&mut buf[0..FI::required_space()])); + let read = self.read(&mut buf[0..FI::required_space()])?; if read == 0 { return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")); diff --git a/src/varint_tests.rs b/src/varint_tests.rs index 3d9438d..8baf83d 100644 --- a/src/varint_tests.rs +++ b/src/varint_tests.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use crate::reader::VarIntReader; + use crate::reader::{VarIntAsyncReader, VarIntReader}; use crate::varint::VarInt; use crate::writer::VarIntWriter; @@ -105,4 +105,39 @@ mod tests { assert!(reader.read_varint::().is_err()); } + + #[test] + fn test_async_reader() { + let mut buf = Vec::with_capacity(128); + + let i1: u32 = 1; + let i2: u32 = 65532; + let i3: u32 = 4200123456; + let i4: i64 = i3 as i64 * 1000; + let i5: i32 = -32456; + + assert!(buf.write_varint(i1).is_ok()); + assert!(buf.write_varint(i2).is_ok()); + assert!(buf.write_varint(i3).is_ok()); + assert!(buf.write_varint(i4).is_ok()); + assert!(buf.write_varint(i5).is_ok()); + + let mut reader: &[u8] = buf.as_ref(); + + futures::executor::block_on(async { + assert_eq!(i1, reader.read_varint_async().await.unwrap()); + assert_eq!(i2, reader.read_varint_async().await.unwrap()); + assert_eq!(i3, reader.read_varint_async().await.unwrap()); + assert_eq!(i4, reader.read_varint_async().await.unwrap()); + assert_eq!(i5, reader.read_varint_async().await.unwrap()); + assert!(reader.read_varint_async::().await.is_err()); + }); + } + + #[test] + fn test_unterminated_varint() { + let mut buf = vec![0xff as u8; 12]; + let mut read = buf.as_slice(); + assert!(read.read_varint::().is_err()); + } }