diff --git a/Cargo.toml b/Cargo.toml
index c3ad148..51d71b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,10 @@ harness = false
 name = "select"
 harness = false
 
+[[example]]
+name = "inserter"
+required-features = ["inserter"]
+
 [[example]]
 name = "mock"
 required-features = ["test-util"]
diff --git a/README.md b/README.md
index ccec331..3f2bf71 100644
--- a/README.md
+++ b/README.md
@@ -160,6 +160,8 @@ if stats.rows > 0 {
 }
 ```
 
+Please read the [inserter example](https://github.com/ClickHouse/clickhouse-rs/tree/main/examples/inserter.rs) to understand how to use it properly in different real-world cases.
+
 * `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached.
 * The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters.
 * `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits if your stream emits items rarely.
diff --git a/examples/README.md b/examples/README.md
index 3f1cdaa..d1b128a 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -8,8 +8,9 @@ If something is missing, or you found a mistake in one of these examples, please
 
 ### General usage
 
-- [usage.rs](usage.rs) - creating tables, executing other DDLs, inserting the data, and selecting it back. Additionally, it covers the client-side batching via the `inserter` feature, as well as `WATCH` queries. Optional cargo features: `inserter`, `watch`.
+- [usage.rs](usage.rs) - creating tables, executing other DDLs, inserting the data, and selecting it back. Additionally, it covers `WATCH` queries. Optional cargo features: `inserter`, `watch`.
 - [mock.rs](mock.rs) - writing tests with `mock` feature. Cargo features: requires `test-util`.
+- [inserter.rs](inserter.rs) - using the client-side batching via the `inserter` feature. Cargo features: requires `inserter`.
 - [async_insert.rs](async_insert.rs) - using the server-side batching via the [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts) ClickHouse feature
 - [clickhouse_cloud.rs](clickhouse_cloud.rs) - using the client with ClickHouse Cloud, highlighting a few relevant settings (`wait_end_of_query`, `select_sequential_consistency`). Cargo features: requires `rustls-tls`; the code also works with `native-tls`.
 - [clickhouse_settings.rs](clickhouse_settings.rs) - applying various ClickHouse settings on the query level
@@ -56,4 +57,4 @@ If a particular example requires a cargo feature, you could run it as follows:
 cargo run --package clickhouse --example usage --features inserter watch
 ```
 
-Additionally, the individual examples should be runnable via the IDE such as CLion or RustRover.
\ No newline at end of file
+Additionally, the individual examples should be runnable via the IDE such as CLion or RustRover.
diff --git a/examples/inserter.rs b/examples/inserter.rs
new file mode 100644
index 0000000..baaedbc
--- /dev/null
+++ b/examples/inserter.rs
@@ -0,0 +1,173 @@
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::{
+    sync::mpsc::{self, error::TryRecvError, Receiver},
+    time::timeout,
+};
+
+use clickhouse::{error::Result, sql::Identifier, Client, Row};
+
+const TABLE_NAME: &str = "chrs_inserter";
+
+#[derive(Debug, Row, Serialize, Deserialize)]
+struct MyRow {
+    no: u32,
+}
+
+// Pattern 1: dense streams
+// ------------------------
+// This pattern is useful when the stream is dense, i.e. with no/small pauses
+// between rows. For instance, when reading from a file or another database.
+// In other words, this pattern is applicable to ETL-like tasks.
+async fn dense(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
+    let mut inserter = client
+        .inserter(TABLE_NAME)?
+        // We limit the number of rows to be inserted in a single `INSERT` statement.
+        // We use a small value (100) for the example only.
+        // See the documentation of `with_max_rows` for details.
+        .with_max_rows(100)
+        // You can also use other limits. For instance, limit by size.
+        // The first condition reached ends the current `INSERT`.
+        .with_max_bytes(1_048_576);
+
+    while let Some(no) = rx.recv().await {
+        inserter.write(&MyRow { no })?;
+        inserter.commit().await?;
+    }
+
+    inserter.end().await?;
+    Ok(())
+}
+
+// Pattern 2: sparse streams
+// -------------------------
+// This pattern is useful when the stream is sparse, i.e. with pauses between
+// rows. For instance, when streaming real-time events into CH.
+// Some rows arrive one by one with a delay, others in batches.
+async fn sparse(client: &Client, mut rx: Receiver<u32>) -> Result<()> {
+    let mut inserter = client
+        .inserter(TABLE_NAME)?
+        // Slice the stream into chunks (one `INSERT` per chunk) by time.
+        // See the documentation of `with_period` for details.
+        .with_period(Some(Duration::from_millis(100)))
+        // If you have a lot of parallel inserters (e.g. on multiple nodes),
+        // it's reasonable to add some bias to the period to spread the load.
+        .with_period_bias(0.1)
+        // We can also use other limits. This is useful when the stream recovers
+        // after a long period of inactivity (e.g. a restart of the service or CH).
+        .with_max_rows(500_000);
+
+    loop {
+        let no = match rx.try_recv() {
+            Ok(event) => event,
+            Err(TryRecvError::Empty) => {
+                // If there are no available events, we should wait for the next one.
+                // However, we don't know when the next event will arrive.
+                // So, we should wait no longer than the time left in the current period.
+                let time_left = inserter.time_left().expect("with_period is set");
+
+                // Note: `rx.recv()` must be cancel-safe for your channel.
+                // This is true for the popular `tokio`, `futures-channel`, and `flume` channels.
+                match timeout(time_left, rx.recv()).await {
+                    Ok(Some(event)) => event,
+                    // The stream is closed.
+                    Ok(None) => break,
+                    // Timeout.
+                    Err(_) => {
+                        // If the period is over, we allow the inserter to end the current `INSERT`
+                        // statement. If no `INSERT` is in progress, this call is a no-op.
+                        inserter.commit().await?;
+                        continue;
+                    }
+                }
+            }
+            Err(TryRecvError::Disconnected) => break,
+        };
+
+        inserter.write(&MyRow { no })?;
+        inserter.commit().await?;
+
+        // You can use the result of `commit()` to get the number of rows inserted.
+        // It's useful not only for statistics but also to implement
+        // at-least-once delivery by sending this info back to the sender,
+        // which should store all unacknowledged events in that case.
+    }
+
+    inserter.end().await?;
+    Ok(())
+}
+
+fn spawn_data_generator(n: u32, sparse: bool) -> Receiver<u32> {
+    let (tx, rx) = mpsc::channel(1000);
+
+    tokio::spawn(async move {
+        for no in 0..n {
+            if sparse {
+                let delay_ms = if no % 100 == 0 { 20 } else { 2 };
+                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+            }
+
+            tx.send(no).await.unwrap();
+        }
+    });
+
+    rx
+}
+
+async fn fetch_batches(client: &Client) -> Result<Vec<(String, u64)>> {
+    client
+        .query(
+            "SELECT toString(insertion_time), count()
+             FROM ?
+             GROUP BY insertion_time
+             ORDER BY insertion_time",
+        )
+        .bind(Identifier(TABLE_NAME))
+        .fetch_all::<(String, u64)>()
+        .await
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let client = Client::default().with_url("http://localhost:8123");
+
+    client
+        .query(
+            "CREATE OR REPLACE TABLE ? (
+                no UInt32,
+                insertion_time DateTime64(6) DEFAULT now64(6)
+            )
+            ENGINE = MergeTree
+            ORDER BY no",
+        )
+        .bind(Identifier(TABLE_NAME))
+        .execute()
+        .await?;
+
+    println!("Pattern 1: dense streams");
+    let rx = spawn_data_generator(1000, false);
+    dense(&client, rx).await?;
+
+    // Prints 10 batches with 100 rows in each.
+    for (insertion_time, count) in fetch_batches(&client).await? {
+        println!("{}: {} rows", insertion_time, count);
+    }
+
+    client
+        .query("TRUNCATE TABLE ?")
+        .bind(Identifier(TABLE_NAME))
+        .execute()
+        .await?;
+
+    println!("\nPattern 2: sparse streams");
+    let rx = spawn_data_generator(1000, true);
+    sparse(&client, rx).await?;
+
+    // Prints batches every 100±10ms.
+    for (insertion_time, count) in fetch_batches(&client).await? {
+        println!("{}: {} rows", insertion_time, count);
+    }
+
+    Ok(())
+}
diff --git a/examples/usage.rs b/examples/usage.rs
index 787b5f8..9a49eb0 100644
--- a/examples/usage.rs
+++ b/examples/usage.rs
@@ -37,6 +37,8 @@ async fn insert(client: &Client) -> Result<()> {
     insert.end().await
 }
 
+// This is a very basic example of using the `inserter` feature.
+// See `inserter.rs` for real-world patterns.
 #[cfg(feature = "inserter")]
 async fn inserter(client: &Client) -> Result<()> {
     let mut inserter = client
@@ -45,10 +47,6 @@ async fn inserter(client: &Client) -> Result<()> {
         .with_period(Some(std::time::Duration::from_secs(15)));
 
     for i in 0..1000 {
-        if i == 500 {
-            inserter.set_max_rows(300);
-        }
-
         inserter.write(&MyRow { no: i, name: "foo" })?;
         inserter.commit().await?;
     }
diff --git a/src/inserter.rs b/src/inserter.rs
index 3e42901..7597409 100644
--- a/src/inserter.rs
+++ b/src/inserter.rs
@@ -53,6 +53,7 @@ impl<T> Inserter<T>
 where
     T: Row,
 {
+    // TODO: (breaking change) remove `Result`.
     pub(crate) fn new(client: &Client, table: &str) -> Result<Self> {
         Ok(Self {
             client: client.clone(),
@@ -83,6 +84,9 @@ where
     /// The maximum number of uncompressed bytes in one `INSERT` statement.
     ///
+    /// This is a soft limit, which can be exceeded if rows written between
+    /// [`Inserter::commit()`] calls are larger than the set value.
+    ///
     /// Note: ClickHouse inserts batches atomically only if all rows fit in the
     /// same partition and their number is less [`max_insert_block_size`].
     ///
@@ -96,6 +100,13 @@ where
     /// The maximum number of rows in one `INSERT` statement.
     ///
+    /// In order to reduce the overhead of merging small parts by ClickHouse, use
+    /// larger values (e.g. 100_000 or even larger). Consider also (or instead)
+    /// [`Inserter::with_max_bytes()`] if rows can be large.
+    ///
+    /// This is a soft limit, which can be exceeded if multiple rows are
+    /// written between [`Inserter::commit()`] calls.
+    ///
     /// Note: ClickHouse inserts batches atomically only if all rows fit in the
     /// same partition and their number is less [`max_insert_block_size`].
     ///
@@ -114,6 +125,11 @@ where
     /// However, it's possible to use [`Inserter::time_left()`] and set a
     /// timer up to call [`Inserter::commit()`] to check passed time again.
     ///
+    /// Usually, it's reasonable to use a 1-10s period, but it depends on the
+    /// desired delay for reading the data from the table.
+    /// Larger values = less overhead for merging parts by CH.
+    /// Smaller values = less delay for readers.
+    ///
     /// Extra ticks are skipped if the previous `INSERT` is still in progress:
     /// ```text
     /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
@@ -141,7 +157,8 @@ where
         self
     }
 
-    /// Similar to [`Client::with_option`], but for the INSERT statements generated by this [`Inserter`] only.
+    /// Similar to [`Client::with_option`], but for the INSERT statements
+    /// generated by this [`Inserter`] only.
     pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
         self.client.add_option(name, value);
         self
@@ -192,7 +209,8 @@ where
     /// Serializes the provided row into an internal buffer.
     ///
-    /// To check the limits and send the data to ClickHouse, call [`Inserter::commit()`].
+    /// To check the limits and send the data to ClickHouse, call
+    /// [`Inserter::commit()`].
     ///
     /// # Panics
     /// If called after the previous call that returned an error.
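Note: the sparse-stream example above mentions using the result of `commit()` for at-least-once delivery, but does not show it. Below is a minimal sketch of that idea, not part of the diff itself: the `insert_with_acks` function, the `ack_tx` channel, and the `pending` buffer are hypothetical names introduced for illustration; it only relies on `commit()` returning quantities with a `rows` field, as in the README snippet above.

```rust
use clickhouse::{error::Result, Client, Row};
use serde::Serialize;
use tokio::sync::mpsc::{Receiver, Sender};

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
}

// A sketch of at-least-once delivery on top of `Inserter::commit()`.
// Events are buffered locally until `commit()` reports that an `INSERT`
// has actually ended (`stats.rows > 0`) and only then acknowledged.
async fn insert_with_acks(
    client: &Client,
    mut rx: Receiver<u32>,
    ack_tx: Sender<u32>, // hypothetical acknowledgement channel back to the producer
) -> Result<()> {
    let mut inserter = client.inserter("chrs_inserter")?.with_max_rows(100_000);
    let mut pending = Vec::new(); // written to the inserter, but not yet inserted into CH

    while let Some(no) = rx.recv().await {
        inserter.write(&MyRow { no })?;
        pending.push(no);

        // Returns the quantities of the `INSERT` that has just ended,
        // or zeros if no threshold has been reached yet.
        let stats = inserter.commit().await?;
        if stats.rows > 0 {
            // The batch is now in ClickHouse: acknowledge everything buffered so far.
            for no in pending.drain(..) {
                let _ = ack_tx.send(no).await;
            }
        }
    }

    // Flush whatever is left and acknowledge the tail.
    inserter.end().await?;
    for no in pending.drain(..) {
        let _ = ack_tx.send(no).await;
    }
    Ok(())
}
```

The buffer-then-ack shape mirrors the comment in `sparse()`: since the `Inserter` may accumulate rows across several `commit()` calls before ending an `INSERT`, acknowledgements have to be deferred until `commit()` reports a non-zero row count.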