Skip to content

Commit

Permalink
Rename partition_key to store_id
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Levick <ryan.levick@fermyon.com>
  • Loading branch information
rylev committed Jan 13, 2025
1 parent 0576dda commit 6463da8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 36 deletions.
2 changes: 1 addition & 1 deletion crates/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl App {
pub fn triggers_with_type<'a>(
&'a self,
trigger_type: &'a str,
) -> impl Iterator<Item = AppTrigger> {
) -> impl Iterator<Item = AppTrigger<'a>> {
self.triggers()
.filter(move |trigger| trigger.locked.trigger_type == trigger_type)
}
Expand Down
76 changes: 41 additions & 35 deletions crates/key-value-azure/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use azure_data_cosmos::prelude::Operation;
use azure_data_cosmos::resources::collection::PartitionKey;
use azure_data_cosmos::{
prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query},
CosmosEntity,
Expand All @@ -13,6 +12,11 @@ use std::sync::{Arc, Mutex};

pub struct KeyValueAzureCosmos {
client: CollectionClient,
/// An optional app id
///
/// If provided, the store will handle multiple stores per container using a
/// partition key of `/$app_id/$store_name`, otherwise there will be one container
/// per store, and the partition key will be `/id`.
app_id: Option<String>,
}

Expand Down Expand Up @@ -97,7 +101,7 @@ impl StoreManager for KeyValueAzureCosmos {
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
Ok(Arc::new(AzureCosmosStore {
client: self.client.clone(),
partition_key: self.app_id.as_ref().map(|i| format!("{i}/{name}")),
store_id: self.app_id.as_ref().map(|i| format!("{i}/{name}")),
}))
}

Expand All @@ -117,10 +121,10 @@ impl StoreManager for KeyValueAzureCosmos {
#[derive(Clone)]
struct AzureCosmosStore {
client: CollectionClient,
/// An optional partition key to use for all operations.
/// An optional store id to use as a partition key for all operations.
///
/// If the partition key is not set, the store will use `/id` as the partition key.
partition_key: Option<String>,
/// If the store id not set, the store will use `/id` as the partition key.
store_id: Option<String>,
}

#[async_trait]
Expand All @@ -134,7 +138,7 @@ impl Store for AzureCosmosStore {
let pair = Pair {
id: key.to_string(),
value: value.to_vec(),
partition_key: self.partition_key.clone(),
store_id: self.store_id.clone(),
};
self.client
.create_document(pair)
Expand All @@ -148,7 +152,7 @@ impl Store for AzureCosmosStore {
if self.exists(key).await? {
let document_client = self
.client
.document_client(key, &self.partition_key)
.document_client(key, &self.store_id)
.map_err(log_error)?;
document_client.delete_document().await.map_err(log_error)?;
}
Expand Down Expand Up @@ -201,7 +205,7 @@ impl Store for AzureCosmosStore {
let operations = vec![Operation::incr("/value", delta).map_err(log_error)?];
let _ = self
.client
.document_client(key.clone(), &self.partition_key)
.document_client(key.clone(), &self.store_id)
.map_err(log_error)?
.patch_document(operations)
.await
Expand All @@ -228,7 +232,7 @@ impl Store for AzureCosmosStore {
client: self.client.clone(),
etag: Mutex::new(None),
bucket_rep,
partition_key: self.partition_key.clone(),
store_id: self.store_id.clone(),
}))
}
}
Expand All @@ -238,18 +242,18 @@ struct CompareAndSwap {
client: CollectionClient,
bucket_rep: u32,
etag: Mutex<Option<String>>,
partition_key: Option<String>,
store_id: Option<String>,
}

impl CompareAndSwap {
fn get_query(&self) -> String {
let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key);
self.append_partition_key(&mut query);
self.append_store_id(&mut query, true);
query
}

fn append_partition_key(&self, query: &mut String) {
append_partition_key_condition(query, self.partition_key.as_deref());
fn append_store_id(&self, query: &mut String, condition_already_exists: bool) {
append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists);
}
}

Expand Down Expand Up @@ -291,20 +295,15 @@ impl Cas for CompareAndSwap {
/// `swap` updates the value for the key using the etag saved in the `current` function for
/// optimistic concurrency.
async fn swap(&self, value: Vec<u8>) -> Result<(), SwapError> {
let pk = PartitionKey::from(
self.partition_key
.as_deref()
.unwrap_or_else(|| self.key.as_str()),
);
let pair = Pair {
id: self.key.clone(),
value,
partition_key: self.partition_key.clone(),
store_id: self.store_id.clone(),
};

let doc_client = self
.client
.document_client(&self.key, &pk)
.document_client(&self.key, &pair.partition_key())
.map_err(log_cas_error)?;

let etag_value = self.etag.lock().unwrap().clone();
Expand Down Expand Up @@ -376,38 +375,47 @@ impl AzureCosmosStore {

fn get_query(&self, key: &str) -> String {
let mut query = format!("SELECT * FROM c WHERE c.id='{}'", key);
self.append_partition_key(&mut query);
self.append_store_id(&mut query, true);
query
}

fn get_keys_query(&self) -> String {
let mut query = "SELECT * FROM c".to_owned();
self.append_partition_key(&mut query);
self.append_store_id(&mut query, false);
query
}

fn get_in_query(&self, keys: Vec<String>) -> String {
let in_clause: String = keys
.into_iter()
.map(|k| format!("'{}'", k))
.map(|k| format!("'{k}'"))
.collect::<Vec<String>>()
.join(", ");

let mut query = format!("SELECT * FROM c WHERE c.id IN ({})", in_clause);
self.append_partition_key(&mut query);
self.append_store_id(&mut query, true);
query
}

fn append_partition_key(&self, query: &mut String) {
append_partition_key_condition(query, self.partition_key.as_deref());
fn append_store_id(&self, query: &mut String, condition_already_exists: bool) {
append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists);
}
}

/// Appends an option partition key condition to the query.
fn append_partition_key_condition(query: &mut String, partition_key: Option<&str>) {
if let Some(pk) = partition_key {
query.push_str(" AND c.partition_key='");
query.push_str(pk);
/// Appends an option store id condition to the query.
fn append_store_id_condition(
query: &mut String,
store_id: Option<&str>,
condition_already_exists: bool,
) {
if let Some(s) = store_id {
if condition_already_exists {
query.push_str(" AND");
} else {
query.push_str(" WHERE");
}
query.push_str(" c.store_id='");
query.push_str(s);
query.push('\'')
}
}
Expand All @@ -417,15 +425,13 @@ pub struct Pair {
pub id: String,
pub value: Vec<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub partition_key: Option<String>,
pub store_id: Option<String>,
}

impl CosmosEntity for Pair {
type Entity = String;

fn partition_key(&self) -> Self::Entity {
self.partition_key
.clone()
.unwrap_or_else(|| self.id.clone())
self.store_id.clone().unwrap_or_else(|| self.id.clone())
}
}

0 comments on commit 6463da8

Please sign in to comment.