From 6463da8c8effa99257e68ebebc3772ff41d0ff45 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Mon, 13 Jan 2025 15:34:07 +0100 Subject: [PATCH] Rename partition_key to store_id Signed-off-by: Ryan Levick --- crates/app/src/lib.rs | 2 +- crates/key-value-azure/src/store.rs | 76 ++++++++++++++++------------- 2 files changed, 42 insertions(+), 36 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 806894bfa1..8c5b84b0e5 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -141,7 +141,7 @@ impl App { pub fn triggers_with_type<'a>( &'a self, trigger_type: &'a str, - ) -> impl Iterator { + ) -> impl Iterator> { self.triggers() .filter(move |trigger| trigger.locked.trigger_type == trigger_type) } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 198afabd96..86f0a8a92d 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -1,6 +1,5 @@ use anyhow::Result; use azure_data_cosmos::prelude::Operation; -use azure_data_cosmos::resources::collection::PartitionKey; use azure_data_cosmos::{ prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query}, CosmosEntity, @@ -13,6 +12,11 @@ use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { client: CollectionClient, + /// An optional app id + /// + /// If provided, the store will handle multiple stores per container using a + /// partition key of `/$app_id/$store_name`, otherwise there will be one container + /// per store, and the partition key will be `/id`. app_id: Option, } @@ -97,7 +101,7 @@ impl StoreManager for KeyValueAzureCosmos { async fn get(&self, name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { client: self.client.clone(), - partition_key: self.app_id.as_ref().map(|i| format!("{i}/{name}")), + store_id: self.app_id.as_ref().map(|i| format!("{i}/{name}")), })) } @@ -117,10 +121,10 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore { client: CollectionClient, - /// An optional partition key to use for all operations. + /// An optional store id to use as a partition key for all operations. /// - /// If the partition key is not set, the store will use `/id` as the partition key. - partition_key: Option, + /// If the store id not set, the store will use `/id` as the partition key. + store_id: Option, } #[async_trait] @@ -134,7 +138,7 @@ impl Store for AzureCosmosStore { let pair = Pair { id: key.to_string(), value: value.to_vec(), - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), }; self.client .create_document(pair) @@ -148,7 +152,7 @@ impl Store for AzureCosmosStore { if self.exists(key).await? { let document_client = self .client - .document_client(key, &self.partition_key) + .document_client(key, &self.store_id) .map_err(log_error)?; document_client.delete_document().await.map_err(log_error)?; } @@ -201,7 +205,7 @@ impl Store for AzureCosmosStore { let operations = vec![Operation::incr("/value", delta).map_err(log_error)?]; let _ = self .client - .document_client(key.clone(), &self.partition_key) + .document_client(key.clone(), &self.store_id) .map_err(log_error)? .patch_document(operations) .await @@ -228,7 +232,7 @@ impl Store for AzureCosmosStore { client: self.client.clone(), etag: Mutex::new(None), bucket_rep, - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), })) } } @@ -238,18 +242,18 @@ struct CompareAndSwap { client: CollectionClient, bucket_rep: u32, etag: Mutex>, - partition_key: Option, + store_id: Option, } impl CompareAndSwap { fn get_query(&self) -> String { let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } - fn append_partition_key(&self, query: &mut String) { - append_partition_key_condition(query, self.partition_key.as_deref()); + fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { + append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); } } @@ -291,20 +295,15 @@ impl Cas for CompareAndSwap { /// `swap` updates the value for the key using the etag saved in the `current` function for /// optimistic concurrency. async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let pk = PartitionKey::from( - self.partition_key - .as_deref() - .unwrap_or_else(|| self.key.as_str()), - ); let pair = Pair { id: self.key.clone(), value, - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), }; let doc_client = self .client - .document_client(&self.key, &pk) + .document_client(&self.key, &pair.partition_key()) .map_err(log_cas_error)?; let etag_value = self.etag.lock().unwrap().clone(); @@ -376,38 +375,47 @@ impl AzureCosmosStore { fn get_query(&self, key: &str) -> String { let mut query = format!("SELECT * FROM c WHERE c.id='{}'", key); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } fn get_keys_query(&self) -> String { let mut query = "SELECT * FROM c".to_owned(); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, false); query } fn get_in_query(&self, keys: Vec) -> String { let in_clause: String = keys .into_iter() - .map(|k| format!("'{}'", k)) + .map(|k| format!("'{k}'")) .collect::>() .join(", "); let mut query = format!("SELECT * FROM c WHERE c.id IN ({})", in_clause); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } - fn append_partition_key(&self, query: &mut String) { - append_partition_key_condition(query, self.partition_key.as_deref()); + fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { + append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); } } -/// Appends an option partition key condition to the query. -fn append_partition_key_condition(query: &mut String, partition_key: Option<&str>) { - if let Some(pk) = partition_key { - query.push_str(" AND c.partition_key='"); - query.push_str(pk); +/// Appends an option store id condition to the query. +fn append_store_id_condition( + query: &mut String, + store_id: Option<&str>, + condition_already_exists: bool, +) { + if let Some(s) = store_id { + if condition_already_exists { + query.push_str(" AND"); + } else { + query.push_str(" WHERE"); + } + query.push_str(" c.store_id='"); + query.push_str(s); query.push('\'') } } @@ -417,15 +425,13 @@ pub struct Pair { pub id: String, pub value: Vec, #[serde(skip_serializing_if = "Option::is_none")] - pub partition_key: Option, + pub store_id: Option, } impl CosmosEntity for Pair { type Entity = String; fn partition_key(&self) -> Self::Entity { - self.partition_key - .clone() - .unwrap_or_else(|| self.id.clone()) + self.store_id.clone().unwrap_or_else(|| self.id.clone()) } }