Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: expose built-in engines to sqllogictest-engines crate #154

Merged
merged 7 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 18 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["sqllogictest", "sqllogictest-bin", "examples/*", "tests"]
members = ["examples/*", "sqllogictest", "sqllogictest-bin", "sqllogictest-engines", "tests"]

[workspace.package]
version = "0.11.1"
Expand Down
14 changes: 1 addition & 13 deletions sqllogictest-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,17 @@ description = "Sqllogictest CLI."
[dependencies]
anyhow = { version = "1" }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4" }
clap = { version = "3", features = ["derive", "env"] }
console = { version = "0.15" }
env_logger = { version = "0.9" }
futures = { version = "0.3", default-features = false }
futures-lite = "1"
glob = "0.3"
humantime = "2"
itertools = "0.10"
log = "0.4"
postgres-types = { version = "0.2.3", features = ["derive", "with-chrono-0_4"] }
pg_interval = "0.4"
quick-junit = { version = "0.2" }
rand = "0.8"
rust_decimal = { version = "1.7.0", features = ["tokio-pg"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqllogictest = { path = "../sqllogictest", version = "0.11" }
tempfile = "3"
thiserror = "1"
sqllogictest-engines = { path = "../sqllogictest-engines", version = "0.11" }
tokio = { version = "1", features = [
"rt",
"rt-multi-thread",
Expand All @@ -40,6 +30,4 @@ tokio = { version = "1", features = [
"fs",
"process",
] }
tokio-postgres = { version = "0.7" }
tokio-util = { version = "0.7", features = ["codec"] }
fs-err = "2.9.0"
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
mod postgres;
use clap::ArgEnum;
use postgres::Postgres;
use tokio::process::Command;
mod postgres_extended;
use std::fmt::Display;
mod external;

use async_trait::async_trait;
use postgres_extended::PostgresExtended;
use clap::ArgEnum;
use sqllogictest::{AsyncDB, DBOutput};
use sqllogictest_engines::external::ExternalDriver;
use sqllogictest_engines::postgres::{PostgresConfig, PostgresExtended, PostgresSimple};
use tokio::process::Command;

use self::external::ExternalDriver;
use super::{DBConfig, Result};

#[derive(Copy, Clone, Debug, PartialEq, Eq, ArgEnum)]
Expand All @@ -28,16 +24,32 @@ pub enum EngineConfig {
}

enum Engines {
Postgres(Postgres),
Postgres(PostgresSimple),
PostgresExtended(PostgresExtended),
External(ExternalDriver),
}

impl From<&DBConfig> for PostgresConfig {
fn from(config: &DBConfig) -> Self {
let (host, port) = config.random_addr();

let mut pg_config = PostgresConfig::new();
pg_config
.host(host)
.port(port)
.dbname(&config.db)
.user(&config.user)
.password(&config.pass);

pg_config
}
}

pub(super) async fn connect(engine: &EngineConfig, config: &DBConfig) -> Result<impl AsyncDB> {
Ok(match engine {
EngineConfig::Postgres => Engines::Postgres(Postgres::connect(config).await?),
EngineConfig::Postgres => Engines::Postgres(PostgresSimple::connect(config.into()).await?),
EngineConfig::PostgresExtended => {
Engines::PostgresExtended(PostgresExtended::connect(config).await?)
Engines::PostgresExtended(PostgresExtended::connect(config.into()).await?)
}
EngineConfig::External(cmd_tmpl) => {
let (host, port) = config.random_addr();
Expand All @@ -48,7 +60,7 @@ pub(super) async fn connect(engine: &EngineConfig, config: &DBConfig) -> Result<
.replace("{user}", &config.user)
.replace("{pass}", &config.pass);
let mut cmd = Command::new("bash");
let cmd = cmd.args(["-c", &cmd_str]);
cmd.args(["-c", &cmd_str]);
Engines::External(ExternalDriver::connect(cmd).await?)
}
})
Expand Down
33 changes: 33 additions & 0 deletions sqllogictest-engines/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "sqllogictest-engines"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
description = "Sqllogictest built-in engines."

[dependencies]
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4" }
futures = { version = "0.3", default-features = false }
log = "0.4"
pg_interval = "0.4"
postgres-types = { version = "0.2.3", features = ["derive", "with-chrono-0_4"] }
rust_decimal = { version = "1.7.0", features = ["tokio-pg"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqllogictest = { path = "../sqllogictest", version = "0.11" }
thiserror = "1"
tokio = { version = "1", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"fs",
"process",
] }
tokio-postgres = { version = "0.7" }
tokio-util = { version = "0.7", features = ["codec"] }
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ pub enum ExternalDriverError {
type Result<T> = std::result::Result<T, ExternalDriverError>;

impl ExternalDriver {
pub async fn connect(cmd: &mut Command) -> Result<Self> {
/// Spawn and pipe into the subprocess with the given `cmd`.
pub async fn connect(mut cmd: Command) -> Result<Self> {
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());

let mut child = cmd.spawn()?;
Expand Down
2 changes: 2 additions & 0 deletions sqllogictest-engines/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod external;
pub mod postgres;
60 changes: 60 additions & 0 deletions sqllogictest-engines/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod extended;
mod simple;

use std::marker::PhantomData;
use std::sync::Arc;

use tokio::task::JoinHandle;

type Result<T> = std::result::Result<T, tokio_postgres::Error>;

/// Marker type for the Postgres simple query protocol.
pub struct Simple;
/// Marker type for the Postgres extended query protocol.
pub struct Extended;

/// Generic Postgres engine based on the client from [`tokio_postgres`]. The protocol `P` can be
/// either [`Simple`] or [`Extended`].
pub struct Postgres<P> {
client: Arc<tokio_postgres::Client>,
join_handle: JoinHandle<()>,
_protocol: PhantomData<P>,
}

/// Postgres engine using the simple query protocol.
pub type PostgresSimple = Postgres<Simple>;
/// Postgres engine using the extended query protocol.
pub type PostgresExtended = Postgres<Extended>;

/// Connection configuration. This is a re-export of [`tokio_postgres::Config`].
pub type PostgresConfig = tokio_postgres::Config;

impl<P> Postgres<P> {
/// Connects to the Postgres server with the given `config`.
pub async fn connect(config: PostgresConfig) -> Result<Self> {
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;

let join_handle = tokio::spawn(async move {
if let Err(e) = connection.await {
log::error!("Postgres connection error: {:?}", e);
}
});

Ok(Self {
client: Arc::new(client),
join_handle,
_protocol: PhantomData,
})
}

/// Returns a reference of the inner Postgres client.
pub fn pg_client(&self) -> &tokio_postgres::Client {
&self.client
}
}

impl<P> Drop for Postgres<P> {
fn drop(&mut self) {
self.join_handle.abort()
}
}
Loading