From 2c68f3129b404816b09c809b9ed9a99e83fdddda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Fri, 10 Jan 2025 16:35:07 +0100 Subject: [PATCH] StandaloneTransaction --- edgedb-tokio/Cargo.toml | 2 +- edgedb-tokio/examples/transaction.rs | 2 +- edgedb-tokio/examples/transaction_errors.rs | 2 +- edgedb-tokio/src/client.rs | 62 ++++- edgedb-tokio/src/lib.rs | 2 +- edgedb-tokio/src/query_executor.rs | 4 +- edgedb-tokio/src/transaction.rs | 250 ++++++++++++++------ edgedb-tokio/tests/func/dbschema/test.esdl | 4 + edgedb-tokio/tests/func/transactions.rs | 46 +++- 9 files changed, 282 insertions(+), 92 deletions(-) diff --git a/edgedb-tokio/Cargo.toml b/edgedb-tokio/Cargo.toml index db712f3c..b8fcf366 100644 --- a/edgedb-tokio/Cargo.toml +++ b/edgedb-tokio/Cargo.toml @@ -16,7 +16,7 @@ edgedb-protocol = { path = "../edgedb-protocol", version = "0.6.0", features = [ ] } edgedb-errors = { path = "../edgedb-errors", version = "0.4.1" } edgedb-derive = { path = "../edgedb-derive", version = "0.5.1", optional = true } -tokio = { version = "1.15", features = ["net", "time", "sync", "macros"] } +tokio = { version = "1.15", features = ["net", "time", "sync", "macros", "rt"] } bytes = "1.5.0" scram = { version = "0.7", package = "scram-2" } serde = { version = "1.0", features = ["derive"] } diff --git a/edgedb-tokio/examples/transaction.rs b/edgedb-tokio/examples/transaction.rs index 4032f45f..3d71a290 100644 --- a/edgedb-tokio/examples/transaction.rs +++ b/edgedb-tokio/examples/transaction.rs @@ -3,7 +3,7 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); let conn = edgedb_tokio::create_client().await?; let val = conn - .transaction(|mut transaction| async move { + .within_transaction(|mut transaction| async move { transaction .query_required_single::( "SELECT (UPDATE Counter SET { value := .value + 1}).value LIMIT 1", diff --git a/edgedb-tokio/examples/transaction_errors.rs b/edgedb-tokio/examples/transaction_errors.rs index 5f7571c1..e3489e9d 100644 --- a/edgedb-tokio/examples/transaction_errors.rs +++ b/edgedb-tokio/examples/transaction_errors.rs @@ -30,7 +30,7 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); let conn = edgedb_tokio::create_client().await?; let res = conn - .transaction(|mut transaction| async move { + .within_transaction(|mut transaction| async move { let val = transaction .query_required_single::( " diff --git a/edgedb-tokio/src/client.rs b/edgedb-tokio/src/client.rs index 957944e6..1bfd52ca 100644 --- a/edgedb-tokio/src/client.rs +++ b/edgedb-tokio/src/client.rs @@ -16,7 +16,7 @@ use crate::raw::{Options, PoolState, Response}; use crate::raw::{Pool, QueryCapabilities}; use crate::state::{AliasesDelta, ConfigDelta, GlobalsDelta}; use crate::state::{AliasesModifier, ConfigModifier, Fn, GlobalsModifier}; -use crate::transaction::{transaction, Transaction}; +use crate::transaction::{RetryingTransaction, StandaloneTransaction}; use crate::ResultVerbose; /// The EdgeDB Client. @@ -374,13 +374,18 @@ impl Client { } } - /// Execute a transaction + /// Execute a transaction and retry. /// /// Transaction body must be encompassed in the closure. The closure **may /// be executed multiple times**. This includes not only database queries /// but also executing the whole function, so the transaction code must be /// prepared to be idempotent. /// + /// # Commit and rollback + /// + /// When the closure returns, the transaction is commit. Transaction cannot + /// be rolled back explicitly. + /// /// # Returning custom errors /// /// See [this example](https://github.com/edgedb/edgedb-rust/blob/master/edgedb-tokio/examples/transaction_errors.rs) @@ -397,24 +402,65 @@ impl Client { /// # Example /// /// ```rust,no_run - /// # async fn transaction() -> Result<(), edgedb_tokio::Error> { + /// # async fn main_() -> Result<(), edgedb_tokio::Error> { /// let conn = edgedb_tokio::create_client().await?; - /// let val = conn.transaction(|mut tx| async move { + /// let val = conn.within_transaction(|mut tx| async move { /// tx.query_required_single::(" /// WITH C := UPDATE Counter SET { value := .value + 1} /// SELECT C.value LIMIT 1 - /// ", &() + /// ", &() /// ).await /// }).await?; /// # Ok(()) /// # } /// ``` - pub async fn transaction(&self, body: B) -> Result + pub async fn within_transaction(&self, body: B) -> Result where - B: FnMut(Transaction) -> F, + B: FnMut(RetryingTransaction) -> F, F: Future>, { - transaction(&self.pool, self.options.clone(), body).await + crate::transaction::run_and_retry(&self.pool, self.options.clone(), body).await + } + + /// Start a transaction. + /// + /// Returns [StandaloneTransaction] which implements [crate::QueryExecutor] and can + /// be used to execute queries within the transaction. + /// + /// The transaction will never retry failed queries, even if the database signals that the + /// query should be retried. For this reason, it is recommended to use [Client::within_transaction] + /// when possible. + /// + ///
+ /// Transactions can fail for benign reasons and should always handle that case gracefully. + /// `StandaloneTransaction` does not provide any retry mechanisms, so this responsibility falls + /// onto the user. For example, even only two select queries in a transaction can fail due to + /// concurrent modification of the database. + ///
+ /// + /// # Commit and rollback + /// + /// To commit the changes made during the transaction, + /// [commit](crate::StandaloneTransaction::commit) method must be called, otherwise the + /// transaction will roll back when [StandaloneTransaction] is dropped. + /// + /// # Example + /// + /// ```rust,no_run + /// # async fn main_() -> Result<(), edgedb_tokio::Error> { + /// let conn = edgedb_tokio::create_client().await?; + /// let mut tx = conn.transaction().await?; + /// tx.query_required_single::(" + /// WITH C := UPDATE Counter SET { value := .value + 1} + /// SELECT C.value LIMIT 1 + /// ", &() + /// ).await; + /// tx.commit().await; + /// # Ok(()) + /// # } + /// ``` + pub async fn transaction(&self) -> Result { + crate::transaction::start(&self.pool, self.options.clone()).await } /// Returns client with adjusted options for future transactions. diff --git a/edgedb-tokio/src/lib.rs b/edgedb-tokio/src/lib.rs index 3f75f8f8..07a97e04 100644 --- a/edgedb-tokio/src/lib.rs +++ b/edgedb-tokio/src/lib.rs @@ -151,7 +151,7 @@ pub use errors::Error; pub use options::{RetryCondition, RetryOptions, TransactionOptions}; pub use query_executor::{QueryExecutor, ResultVerbose}; pub use state::{ConfigDelta, GlobalsDelta}; -pub use transaction::Transaction; +pub use transaction::{RetryingTransaction, StandaloneTransaction, Transaction}; /// The ordered list of project filenames supported. pub const PROJECT_FILES: &[&str] = &["gel.toml", "edgedb.toml"]; diff --git a/edgedb-tokio/src/query_executor.rs b/edgedb-tokio/src/query_executor.rs index 0aa01901..8d3b400b 100644 --- a/edgedb-tokio/src/query_executor.rs +++ b/edgedb-tokio/src/query_executor.rs @@ -29,7 +29,7 @@ pub trait QueryExecutor: Sized { A: QueryArgs, R: QueryResult + Send; - /// see [Client::query_with_warnings] + /// see [Client::query_verbose] fn query_verbose( self, query: impl AsRef + Send, @@ -171,7 +171,7 @@ impl QueryExecutor for &Client { } } -impl QueryExecutor for &mut Transaction { +impl> QueryExecutor for &mut T { fn query( self, query: impl AsRef + Send, diff --git a/edgedb-tokio/src/transaction.rs b/edgedb-tokio/src/transaction.rs index 8b7e6d1d..e68a4e44 100644 --- a/edgedb-tokio/src/transaction.rs +++ b/edgedb-tokio/src/transaction.rs @@ -10,93 +10,172 @@ use edgedb_protocol::QueryResult; use tokio::sync::oneshot; use tokio::time::sleep; -use crate::errors::ClientError; use crate::errors::{Error, ErrorKind, SHOULD_RETRY}; use crate::errors::{NoDataError, ProtocolEncodingError}; use crate::raw::{Options, Pool, PoolConnection, Response}; use crate::ResultVerbose; -/// Transaction object passed to the closure via -/// [`Client::transaction()`](crate::Client::transaction) method +/// A representation of a transaction. /// -/// The Transaction object must be dropped by the end of the closure execution. +/// It can be obtained in two flavors: +/// - [`RetryingTransaction`] from [`Client::within_transaction()`](crate::Client::within_transaction), +/// - [`StandaloneTransaction`] from [`Client::transaction()`](crate::Client::transaction). /// -/// All database queries in transaction should be executed using methods on -/// this object instead of using original [`Client`](crate::Client) instance. +/// Implements all query & execute functions as [Client](crate::Client) as well as +/// [QueryExecutor](crate::QueryExecutor). #[derive(Debug)] pub struct Transaction { - iteration: u32, options: Arc, - inner: Option, + conn: PoolConnection, + + started: bool, } +/// Transaction object returned by [`Client::transaction()`](crate::Client::transaction) method. +/// +/// When this object is dropped, the transaction will implicitly roll back. +/// Use [commit](StandaloneTransaction::commit) method to commit the changes made in the transaction. +/// +/// All database queries in transaction should be executed using methods on +/// this object instead of using original [`Client`](crate::Client) instance. #[derive(Debug)] -pub struct TransactionResult { - conn: PoolConnection, - started: bool, +pub struct StandaloneTransaction { + inner: Option, +} + +impl StandaloneTransaction { + pub async fn commit(mut self) -> Result<(), Error> { + if let Some(tran) = self.inner.take() { + tran.commit().await + } else { + log::trace!("standalone transaction done, noop commit"); + Ok(()) + } + } + + pub async fn rollback(mut self) -> Result<(), Error> { + if let Some(tran) = self.inner.take() { + tran.rollback().await + } else { + log::trace!("standalone transaction done, noop commit"); + Ok(()) + } + } +} + +impl std::ops::Deref for StandaloneTransaction { + type Target = Transaction; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref().unwrap() + } } +impl std::ops::DerefMut for StandaloneTransaction { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner.as_mut().unwrap() + } +} + +impl Drop for StandaloneTransaction { + fn drop(&mut self) { + if let Some(tran) = self.inner.take() { + tokio::task::spawn(tran.rollback()); + } + } +} + +pub(crate) async fn start( + pool: &Pool, + options: Arc, +) -> Result { + let conn = pool.acquire().await?; + + Ok(StandaloneTransaction { + inner: Some(Transaction::new(options, conn)), + }) +} + +/// Transaction object passed to the closure via +/// [`Client::within_transaction()`](crate::Client::within_transaction) method. +/// +/// This object must be dropped by the end of the closure execution. +/// +/// All database queries in transaction should be executed using methods on +/// this object instead of using original [`Client`](crate::Client) instance. #[derive(Debug)] -pub struct Inner { - started: bool, - conn: PoolConnection, - return_conn: oneshot::Sender, +pub struct RetryingTransaction { + inner: Option, + iteration: u32, + result_tx: Option>, } -impl Drop for Transaction { +impl RetryingTransaction { + /// Zero-based iteration (attempt) number for the current transaction + /// + /// First attempt gets `iteration = 0`, second attempt `iteration = 1`, + /// etc. + pub fn iteration(&self) -> u32 { + self.iteration + } +} + +impl std::ops::Deref for RetryingTransaction { + type Target = Transaction; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref().unwrap() + } +} + +impl std::ops::DerefMut for RetryingTransaction { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner.as_mut().unwrap() + } +} + +impl Drop for RetryingTransaction { fn drop(&mut self) { - self.inner.take().map( - |Inner { - started, - conn, - return_conn, - }| { return_conn.send(TransactionResult { started, conn }).ok() }, - ); + let tran = self.inner.take().unwrap(); + self.result_tx.take().unwrap().send(tran).ok(); } } -pub(crate) async fn transaction( +pub(crate) async fn run_and_retry( pool: &Pool, options: Arc, mut body: B, ) -> Result where - B: FnMut(Transaction) -> F, + B: FnMut(RetryingTransaction) -> F, F: Future>, { let mut iteration = 0; 'transaction: loop { let conn = pool.acquire().await?; + let tran = Transaction::new(options.clone(), conn); + let (tx, mut rx) = oneshot::channel(); - let tran = Transaction { + + let tran = RetryingTransaction { + inner: Some(tran), iteration, - options: options.clone(), - inner: Some(Inner { - started: false, - conn, - return_conn: tx, - }), + result_tx: Some(tx), }; let result = body(tran).await; - let TransactionResult { mut conn, started } = rx.try_recv().expect( + let tran = rx.try_recv().expect( "Transaction object must \ be dropped by the time transaction body finishes.", ); match result { Ok(val) => { log::debug!("Comitting transaction"); - if started { - conn.statement("COMMIT", &options.state, &options.annotations) - .await?; - } + tran.commit().await?; return Ok(val); } Err(outer) => { log::debug!("Rolling back transaction on error"); - if started { - conn.statement("ROLLBACK", &options.state, &options.annotations) - .await?; - } + tran.rollback().await?; let some_retry = outer.chain().find_map(|e| { e.downcast_ref::().and_then(|e| { @@ -127,31 +206,52 @@ where } } -fn assert_transaction(x: &mut Option) -> &mut PoolConnection { - &mut x.as_mut().expect("transaction object is dropped").conn -} - impl Transaction { - /// Zero-based iteration (attempt) number for the current transaction - /// - /// First attempt gets `iteration = 0`, second attempt `iteration = 1`, - /// etc. - pub fn iteration(&self) -> u32 { - self.iteration + fn new(options: Arc, conn: PoolConnection) -> Self { + Transaction { + options, + conn, + started: false, + } } + async fn ensure_started(&mut self) -> anyhow::Result<(), Error> { - if let Some(inner) = &mut self.inner { - if !inner.started { - let options = &self.options; - inner - .conn - .statement("START TRANSACTION", &options.state, &options.annotations) - .await?; - inner.started = true; - } + if !self.started { + let options = &self.options; + self.conn + .statement("START TRANSACTION", &options.state, &options.annotations) + .await?; + self.started = true; + } + Ok(()) + } + + async fn commit(mut self) -> anyhow::Result<(), Error> { + if !self.started { + log::trace!("transaction was never started, noop commit"); return Ok(()); } - Err(ClientError::with_message("using transaction after drop")) + + log::trace!("commit"); + let options = &self.options; + self.conn + .statement("COMMIT", &options.state, &options.annotations) + .await?; + Ok(()) + } + + async fn rollback(mut self) -> anyhow::Result<(), Error> { + if !self.started { + log::trace!("transaction was never started, noop commit"); + return Ok(()); + } + + log::trace!("rollback"); + let options = &self.options; + self.conn + .statement("ROLLBACK", &options.state, &options.annotations) + .await?; + Ok(()) } async fn query_helper( @@ -167,9 +267,8 @@ impl Transaction { { self.ensure_started().await?; - let conn = assert_transaction(&mut self.inner); - - conn.inner() + self.conn + .inner() .query( query.as_ref(), arguments, @@ -394,8 +493,8 @@ impl Transaction { expected_cardinality: Cardinality::Many, }; let state = &self.options.state; - let conn = assert_transaction(&mut self.inner); - let desc = conn + let desc = self + .conn .parse(&flags, query, state, &self.options.annotations) .await?; let inp_desc = desc.input().map_err(ProtocolEncodingError::with_source)?; @@ -406,15 +505,16 @@ impl Transaction { &mut arg_buf, ))?; - conn.execute( - &flags, - query, - state, - &self.options.annotations, - &desc, - &arg_buf.freeze(), - ) - .await?; + self.conn + .execute( + &flags, + query, + state, + &self.options.annotations, + &desc, + &arg_buf.freeze(), + ) + .await?; Ok(()) } } diff --git a/edgedb-tokio/tests/func/dbschema/test.esdl b/edgedb-tokio/tests/func/dbschema/test.esdl index 91123edf..cbb35532 100644 --- a/edgedb-tokio/tests/func/dbschema/test.esdl +++ b/edgedb-tokio/tests/func/dbschema/test.esdl @@ -19,4 +19,8 @@ module test { required sent_at: datetime; confirmed_at: datetime; } + + type X { + a: str; + } } diff --git a/edgedb-tokio/tests/func/transactions.rs b/edgedb-tokio/tests/func/transactions.rs index 12b3b3b0..b0c3977c 100644 --- a/edgedb-tokio/tests/func/transactions.rs +++ b/edgedb-tokio/tests/func/transactions.rs @@ -31,7 +31,7 @@ async fn transaction1( lock: Arc>, ) -> anyhow::Result { let val = client - .transaction(|mut tx| { + .within_transaction(|mut tx| { let lock = lock.clone(); let iterations = iterations.clone(); let barrier = barrier.clone(); @@ -118,7 +118,7 @@ async fn transaction1e( lock: Arc>, ) -> anyhow::Result { let val = client - .transaction(|mut tx| { + .within_transaction(|mut tx| { let lock = lock.clone(); let iterations = iterations.clone(); let barrier = barrier.clone(); @@ -163,7 +163,7 @@ async fn transaction_conflict_with_complex_err() -> anyhow::Result<()> { async fn queries() -> anyhow::Result<()> { let client = Client::new(&SERVER.config); client - .transaction(|mut tx| async move { + .within_transaction(|mut tx| async move { let value = tx.query::("SELECT 7*93", &()).await?; assert_eq!(value, vec![651]); @@ -206,3 +206,43 @@ async fn queries() -> anyhow::Result<()> { .await?; Ok(()) } + +#[tokio::test] +async fn standalone_01() -> anyhow::Result<()> { + let client = Client::new(&SERVER.config); + + let mut tx = client.transaction().await.unwrap(); + + let value = tx.query::("SELECT 7*93", &()).await?; + assert_eq!(value, vec![651]); + + tx.execute("SELECT 1+1", &()).await?; + + tx.commit().await?; + + Ok(()) +} + +#[tokio::test] +async fn standalone_02() -> anyhow::Result<()> { + let client = Client::new(&SERVER.config).with_default_module(Some("test")); + + { + let mut tx = client.transaction().await.unwrap(); + + tx.execute("insert X { a := $0 }", &("hello",)).await?; + + let a = tx + .query_single::("select X.a limit 1", &()) + .await?; + assert_eq!(a, Some("hello".to_string())); + + // no commit + } + + let a = client + .query_single::("select X.a limit 1", &()) + .await?; + assert_eq!(a, None); + Ok(()) +}