Skip to content

Commit

Permalink
Merge pull request #71 from chainbound/feat/lz4
Browse files Browse the repository at this point in the history
feat: lz4 compression
  • Loading branch information
mempirate authored Feb 2, 2024
2 parents 65605e4 + 224077d commit 5651bf7
Showing 10 changed files with 110 additions and 16 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -48,9 +48,6 @@ quinn = "0.10"
# rustls needs to be the same version as the one used by quinn
rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] }
rcgen = "0.12"
flate2 = "1"
zstd = "0.13"
snap = "1"

# benchmarking & profiling
criterion = { version = "0.5", features = ["async_tokio"] }
1 change: 1 addition & 0 deletions book/src/usage/compression.md
Original file line number Diff line number Diff line change
@@ -77,6 +77,7 @@ with the default compression methods:
- Gzip
- Zstd
- Snappy
- LZ4

Custom compression algorithms are not yet exposed through a public API.
If you need this, please [open an issue][new-issue] on GitHub and we will prioritize it!
1 change: 1 addition & 0 deletions msg-socket/src/lib.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ mod req;
mod sub;

mod connection;
pub use connection::*;

use bytes::Bytes;
pub use pubs::{PubError, PubOptions, PubSocket};
7 changes: 2 additions & 5 deletions msg-socket/src/req/driver.rs
Original file line number Diff line number Diff line change
@@ -17,12 +17,9 @@ use tokio::{
use tokio_util::codec::Framed;
use tracing::{debug, error, trace};

use crate::{
connection::{ConnectionState, ExponentialBackoff},
req::SocketState,
};

use super::{Command, ReqError, ReqOptions};
use crate::{req::SocketState, ConnectionState, ExponentialBackoff};

use msg_transport::Transport;
use msg_wire::{
auth,
7 changes: 4 additions & 3 deletions msg-socket/src/req/socket.rs
Original file line number Diff line number Diff line change
@@ -8,9 +8,10 @@ use tokio::sync::{mpsc, oneshot};
use msg_transport::Transport;

use super::{Command, ReqDriver, ReqError, ReqOptions, DEFAULT_BUFFER_SIZE};
use crate::connection::{ConnectionState, ExponentialBackoff};
use crate::ReqMessage;
use crate::{req::stats::SocketStats, req::SocketState};
use crate::{
req::{stats::SocketStats, SocketState},
ConnectionState, ExponentialBackoff, ReqMessage,
};

/// The request socket.
pub struct ReqSocket<T: Transport> {
3 changes: 1 addition & 2 deletions msg-socket/src/sub/driver.rs
Original file line number Diff line number Diff line change
@@ -12,14 +12,13 @@ use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};

use crate::connection::{ConnectionState, ExponentialBackoff};

use super::session::SessionCommand;
use super::{
session::PublisherSession,
stream::{PublisherStream, TopicMessage},
Command, PubMessage, SocketState, SubOptions,
};
use crate::{ConnectionState, ExponentialBackoff};

use msg_common::{channel, task::JoinMap, Channel};
use msg_transport::Transport;
8 changes: 5 additions & 3 deletions msg-wire/Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ bytes.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tracing.workspace = true
flate2.workspace = true
zstd.workspace = true
snap.workspace = true

flate2 = "1"
zstd = "0.13"
snap = "1"
lz4_flex = "0.11"
39 changes: 39 additions & 0 deletions msg-wire/src/compression/lz4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use bytes::Bytes;
use lz4_flex::{compress, decompress};
use std::io;

use super::{CompressionType, Compressor, Decompressor};

/// LZ4 compressor backed by the `lz4_flex` crate.
///
/// The type carries no state, so a single value can be reused for any number
/// of payloads.
#[derive(Default)]
pub struct Lz4Compressor;

impl Compressor for Lz4Compressor {
    /// Reports [`CompressionType::Lz4`] as the tag for payloads produced here.
    fn compression_type(&self) -> CompressionType {
        CompressionType::Lz4
    }

    /// Compresses `data` in a single pass and returns the compressed bytes.
    ///
    /// The `Result` is required by the [`Compressor`] trait; this
    /// implementation always returns `Ok`.
    fn compress(&self, data: &[u8]) -> Result<Bytes, io::Error> {
        let compressed: Vec<u8> = compress(data);
        Ok(compressed.into())
    }
}

#[derive(Debug, Default)]
pub struct Lz4Decompressor;

impl Decompressor for Lz4Decompressor {
fn decompress(&self, data: &[u8]) -> Result<Bytes, io::Error> {
// Usually the Lz4 compression ratio is 2.1x. So 4x should be plenty.
let min_uncompressed_size = data.len() * 4;
let bytes = decompress(data, min_uncompressed_size).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Lz4 decompression failed: {}", e),
)
})?;

Ok(Bytes::from(bytes))
}
}
31 changes: 31 additions & 0 deletions msg-wire/src/compression/mod.rs
Original file line number Diff line number Diff line change
@@ -2,9 +2,11 @@ use bytes::Bytes;
use std::io;

