From 99a9cc38164a863d8b32ae3c3781132f7e67982e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Fri, 10 Jan 2025 14:28:37 +0100 Subject: [PATCH] refactor: move annotations into Options --- edgedb-protocol/tests/codecs.rs | 2 +- edgedb-tokio/src/client.rs | 35 ++++++++++++++++++++------------- edgedb-tokio/src/raw/options.rs | 3 +++ edgedb-tokio/src/transaction.rs | 33 +++++++++++++++---------------- 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/edgedb-protocol/tests/codecs.rs b/edgedb-protocol/tests/codecs.rs index bbc95887..fc73bab1 100644 --- a/edgedb-protocol/tests/codecs.rs +++ b/edgedb-protocol/tests/codecs.rs @@ -4,7 +4,7 @@ extern crate pretty_assertions; use std::error::Error; use std::sync::Arc; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use edgedb_protocol::codec::build_codec; use edgedb_protocol::codec::{Codec, ObjectShape}; diff --git a/edgedb-tokio/src/client.rs b/edgedb-tokio/src/client.rs index 75451030..957944e6 100644 --- a/edgedb-tokio/src/client.rs +++ b/edgedb-tokio/src/client.rs @@ -2,7 +2,6 @@ use std::future::Future; use std::sync::Arc; use edgedb_protocol::common::{Capabilities, Cardinality, IoFormat}; -use edgedb_protocol::encoding::Annotations; use edgedb_protocol::model::Json; use edgedb_protocol::query_arg::QueryArgs; use edgedb_protocol::QueryResult; @@ -35,7 +34,6 @@ use crate::ResultVerbose; pub struct Client { options: Arc, pool: Pool, - annotations: Arc, } impl Client { @@ -48,7 +46,6 @@ impl Client { Client { options: Default::default(), pool: Pool::new(config), - annotations: Default::default(), } } @@ -85,7 +82,7 @@ impl Client { query.as_ref(), arguments, state, - &self.annotations, + &self.options.annotations, caps, io_format, cardinality, @@ -343,7 +340,13 @@ impl Client { let state = &self.options.state; let caps = Capabilities::MODIFICATIONS | Capabilities::DDL; match conn - .execute(query.as_ref(), arguments, state, &self.annotations, caps) + .execute( + query.as_ref(), + arguments, + state, + &self.options.annotations, + caps, + ) .await { Ok(_) => return Ok(()), @@ -411,7 +414,7 @@ impl Client { B: FnMut(Transaction) -> F, F: Future>, { - transaction(&self.pool, &self.options, &self.annotations, body).await + transaction(&self.pool, self.options.clone(), body).await } /// Returns client with adjusted options for future transactions. @@ -429,9 +432,9 @@ impl Client { transaction: options, retry: self.options.retry.clone(), state: self.options.state.clone(), + annotations: self.options.annotations.clone(), }), pool: self.pool.clone(), - annotations: self.annotations.clone(), } } /// Returns client with adjusted options for future retrying @@ -448,9 +451,9 @@ impl Client { transaction: self.options.transaction.clone(), retry: options, state: self.options.state.clone(), + annotations: self.options.annotations.clone(), }), pool: self.pool.clone(), - annotations: self.annotations.clone(), } } @@ -460,9 +463,9 @@ impl Client { transaction: self.options.transaction.clone(), retry: self.options.retry.clone(), state: Arc::new(f(&self.options.state)), + annotations: self.options.annotations.clone(), }), pool: self.pool.clone(), - annotations: self.annotations.clone(), } } @@ -574,8 +577,8 @@ impl Client { pub fn with_tag(&self, tag: Option<&str>) -> Result { const KEY: &str = "tag"; - let annotations = if self.annotations.get(KEY).map(|s| s.as_str()) != tag { - let mut annotations = (*self.annotations).clone(); + let annotations = if self.options.annotations.get(KEY).map(|s| s.as_str()) != tag { + let mut annotations = (*self.options.annotations).clone(); if let Some(tag) = tag { if tag.starts_with("edgedb/") { return Err(InvalidArgumentError::with_message("reserved tag: edgedb/*")); @@ -594,13 +597,17 @@ impl Client { } Arc::new(annotations) } else { - self.annotations.clone() + self.options.annotations.clone() }; Ok(Client { - options: self.options.clone(), + options: Arc::new(Options { + transaction: self.options.transaction.clone(), + retry: self.options.retry.clone(), + state: self.options.state.clone(), + annotations, + }), pool: self.pool.clone(), - annotations, }) } } diff --git a/edgedb-tokio/src/raw/options.rs b/edgedb-tokio/src/raw/options.rs index e0fc3929..e8ed38b3 100644 --- a/edgedb-tokio/src/raw/options.rs +++ b/edgedb-tokio/src/raw/options.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use edgedb_protocol::encoding::Annotations; + use crate::options::{RetryOptions, TransactionOptions}; use crate::raw::state::PoolState; @@ -8,4 +10,5 @@ pub struct Options { pub(crate) transaction: TransactionOptions, pub(crate) retry: RetryOptions, pub(crate) state: Arc, + pub(crate) annotations: Arc, } diff --git a/edgedb-tokio/src/transaction.rs b/edgedb-tokio/src/transaction.rs index 249c664e..8b7e6d1d 100644 --- a/edgedb-tokio/src/transaction.rs +++ b/edgedb-tokio/src/transaction.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use bytes::BytesMut; use edgedb_protocol::common::CompilationOptions; use edgedb_protocol::common::{Capabilities, Cardinality, InputLanguage, IoFormat}; -use edgedb_protocol::encoding::Annotations; use edgedb_protocol::model::Json; use edgedb_protocol::query_arg::{Encoder, QueryArgs}; use edgedb_protocol::QueryResult; @@ -14,7 +13,7 @@ use tokio::time::sleep; use crate::errors::ClientError; use crate::errors::{Error, ErrorKind, SHOULD_RETRY}; use crate::errors::{NoDataError, ProtocolEncodingError}; -use crate::raw::{Options, Pool, PoolConnection, PoolState, Response}; +use crate::raw::{Options, Pool, PoolConnection, Response}; use crate::ResultVerbose; /// Transaction object passed to the closure via @@ -27,8 +26,7 @@ use crate::ResultVerbose; #[derive(Debug)] pub struct Transaction { iteration: u32, - state: Arc, - annotations: Arc, + options: Arc, inner: Option, } @@ -59,8 +57,7 @@ impl Drop for Transaction { pub(crate) async fn transaction( pool: &Pool, - options: &Options, - annotations: &Arc, + options: Arc, mut body: B, ) -> Result where @@ -73,8 +70,7 @@ where let (tx, mut rx) = oneshot::channel(); let tran = Transaction { iteration, - state: options.state.clone(), - annotations: annotations.clone(), + options: options.clone(), inner: Some(Inner { started: false, conn, @@ -90,7 +86,7 @@ where Ok(val) => { log::debug!("Comitting transaction"); if started { - conn.statement("COMMIT", &options.state, annotations) + conn.statement("COMMIT", &options.state, &options.annotations) .await?; } return Ok(val); @@ -98,7 +94,7 @@ where Err(outer) => { log::debug!("Rolling back transaction on error"); if started { - conn.statement("ROLLBACK", &options.state, annotations) + conn.statement("ROLLBACK", &options.state, &options.annotations) .await?; } @@ -146,9 +142,10 @@ impl Transaction { async fn ensure_started(&mut self) -> anyhow::Result<(), Error> { if let Some(inner) = &mut self.inner { if !inner.started { + let options = &self.options; inner .conn - .statement("START TRANSACTION", &self.state, &self.annotations) + .statement("START TRANSACTION", &options.state, &options.annotations) .await?; inner.started = true; } @@ -176,8 +173,8 @@ impl Transaction { .query( query.as_ref(), arguments, - &self.state, - &self.annotations, + &self.options.state, + &self.options.annotations, Capabilities::MODIFICATIONS, io_format, cardinality, @@ -396,9 +393,11 @@ impl Transaction { io_format: IoFormat::Binary, expected_cardinality: Cardinality::Many, }; - let state = self.state.clone(); // TODO: optimize, by careful borrow + let state = &self.options.state; let conn = assert_transaction(&mut self.inner); - let desc = conn.parse(&flags, query, &state, &self.annotations).await?; + let desc = conn + .parse(&flags, query, state, &self.options.annotations) + .await?; let inp_desc = desc.input().map_err(ProtocolEncodingError::with_source)?; let mut arg_buf = BytesMut::with_capacity(8); @@ -410,8 +409,8 @@ impl Transaction { conn.execute( &flags, query, - &state, - &self.annotations, + state, + &self.options.annotations, &desc, &arg_buf.freeze(), )