diff --git a/Cargo.lock b/Cargo.lock index de0df15..17b1d34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,16 +305,15 @@ dependencies = [ [[package]] name = "console" -version = "0.15.2" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c050367d967ced717c04b65d8c619d863ef9292ce0c5760028655a2fb298718c" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" dependencies = [ "encode_unicode", "lazy_static", "libc", - "terminal_size", "unicode-width", - "winapi", + "windows-sys", ] [[package]] @@ -545,21 +544,6 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.25" @@ -923,12 +907,6 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.12.1" @@ -1447,27 +1425,36 @@ version = "0.11.1" dependencies = [ "anyhow", "async-trait", - "bytes", "chrono", "clap 3.2.23", "console", "env_logger", "fs-err", "futures", - "futures-lite", "glob", - "humantime", "itertools", + "quick-junit", + "rand", + "sqllogictest", + "sqllogictest-engines", + "tokio", +] + +[[package]] +name = "sqllogictest-engines" +version = "0.11.1" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", "log", "pg_interval", "postgres-types", - "quick-junit", - "rand", "rust_decimal", "serde", "serde_json", "sqllogictest", - "tempfile", "thiserror", "tokio", "tokio-postgres", @@ -1530,16 +1517,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "terminal_size" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "test_dir_escape" version = "0.1.0" @@ -1770,12 +1747,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 0f2e84b..99ea3dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["sqllogictest", "sqllogictest-bin", "examples/*", "tests"] +members = ["examples/*", "sqllogictest", "sqllogictest-bin", "sqllogictest-engines", "tests"] [workspace.package] version = "0.11.1" diff --git a/sqllogictest-bin/Cargo.toml b/sqllogictest-bin/Cargo.toml index a801179..c1c8a82 100644 --- a/sqllogictest-bin/Cargo.toml +++ b/sqllogictest-bin/Cargo.toml @@ -11,27 +11,17 @@ description = "Sqllogictest CLI." [dependencies] anyhow = { version = "1" } async-trait = "0.1" -bytes = "1" chrono = { version = "0.4" } clap = { version = "3", features = ["derive", "env"] } console = { version = "0.15" } env_logger = { version = "0.9" } futures = { version = "0.3", default-features = false } -futures-lite = "1" glob = "0.3" -humantime = "2" itertools = "0.10" -log = "0.4" -postgres-types = { version = "0.2.3", features = ["derive", "with-chrono-0_4"] } -pg_interval = "0.4" quick-junit = { version = "0.2" } rand = "0.8" -rust_decimal = { version = "1.7.0", features = ["tokio-pg"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" sqllogictest = { path = "../sqllogictest", version = "0.11" } -tempfile = "3" -thiserror = "1" +sqllogictest-engines = { path = "../sqllogictest-engines", version = "0.11" } tokio = { version = "1", features = [ "rt", "rt-multi-thread", @@ -40,6 +30,4 @@ tokio = { version = "1", features = [ "fs", "process", ] } -tokio-postgres = { version = "0.7" } -tokio-util = { version = "0.7", features = ["codec"] } fs-err = "2.9.0" diff --git a/sqllogictest-bin/src/engines/mod.rs b/sqllogictest-bin/src/engines.rs similarity index 74% rename from sqllogictest-bin/src/engines/mod.rs rename to sqllogictest-bin/src/engines.rs index 9aee701..12e0c21 100644 --- a/sqllogictest-bin/src/engines/mod.rs +++ b/sqllogictest-bin/src/engines.rs @@ -1,16 +1,12 @@ -mod postgres; -use clap::ArgEnum; -use postgres::Postgres; -use tokio::process::Command; -mod postgres_extended; use std::fmt::Display; -mod external; use async_trait::async_trait; -use postgres_extended::PostgresExtended; +use clap::ArgEnum; use sqllogictest::{AsyncDB, DBOutput}; +use sqllogictest_engines::external::ExternalDriver; +use sqllogictest_engines::postgres::{PostgresConfig, PostgresExtended, PostgresSimple}; +use tokio::process::Command; -use self::external::ExternalDriver; use super::{DBConfig, Result}; #[derive(Copy, Clone, Debug, PartialEq, Eq, ArgEnum)] @@ -28,16 +24,32 @@ pub enum EngineConfig { } enum Engines { - Postgres(Postgres), + Postgres(PostgresSimple), PostgresExtended(PostgresExtended), External(ExternalDriver), } +impl From<&DBConfig> for PostgresConfig { + fn from(config: &DBConfig) -> Self { + let (host, port) = config.random_addr(); + + let mut pg_config = PostgresConfig::new(); + pg_config + .host(host) + .port(port) + .dbname(&config.db) + .user(&config.user) + .password(&config.pass); + + pg_config + } +} + pub(super) async fn connect(engine: &EngineConfig, config: &DBConfig) -> Result { Ok(match engine { - EngineConfig::Postgres => Engines::Postgres(Postgres::connect(config).await?), + EngineConfig::Postgres => Engines::Postgres(PostgresSimple::connect(config.into()).await?), EngineConfig::PostgresExtended => { - Engines::PostgresExtended(PostgresExtended::connect(config).await?) + Engines::PostgresExtended(PostgresExtended::connect(config.into()).await?) } EngineConfig::External(cmd_tmpl) => { let (host, port) = config.random_addr(); @@ -48,7 +60,7 @@ pub(super) async fn connect(engine: &EngineConfig, config: &DBConfig) -> Result< .replace("{user}", &config.user) .replace("{pass}", &config.pass); let mut cmd = Command::new("bash"); - let cmd = cmd.args(["-c", &cmd_str]); + cmd.args(["-c", &cmd_str]); Engines::External(ExternalDriver::connect(cmd).await?) } }) diff --git a/sqllogictest-engines/Cargo.toml b/sqllogictest-engines/Cargo.toml new file mode 100644 index 0000000..2c0cfc8 --- /dev/null +++ b/sqllogictest-engines/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "sqllogictest-engines" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "Sqllogictest built-in engines." + +[dependencies] +async-trait = "0.1" +bytes = "1" +chrono = { version = "0.4" } +futures = { version = "0.3", default-features = false } +log = "0.4" +pg_interval = "0.4" +postgres-types = { version = "0.2.3", features = ["derive", "with-chrono-0_4"] } +rust_decimal = { version = "1.7.0", features = ["tokio-pg"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +sqllogictest = { path = "../sqllogictest", version = "0.11" } +thiserror = "1" +tokio = { version = "1", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "fs", + "process", +] } +tokio-postgres = { version = "0.7" } +tokio-util = { version = "0.7", features = ["codec"] } diff --git a/sqllogictest-bin/src/engines/external.rs b/sqllogictest-engines/src/external.rs similarity index 96% rename from sqllogictest-bin/src/engines/external.rs rename to sqllogictest-engines/src/external.rs index 7f0c843..377efe5 100644 --- a/sqllogictest-bin/src/engines/external.rs +++ b/sqllogictest-engines/src/external.rs @@ -64,7 +64,8 @@ pub enum ExternalDriverError { type Result = std::result::Result; impl ExternalDriver { - pub async fn connect(cmd: &mut Command) -> Result { + /// Spawn and pipe into the subprocess with the given `cmd`. + pub async fn connect(mut cmd: Command) -> Result { let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); let mut child = cmd.spawn()?; diff --git a/sqllogictest-engines/src/lib.rs b/sqllogictest-engines/src/lib.rs new file mode 100644 index 0000000..49a26a4 --- /dev/null +++ b/sqllogictest-engines/src/lib.rs @@ -0,0 +1,2 @@ +pub mod external; +pub mod postgres; diff --git a/sqllogictest-engines/src/postgres.rs b/sqllogictest-engines/src/postgres.rs new file mode 100644 index 0000000..9db8835 --- /dev/null +++ b/sqllogictest-engines/src/postgres.rs @@ -0,0 +1,60 @@ +mod extended; +mod simple; + +use std::marker::PhantomData; +use std::sync::Arc; + +use tokio::task::JoinHandle; + +type Result = std::result::Result; + +/// Marker type for the Postgres simple query protocol. +pub struct Simple; +/// Marker type for the Postgres extended query protocol. +pub struct Extended; + +/// Generic Postgres engine based on the client from [`tokio_postgres`]. The protocol `P` can be +/// either [`Simple`] or [`Extended`]. +pub struct Postgres

{ + client: Arc, + join_handle: JoinHandle<()>, + _protocol: PhantomData

, +} + +/// Postgres engine using the simple query protocol. +pub type PostgresSimple = Postgres; +/// Postgres engine using the extended query protocol. +pub type PostgresExtended = Postgres; + +/// Connection configuration. This is a re-export of [`tokio_postgres::Config`]. +pub type PostgresConfig = tokio_postgres::Config; + +impl

Postgres

{ + /// Connects to the Postgres server with the given `config`. + pub async fn connect(config: PostgresConfig) -> Result { + let (client, connection) = config.connect(tokio_postgres::NoTls).await?; + + let join_handle = tokio::spawn(async move { + if let Err(e) = connection.await { + log::error!("Postgres connection error: {:?}", e); + } + }); + + Ok(Self { + client: Arc::new(client), + join_handle, + _protocol: PhantomData, + }) + } + + /// Returns a reference of the inner Postgres client. + pub fn pg_client(&self) -> &tokio_postgres::Client { + &self.client + } +} + +impl

Drop for Postgres

{ + fn drop(&mut self) { + self.join_handle.abort() + } +} diff --git a/sqllogictest-bin/src/engines/postgres_extended.rs b/sqllogictest-engines/src/postgres/extended.rs similarity index 90% rename from sqllogictest-bin/src/engines/postgres_extended.rs rename to sqllogictest-engines/src/postgres/extended.rs index e0b5366..9b2d839 100644 --- a/sqllogictest-bin/src/engines/postgres_extended.rs +++ b/sqllogictest-engines/src/postgres/extended.rs @@ -1,54 +1,14 @@ use std::fmt::Write; -use std::sync::Arc; +use std::time::Duration; -use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; use pg_interval::Interval; use postgres_types::Type; use rust_decimal::Decimal; use sqllogictest::{ColumnType, DBOutput}; -use tokio::task::JoinHandle; -use crate::{DBConfig, Result}; - -pub struct PostgresExtended { - client: Arc, - join_handle: JoinHandle<()>, -} - -impl PostgresExtended { - pub(super) async fn connect(config: &DBConfig) -> Result { - let (host, port) = config.random_addr(); - - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .dbname(&config.db) - .user(&config.user) - .password(&config.pass) - .connect(tokio_postgres::NoTls) - .await - .context(format!("failed to connect to postgres at {host}:{port}"))?; - - let join_handle = tokio::spawn(async move { - if let Err(e) = connection.await { - log::error!("PostgresExtended connection error: {:?}", e); - } - }); - - Ok(Self { - client: Arc::new(client), - join_handle, - }) - } -} - -impl Drop for PostgresExtended { - fn drop(&mut self) { - self.join_handle.abort() - } -} +use super::{Extended, Postgres, Result}; macro_rules! array_process { ($row:ident, $row_vec:ident, $idx:ident, $t:ty) => { @@ -219,10 +179,10 @@ fn float8_to_str(value: &f64) -> String { } #[async_trait] -impl sqllogictest::AsyncDB for PostgresExtended { +impl sqllogictest::AsyncDB for Postgres { type Error = tokio_postgres::error::Error; - async fn run(&mut self, sql: &str) -> Result { + async fn run(&mut self, sql: &str) -> Result { let mut output = vec![]; let is_query_sql = { @@ -358,4 +318,8 @@ impl sqllogictest::AsyncDB for PostgresExtended { fn engine_name(&self) -> &str { "postgres-extended" } + + async fn sleep(dur: Duration) { + tokio::time::sleep(dur).await + } } diff --git a/sqllogictest-bin/src/engines/postgres_extended_test.slt b/sqllogictest-engines/src/postgres/postgres_extended_test.slt similarity index 100% rename from sqllogictest-bin/src/engines/postgres_extended_test.slt rename to sqllogictest-engines/src/postgres/postgres_extended_test.slt diff --git a/sqllogictest-bin/src/engines/postgres.rs b/sqllogictest-engines/src/postgres/simple.rs similarity index 61% rename from sqllogictest-bin/src/engines/postgres.rs rename to sqllogictest-engines/src/postgres/simple.rs index 0009e46..7c2e5fd 100644 --- a/sqllogictest-bin/src/engines/postgres.rs +++ b/sqllogictest-engines/src/postgres/simple.rs @@ -1,55 +1,15 @@ -use std::sync::Arc; +use std::time::Duration; -use anyhow::Context; use async_trait::async_trait; use sqllogictest::{ColumnType, DBOutput}; -use tokio::task::JoinHandle; -use crate::{DBConfig, Result}; - -pub struct Postgres { - client: Arc, - join_handle: JoinHandle<()>, -} - -impl Postgres { - pub(super) async fn connect(config: &DBConfig) -> Result { - let (host, port) = config.random_addr(); - - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .dbname(&config.db) - .user(&config.user) - .password(&config.pass) - .connect(tokio_postgres::NoTls) - .await - .context(format!("failed to connect to postgres at {host}:{port}"))?; - - let join_handle = tokio::spawn(async move { - if let Err(e) = connection.await { - log::error!("Postgres connection error: {:?}", e); - } - }); - - Ok(Self { - client: Arc::new(client), - join_handle, - }) - } -} - -impl Drop for Postgres { - fn drop(&mut self) { - self.join_handle.abort() - } -} +use super::{Postgres, Result, Simple}; #[async_trait] -impl sqllogictest::AsyncDB for Postgres { +impl sqllogictest::AsyncDB for Postgres { type Error = tokio_postgres::error::Error; - async fn run(&mut self, sql: &str) -> Result { + async fn run(&mut self, sql: &str) -> Result { let mut output = vec![]; // NOTE: @@ -99,4 +59,8 @@ impl sqllogictest::AsyncDB for Postgres { fn engine_name(&self) -> &str { "postgres" } + + async fn sleep(dur: Duration) { + tokio::time::sleep(dur).await + } }