Skip to content

Commit

Permalink
refactor: move annotations into Options (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen authored Jan 10, 2025
1 parent 0ac0190 commit 1e4f496
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 32 deletions.
2 changes: 1 addition & 1 deletion edgedb-protocol/tests/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate pretty_assertions;
use std::error::Error;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use bytes::Bytes;

use edgedb_protocol::codec::build_codec;
use edgedb_protocol::codec::{Codec, ObjectShape};
Expand Down
35 changes: 21 additions & 14 deletions edgedb-tokio/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::future::Future;
use std::sync::Arc;

use edgedb_protocol::common::{Capabilities, Cardinality, IoFormat};
use edgedb_protocol::encoding::Annotations;
use edgedb_protocol::model::Json;
use edgedb_protocol::query_arg::QueryArgs;
use edgedb_protocol::QueryResult;
Expand Down Expand Up @@ -35,7 +34,6 @@ use crate::ResultVerbose;
pub struct Client {
options: Arc<Options>,
pool: Pool,
annotations: Arc<Annotations>,
}

impl Client {
Expand All @@ -48,7 +46,6 @@ impl Client {
Client {
options: Default::default(),
pool: Pool::new(config),
annotations: Default::default(),
}
}

Expand Down Expand Up @@ -85,7 +82,7 @@ impl Client {
query.as_ref(),
arguments,
state,
&self.annotations,
&self.options.annotations,
caps,
io_format,
cardinality,
Expand Down Expand Up @@ -343,7 +340,13 @@ impl Client {
let state = &self.options.state;
let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
match conn
.execute(query.as_ref(), arguments, state, &self.annotations, caps)
.execute(
query.as_ref(),
arguments,
state,
&self.options.annotations,
caps,
)
.await
{
Ok(_) => return Ok(()),
Expand Down Expand Up @@ -411,7 +414,7 @@ impl Client {
B: FnMut(Transaction) -> F,
F: Future<Output = Result<T, Error>>,
{
transaction(&self.pool, &self.options, &self.annotations, body).await
transaction(&self.pool, self.options.clone(), body).await
}

/// Returns client with adjusted options for future transactions.
Expand All @@ -429,9 +432,9 @@ impl Client {
transaction: options,
retry: self.options.retry.clone(),
state: self.options.state.clone(),
annotations: self.options.annotations.clone(),
}),
pool: self.pool.clone(),
annotations: self.annotations.clone(),
}
}
/// Returns client with adjusted options for future retrying
Expand All @@ -448,9 +451,9 @@ impl Client {
transaction: self.options.transaction.clone(),
retry: options,
state: self.options.state.clone(),
annotations: self.options.annotations.clone(),
}),
pool: self.pool.clone(),
annotations: self.annotations.clone(),
}
}

Expand All @@ -460,9 +463,9 @@ impl Client {
transaction: self.options.transaction.clone(),
retry: self.options.retry.clone(),
state: Arc::new(f(&self.options.state)),
annotations: self.options.annotations.clone(),
}),
pool: self.pool.clone(),
annotations: self.annotations.clone(),
}
}