mod gzip;
mod lz4;
mod snappy;
mod zstd;
pub use gzip::*;
pub use lz4::*;
pub use snappy::*;
pub use zstd::*;

@@ -16,6 +18,7 @@ pub enum CompressionType {
Gzip = 1,
Zstd = 2,
Snappy = 3,
Lz4 = 4,
}

impl TryFrom<u8> for CompressionType {
@@ -27,6 +30,7 @@ impl TryFrom<u8> for CompressionType {
1 => Ok(CompressionType::Gzip),
2 => Ok(CompressionType::Zstd),
3 => Ok(CompressionType::Snappy),
4 => Ok(CompressionType::Lz4),
_ => Err(value),
}
}
@@ -63,6 +67,7 @@ pub fn try_decompress_payload(compression_type: u8, data: Bytes) -> Result<Bytes
CompressionType::Gzip => GzipDecompressor.decompress(data.as_ref()),
CompressionType::Zstd => ZstdDecompressor.decompress(data.as_ref()),
CompressionType::Snappy => SnappyDecompressor.decompress(data.as_ref()),
CompressionType::Lz4 => Lz4Decompressor.decompress(data.as_ref()),
},
Err(unsupported_compression_type) => Err(io::Error::new(
io::ErrorKind::InvalidData,
@@ -120,6 +125,21 @@ mod tests {
assert_eq!(data, decompressed);
}

/// Round-trips a short, repetitive payload through the LZ4 compressor and
/// decompressor and checks that the original bytes come back unchanged.
#[test]
fn test_lz4_compression() {
    let original =
        Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
    println!("Before: {:?}", original.len());

    let packed = Lz4Compressor.compress(&original).unwrap();
    println!("After: {:?}", packed.len());

    let restored = Lz4Decompressor.decompress(&packed).unwrap();
    assert_eq!(original, restored);
}

fn compression_test<C: Compressor>(data: &Bytes, comp: C) -> (std::time::Duration, f64, Bytes) {
let uncompressed_size = data.len() as f64;
let start = std::time::Instant::now();
@@ -169,6 +189,13 @@ mod tests {
snappy_perf, snappy_time
);

let lz4 = Lz4Compressor;
let (lz4_time, lz4_perf, lz4_comp) = compression_test(&data, lz4);
println!(
"lz4 compression shrank the data by {:.2}% in {:?}",
lz4_perf, lz4_time
);

println!("------");

let gzip = GzipDecompressor;
@@ -182,5 +209,9 @@ mod tests {
let snappy = SnappyDecompressor;
let snappy_time = decompression_test(&snappy_comp, snappy);
println!("snappy decompression took {:?}", snappy_time);

let lz4 = Lz4Decompressor;
let lz4_time = decompression_test(&lz4_comp, lz4);
println!("lz4 decompression took {:?}", lz4_time);
}
}

0 comments on commit 5651bf7

Please sign in to comment.