Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raw transactions #373

Merged
merged 3 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 77 additions & 15 deletions gel-tokio/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::raw::{Options, PoolState, Response};
use crate::raw::{Pool, QueryCapabilities};
use crate::state::{AliasesDelta, ConfigDelta, GlobalsDelta};
use crate::state::{AliasesModifier, ConfigModifier, Fn, GlobalsModifier};
use crate::transaction::{transaction, Transaction};
use crate::transaction;
use crate::ResultVerbose;

/// Gel database client.
Expand Down Expand Up @@ -374,13 +374,49 @@ impl Client {
}
}

/// Execute a transaction
/// Execute a transaction and retry.
///
/// Transaction body must be encompassed in the closure. The closure **may
/// be executed multiple times**. This includes not only database queries
/// but also executing the whole function, so the transaction code must be
/// prepared to be idempotent.
///
/// # Example
///
/// ```rust,no_run
/// # async fn main_() -> Result<(), gel_tokio::Error> {
/// let conn = gel_tokio::create_client().await?;
/// let val = conn.transaction(|mut tx| async move {
/// tx.query_required_single::<i64, _>("
/// WITH C := UPDATE Counter SET { value := .value + 1}
/// SELECT C.value LIMIT 1
/// ", &()
/// ).await
/// }).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Commit and rollback
///
/// If the closure returns [Result::Ok], the transaction is committed.
/// If the closure returns [Result::Err], the transaction is either retried or aborted,
/// depending on weather the error has `SHOULD_RETRY`` tag set.
///
/// To manually abort a transaction, [gel_errors::UserError] can be returned:
///
/// ```rust,no_run
/// use gel_errors::ErrorKind;
/// # async fn main_() -> Result<(), gel_tokio::Error> {
/// # let conn = gel_tokio::create_client().await?;
/// let val = conn.transaction(|mut tx| async move {
/// tx.execute("UPDATE Foo SET { x := 1 };", &()).await;
/// Err(gel_errors::UserError::build()) // abort transaction
/// }).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Returning custom errors
///
/// See [this example](https://github.com/edgedb/edgedb-rust/blob/master/gel-tokio/examples/transaction_errors.rs)
Expand All @@ -393,28 +429,54 @@ impl Client {
/// dropped after closure exists. General rule: do not store transaction
/// anywhere and do not send to another coroutine. Pass to all further
/// function calls by reference.
pub async fn transaction<T, B, F>(&self, body: B) -> Result<T, Error>
where
B: FnMut(transaction::RetryingTransaction) -> F,
F: Future<Output = Result<T, Error>>,
{
transaction::run_and_retry(&self.pool, self.options.clone(), body).await
}

/// Start a transaction without the retry mechanism.
///
/// Returns [RawTransaction] which implements [crate::QueryExecutor] and can
/// be used to execute queries within the transaction.
///
/// The transaction will never retry failed queries, even if the database signals that the
/// query should be retried. For this reason, it is recommended to use [Client::within_transaction]
/// when possible.
///
/// <div class="warning">
/// Transactions can fail for benign reasons and should always handle that case gracefully.
/// `RawTransaction` does not provide any retry mechanisms, so this responsibility falls
/// onto the user. For example, even only two select queries in a transaction can fail due to
/// concurrent modification of the database.
/// </div>
///
/// # Commit and rollback
///
/// To commit the changes made during the transaction,
/// [commit](crate::RawTransaction::commit) method must be called, otherwise the
/// transaction will roll back when [RawTransaction] is dropped.
///
/// # Example
///
/// ```rust,no_run
/// # async fn transaction() -> Result<(), gel_tokio::Error> {
/// # async fn main_() -> Result<(), gel_tokio::Error> {
/// let conn = gel_tokio::create_client().await?;
/// let val = conn.transaction(|mut tx| async move {
/// tx.query_required_single::<i64, _>("
/// WITH C := UPDATE Counter SET { value := .value + 1}
/// SELECT C.value LIMIT 1
/// let mut tx = conn.transaction_raw().await?;
/// tx.query_required_single::<i64, _>("
/// WITH C := UPDATE Counter SET { value := .value + 1}
/// SELECT C.value LIMIT 1
/// ", &()
/// ).await
/// }).await?;
/// ).await;
/// tx.commit().await;
/// # Ok(())
/// # }
/// ```
pub async fn transaction<T, B, F>(&self, body: B) -> Result<T, Error>
where
B: FnMut(Transaction) -> F,
F: Future<Output = Result<T, Error>>,
{
transaction(&self.pool, self.options.clone(), body).await
#[cfg(feature = "unstable")]
pub async fn transaction_raw(&self) -> Result<transaction::RawTransaction, Error> {
crate::transaction::start(&self.pool, self.options.clone()).await
}

/// Returns client with adjusted options for future transactions.
Expand Down
4 changes: 3 additions & 1 deletion gel-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub use errors::Error;
pub use options::{RetryCondition, RetryOptions, TransactionOptions};
pub use query_executor::{QueryExecutor, ResultVerbose};
pub use state::{ConfigDelta, GlobalsDelta};
pub use transaction::Transaction;
pub use transaction::{RetryingTransaction, Transaction};

/// The ordered list of project filenames supported.
pub const PROJECT_FILES: &[&str] = &["gel.toml", "edgedb.toml"];
Expand All @@ -161,6 +161,8 @@ pub const DEFAULT_PROJECT_FILE: &str = PROJECT_FILES[0];

#[cfg(feature = "unstable")]
pub use builder::{get_project_path, get_stash_path};
#[cfg(feature = "unstable")]
pub use transaction::RawTransaction;

/// Create a connection to the database with default parameters
///
Expand Down
4 changes: 2 additions & 2 deletions gel-tokio/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub trait QueryExecutor: Sized {
A: QueryArgs,
R: QueryResult + Send;

/// see [Client::query_with_warnings]
/// see [Client::query_verbose]
fn query_verbose<R, A>(
self,
query: impl AsRef<str> + Send,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl QueryExecutor for &Client {
}
}

impl QueryExecutor for &mut Transaction {
impl<T: std::ops::DerefMut<Target = Transaction>> QueryExecutor for &mut T {
fn query<R, A>(
self,
query: impl AsRef<str> + Send,
Expand Down
Loading