diff --git a/CHANGELOG.md b/CHANGELOG.md
index 678bf69..0a01be6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,9 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [Unreleased] - ReleaseDate
 
 ### Changed
+- This crate is now pure Rust, with no C/C++ dependencies.
 - insert: increase max size of frames to improve throughput ([#130]).
+- compression: replace the `lz4` sys binding with `lz4_flex` (pure Rust).
+- compression: replace the `clickhouse-rs-cityhash-sys` sys binding with `cityhash-rs` (pure Rust) ([#107]).
+
+### Deprecated
+- compression: `Compression::Lz4Hc` is deprecated and now acts as an alias for `Compression::Lz4`.
 
 [#130]: https://github.com/ClickHouse/clickhouse-rs/issues/130
+[#107]: https://github.com/ClickHouse/clickhouse-rs/issues/107
 
 ## [0.12.1] - 2024-08-07
 ### Added
diff --git a/Cargo.toml b/Cargo.toml
index aab4d0e..e05fc52 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ inserter = ["dep:quanta"]
 watch = ["dep:sha-1", "dep:serde_json", "serde/derive"]
 uuid = ["dep:uuid"]
 time = ["dep:time"]
-lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
+lz4 = ["dep:lz4_flex", "dep:cityhash-rs"]
 native-tls = ["dep:hyper-tls"]
 rustls-tls = ["dep:hyper-rustls"]
 
@@ -78,8 +78,8 @@ static_assertions = "1.1"
 sealed = "0.5"
 sha-1 = { version = "0.10", optional = true }
 serde_json = { version = "1.0.68", optional = true }
-lz4 = { version = "1.23.3", optional = true }
-clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true }
+lz4_flex = { version = "0.11.3", default-features = false, features = ["std"], optional = true }
+cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
 bstr = { version = "1.2", default-features = false }
@@ -95,4 +95,4 @@ serde_bytes = "0.11.4"
 serde_repr = "0.1.7"
 uuid = { version = "1", features = ["v4"] }
 time = { version = "0.3.17", features = ["macros", "rand"] }
-rand = "0.8.5"
+rand = { version = "0.8.5", features = ["small_rng"] }
diff --git a/README.md b/README.md
index 7139dfa..f53f9cc 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # clickhouse-rs
 
-A typed client for ClickHouse.
+Official pure Rust typed client for ClickHouse DB.
 
 [![Crates.io][crates-badge]][crates-url]
 [![Documentation][docs-badge]][docs-url]
diff --git a/benches/insert.rs b/benches/insert.rs
index f630a78..03a7fe3 100644
--- a/benches/insert.rs
+++ b/benches/insert.rs
@@ -102,16 +102,6 @@ where
             rt.block_on((f)(client, iters)).unwrap()
         })
     });
-    #[cfg(feature = "lz4")]
-    group.bench_function("lz4hc(4)", |b| {
-        b.iter_custom(|iters| {
-            let rt = Runtime::new().unwrap();
-            let client = Client::default()
-                .with_url(format!("http://{addr}"))
-                .with_compression(Compression::Lz4Hc(4));
-            rt.block_on((f)(client, iters)).unwrap()
-        })
-    });
     group.finish();
 }
 
diff --git a/benches/select.rs b/benches/select.rs
index ccf0378..49f8ab9 100644
--- a/benches/select.rs
+++ b/benches/select.rs
@@ -17,18 +17,36 @@ mod common;
 
 async fn serve(
     request: Request,
+    chunk: Bytes,
 ) -> Response> {
     common::skip_incoming(request).await;
 
-    let chunk = Bytes::from_static(&[15; 128 * 1024]);
     let stream = stream::repeat(chunk).map(|chunk| Ok(Frame::data(chunk)));
-
     Response::new(StreamBody::new(stream))
 }
 
+fn prepare_chunk() -> Bytes {
+    use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng};
+
+    // Generate random data to avoid _real_ compression.
+    // TODO: It would be more useful to generate real data.
+    let mut rng = SmallRng::seed_from_u64(0xBA5E_FEED);
+    let raw: Vec<_> = (&mut rng).sample_iter(Standard).take(128 * 1024).collect();
+
+    // If the feature is enabled, compress the data even if we use the `None`
+    // compression. The compression ratio is low anyway due to random data.
+    #[cfg(feature = "lz4")]
+    let chunk = clickhouse::_priv::lz4_compress(&raw).unwrap();
+    #[cfg(not(feature = "lz4"))]
+    let chunk = Bytes::from(raw);
+
+    chunk
+}
+
 fn select(c: &mut Criterion) {
     let addr = "127.0.0.1:6543".parse().unwrap();
-    let _server = common::start_server(addr, serve);
+    let chunk = prepare_chunk();
+    let _server = common::start_server(addr, move |req| serve(req, chunk.clone()));
 
     #[allow(dead_code)]
     #[derive(Debug, Row, Deserialize)]
@@ -53,7 +71,7 @@ fn select(c: &mut Criterion) {
 
     let mut group = c.benchmark_group("select");
     group.throughput(Throughput::Bytes(mem::size_of::() as u64));
-    group.bench_function("select", |b| {
+    group.bench_function("no compression", |b| {
         b.iter_custom(|iters| {
             let rt = Runtime::new().unwrap();
             let client = Client::default()
@@ -64,6 +82,18 @@ fn select(c: &mut Criterion) {
             start.elapsed()
         })
     });
+    #[cfg(feature = "lz4")]
+    group.bench_function("lz4", |b| {
+        b.iter_custom(|iters| {
+            let rt = Runtime::new().unwrap();
+            let client = Client::default()
+                .with_url(format!("http://{addr}"))
+                .with_compression(Compression::Lz4);
+            let start = Instant::now();
+            rt.block_on(run(client, iters)).unwrap();
+            start.elapsed()
+        })
+    });
     group.finish();
 }
 
diff --git a/src/compression/lz4.rs b/src/compression/lz4.rs
index ec4e85a..5b1cdf5 100644
--- a/src/compression/lz4.rs
+++ b/src/compression/lz4.rs
@@ -1,17 +1,16 @@
 use std::{
-    os::raw::{c_char, c_int},
     pin::Pin,
     task::{Context, Poll},
 };
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use cityhash_rs::cityhash_102_128;
 use futures::{ready, stream::Stream};
-use lz4::liblz4::LZ4_decompress_safe;
+use lz4_flex::block;
 
 use crate::{
     buflist::BufList,
     error::{Error, Result},
-    Compression,
 };
 
 const MAX_COMPRESSED_SIZE: u32 = 1024 * 1024 * 1024;
@@ -146,50 +145,30 @@ impl Lz4Decoder {
         }
 
         let mut uncompressed = vec![0u8; meta.uncompressed_size as usize];
-        decompress(&self.buffer[LZ4_HEADER_SIZE..], &mut uncompressed)?;
+        let len = decompress(&self.buffer[LZ4_HEADER_SIZE..], &mut uncompressed)?;
+        debug_assert_eq!(len as u32, meta.uncompressed_size);
+
         Ok(uncompressed.into())
     }
 }
 
 fn calc_checksum(buffer: &[u8]) -> u128 {
-    let hash = clickhouse_rs_cityhash_sys::city_hash_128(buffer);
-    u128::from(hash.hi) << 64 | u128::from(hash.lo)
+    let hash = cityhash_102_128(buffer);
+    hash << 64 | hash >> 64
 }
 
-fn decompress(compressed: &[u8], uncompressed: &mut [u8]) -> Result<()> {
-    // SAFETY: all pointers are valid and sizes are correspondingly correct.
-    let status = unsafe {
-        LZ4_decompress_safe(
-            compressed.as_ptr() as *const c_char,
-            uncompressed.as_mut_ptr() as *mut c_char,
-            compressed.len() as c_int,
-            uncompressed.len() as c_int,
-        )
-    };
-
-    if status < 0 {
-        return Err(Error::Decompression("can't decompress data".into()));
-    }
-
-    Ok(())
+fn decompress(compressed: &[u8], uncompressed: &mut [u8]) -> Result<usize> {
+    block::decompress_into(compressed, uncompressed).map_err(|err| Error::Decompression(err.into()))
 }
 
-pub(crate) fn compress(uncompressed: &[u8], mode: Compression) -> Result<Bytes> {
-    do_compress(uncompressed, mode).map_err(|err| Error::Decompression(err.into()))
-}
-
-fn do_compress(uncompressed: &[u8], mode: Compression) -> std::io::Result<Bytes> {
-    let max_compressed_size = lz4::block::compress_bound(uncompressed.len())?;
+pub(crate) fn compress(uncompressed: &[u8]) -> Result<Bytes> {
+    let max_compressed_size = block::get_maximum_output_size(uncompressed.len());
 
     let mut buffer = BytesMut::new();
     buffer.resize(LZ4_META_SIZE + max_compressed_size, 0);
 
-    let compressed_data_size = lz4::block::compress_to_buffer(
-        uncompressed,
-        Some(compression_mode(mode)),
-        false,
-        &mut buffer[LZ4_META_SIZE..],
-    )?;
+    let compressed_data_size = block::compress_into(uncompressed, &mut buffer[LZ4_META_SIZE..])
+        .map_err(|err| Error::Compression(err.into()))?;
 
     buffer.truncate(LZ4_META_SIZE + compressed_data_size);
 
@@ -206,16 +185,6 @@ fn do_compress(uncompressed: &[u8], mode: Compression) -> std::io::Result<Bytes> {
     Ok(buffer.freeze())
 }
 
-fn compression_mode(mode: Compression) -> lz4::block::CompressionMode {
-    use lz4::block::CompressionMode;
-
-    match mode {
-        Compression::None => unreachable!(),
-        Compression::Lz4 => CompressionMode::DEFAULT,
-        Compression::Lz4Hc(level) => CompressionMode::HIGHCOMPRESSION(level),
-    }
-}
-
 #[tokio::test]
 async fn it_decompresses() {
     use futures::stream::{self, TryStreamExt};
@@ -273,6 +242,6 @@ fn it_compresses() {
         110, 103, 3, 97, 98, 99,
     ];
 
-    let actual = compress(&source, Compression::Lz4).unwrap();
+    let actual = compress(&source).unwrap();
     assert_eq!(actual, expected);
 }
diff --git a/src/compression/mod.rs b/src/compression/mod.rs
index 4a9ceb2..66a8bd0 100644
--- a/src/compression/mod.rs
+++ b/src/compression/mod.rs
@@ -15,7 +15,12 @@ pub enum Compression {
     /// High compression levels are useful in networks with low bandwidth.
     /// Affects only `INSERT`s, because others are compressed by the server.
     /// Possible levels: `[1, 12]`. Recommended level range: `[4, 9]`.
+    ///
+    /// Deprecated: `lz4_flex` doesn't support HC mode yet; see [lz4_flex#165].
+    ///
+    /// [lz4_flex#165]: https://github.com/PSeitz/lz4_flex/issues/165
     #[cfg(feature = "lz4")]
+    #[deprecated(note = "use `Compression::Lz4` instead")]
     Lz4Hc(i32),
 }
 
diff --git a/src/insert.rs b/src/insert.rs
index beab5a4..81574f6 100644
--- a/src/insert.rs
+++ b/src/insert.rs
@@ -4,7 +4,6 @@ use bytes::{Bytes, BytesMut};
 use hyper::{self, Request};
 use replace_with::replace_with_or_abort;
 use serde::Serialize;
-use static_assertions::const_assert;
 use tokio::{
     task::JoinHandle,
     time::{Instant, Sleep},
 };
@@ -311,7 +310,7 @@ impl Insert {
     #[cfg(feature = "lz4")]
     fn take_and_prepare_chunk(&mut self) -> Result<Bytes> {
         Ok(if self.compression.is_lz4() {
-            let compressed = crate::compression::lz4::compress(&self.buffer, self.compression)?;
+            let compressed = crate::compression::lz4::compress(&self.buffer)?;
             self.buffer.clear();
             compressed
         } else {
diff --git a/src/lib.rs b/src/lib.rs
index 518d728..112a14e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -14,10 +14,10 @@ use hyper_util::{
     rt::TokioExecutor,
 };
 
-pub use clickhouse_derive::Row;
+use self::{error::Result, http_client::HttpClient};
 
 pub use self::{compression::Compression, row::Row};
-use self::{error::Result, http_client::HttpClient};
+pub use clickhouse_derive::Row;
 
 pub mod error;
 pub mod insert;
@@ -222,8 +222,19 @@ impl Client {
         watch::Watch::new(self, query)
     }
 
-    /// Used internally to modify the options map of an _already cloned_ [`Client`] instance.
+    /// Used internally to modify the options map of an _already cloned_
+    /// [`Client`] instance.
     pub(crate) fn add_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
         self.options.insert(name.into(), value.into());
     }
 }
+
+/// This is a private API exported only for internal purposes.
+/// Do not use it in your code directly; it doesn't follow semver.
+#[doc(hidden)]
+pub mod _priv {
+    #[cfg(feature = "lz4")]
+    pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
+        crate::compression::lz4::compress(uncompressed)
+    }
+}
diff --git a/src/response.rs b/src/response.rs
index d20c541..4a8823a 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -214,6 +214,7 @@ impl Decompress {
         match compression {
             Compression::None => Self::Plain(stream),
             #[cfg(feature = "lz4")]
+            #[allow(deprecated)]
             Compression::Lz4 | Compression::Lz4Hc(_) => Self::Lz4(Lz4Decoder::new(stream)),
         }
     }
diff --git a/tests/it/compression.rs b/tests/it/compression.rs
index 8994b7e..c0c1be1 100644
--- a/tests/it/compression.rs
+++ b/tests/it/compression.rs
@@ -51,10 +51,3 @@ async fn lz4() {
     let client = prepare_database!().with_compression(Compression::Lz4);
     check(client).await;
 }
-
-#[cfg(feature = "lz4")]
-#[tokio::test]
-async fn lz4_hc() {
-    let client = prepare_database!().with_compression(Compression::Lz4Hc(4));
-    check(client).await;
-}
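Migration note for the deprecation above: `Compression::Lz4Hc(_)` now behaves like plain `Compression::Lz4`, so callers only need to drop the level argument. A minimal sketch of the caller-side change, mirroring the client setup used in the benchmarks; the `make_client` helper is hypothetical and only for illustration:

    use clickhouse::{Client, Compression};

    // Hypothetical helper showing the migration away from the deprecated variant.
    // Before: .with_compression(Compression::Lz4Hc(4))
    // After:  .with_compression(Compression::Lz4)
    fn make_client(addr: &str) -> Client {
        Client::default()
            .with_url(format!("http://{addr}"))
            .with_compression(Compression::Lz4)
    }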