Expand Down Expand Up @@ -574,8 +577,8 @@ impl Client {
pub fn with_tag(&self, tag: Option<&str>) -> Result<Self, Error> {
const KEY: &str = "tag";

let annotations = if self.annotations.get(KEY).map(|s| s.as_str()) != tag {
let mut annotations = (*self.annotations).clone();
let annotations = if self.options.annotations.get(KEY).map(|s| s.as_str()) != tag {
let mut annotations = (*self.options.annotations).clone();
if let Some(tag) = tag {
if tag.starts_with("edgedb/") {
return Err(InvalidArgumentError::with_message("reserved tag: edgedb/*"));
Expand All @@ -594,13 +597,17 @@ impl Client {
}
Arc::new(annotations)
} else {
self.annotations.clone()
self.options.annotations.clone()
};

Ok(Client {
options: self.options.clone(),
options: Arc::new(Options {
transaction: self.options.transaction.clone(),
retry: self.options.retry.clone(),
state: self.options.state.clone(),
annotations,
}),
pool: self.pool.clone(),
annotations,
})
}
}
3 changes: 3 additions & 0 deletions edgedb-tokio/src/raw/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use edgedb_protocol::encoding::Annotations;

use crate::options::{RetryOptions, TransactionOptions};
use crate::raw::state::PoolState;

Expand All @@ -8,4 +10,5 @@ pub struct Options {
pub(crate) transaction: TransactionOptions,
pub(crate) retry: RetryOptions,
pub(crate) state: Arc<PoolState>,
pub(crate) annotations: Arc<Annotations>,
}
33 changes: 16 additions & 17 deletions edgedb-tokio/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use bytes::BytesMut;
use edgedb_protocol::common::CompilationOptions;
use edgedb_protocol::common::{Capabilities, Cardinality, InputLanguage, IoFormat};
use edgedb_protocol::encoding::Annotations;
use edgedb_protocol::model::Json;
use edgedb_protocol::query_arg::{Encoder, QueryArgs};
use edgedb_protocol::QueryResult;
Expand All @@ -14,7 +13,7 @@ use tokio::time::sleep;
use crate::errors::ClientError;
use crate::errors::{Error, ErrorKind, SHOULD_RETRY};
use crate::errors::{NoDataError, ProtocolEncodingError};
use crate::raw::{Options, Pool, PoolConnection, PoolState, Response};
use crate::raw::{Options, Pool, PoolConnection, Response};
use crate::ResultVerbose;

/// Transaction object passed to the closure via
Expand All @@ -27,8 +26,7 @@ use crate::ResultVerbose;
#[derive(Debug)]
pub struct Transaction {
iteration: u32,
state: Arc<PoolState>,
annotations: Arc<Annotations>,
options: Arc<Options>,
inner: Option<Inner>,
}

Expand Down Expand Up @@ -59,8 +57,7 @@ impl Drop for Transaction {

pub(crate) async fn transaction<T, B, F>(
pool: &Pool,
options: &Options,
annotations: &Arc<Annotations>,
options: Arc<Options>,
mut body: B,
) -> Result<T, Error>
where
Expand All @@ -73,8 +70,7 @@ where
let (tx, mut rx) = oneshot::channel();
let tran = Transaction {
iteration,
state: options.state.clone(),
annotations: annotations.clone(),
options: options.clone(),
inner: Some(Inner {
started: false,
conn,
Expand All @@ -90,15 +86,15 @@ where
Ok(val) => {
log::debug!("Comitting transaction");
if started {
conn.statement("COMMIT", &options.state, annotations)
conn.statement("COMMIT", &options.state, &options.annotations)
.await?;
}
return Ok(val);
}
Err(outer) => {
log::debug!("Rolling back transaction on error");
if started {
conn.statement("ROLLBACK", &options.state, annotations)
conn.statement("ROLLBACK", &options.state, &options.annotations)
.await?;
}

Expand Down Expand Up @@ -146,9 +142,10 @@ impl Transaction {
async fn ensure_started(&mut self) -> anyhow::Result<(), Error> {
if let Some(inner) = &mut self.inner {
if !inner.started {
let options = &self.options;
inner
.conn
.statement("START TRANSACTION", &self.state, &self.annotations)
.statement("START TRANSACTION", &options.state, &options.annotations)
.await?;
inner.started = true;
}
Expand Down Expand Up @@ -176,8 +173,8 @@ impl Transaction {
.query(
query.as_ref(),
arguments,
&self.state,
&self.annotations,
&self.options.state,
&self.options.annotations,
Capabilities::MODIFICATIONS,
io_format,
cardinality,
Expand Down Expand Up @@ -396,9 +393,11 @@ impl Transaction {
io_format: IoFormat::Binary,
expected_cardinality: Cardinality::Many,
};
let state = self.state.clone(); // TODO: optimize, by careful borrow
let state = &self.options.state;
let conn = assert_transaction(&mut self.inner);
let desc = conn.parse(&flags, query, &state, &self.annotations).await?;
let desc = conn
.parse(&flags, query, state, &self.options.annotations)
.await?;
let inp_desc = desc.input().map_err(ProtocolEncodingError::with_source)?;

let mut arg_buf = BytesMut::with_capacity(8);
Expand All @@ -410,8 +409,8 @@ impl Transaction {
conn.execute(
&flags,
query,
&state,
&self.annotations,
state,
&self.options.annotations,
&desc,
&arg_buf.freeze(),
)
Expand Down

0 comments on commit 1e4f496

Please sign in to comment.