From 4043f6d4a2e9193f4785c9fa5f91efd1d11b331b Mon Sep 17 00:00:00 2001 From: guibescos <59208140+guibescos@users.noreply.github.com> Date: Mon, 3 Mar 2025 22:00:59 +0100 Subject: [PATCH] do it (#412) --- .../src/opportunity/repository/db.rs | 173 ------------------ .../src/opportunity/repository/mod.rs | 10 +- .../src/opportunity/repository/models.rs | 171 ++++++++++++++++- .../opportunity/service/add_opportunity.rs | 4 +- auction-server/src/opportunity/service/mod.rs | 16 +- 5 files changed, 183 insertions(+), 191 deletions(-) delete mode 100644 auction-server/src/opportunity/repository/db.rs diff --git a/auction-server/src/opportunity/repository/db.rs b/auction-server/src/opportunity/repository/db.rs deleted file mode 100644 index 4a383b45..00000000 --- a/auction-server/src/opportunity/repository/db.rs +++ /dev/null @@ -1,173 +0,0 @@ -#[cfg(test)] -use mockall::automock; -use { - super::{ - entities, - models, - InMemoryStore, - OpportunityMetadata, - OpportunityRemovalReason, - }, - crate::{ - api::RestError, - kernel::{ - db::DB, - entities::{ - ChainId, - PermissionKey, - }, - }, - opportunity::entities::Opportunity, - }, - axum::async_trait, - sqlx::QueryBuilder, - std::fmt::Debug, - time::{ - OffsetDateTime, - PrimitiveDateTime, - }, - tracing::{ - info_span, - Instrument, - }, -}; - -#[cfg_attr(test, automock)] -#[async_trait] -pub trait OpportunityTable: Debug + Send + Sync + 'static { - async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError>; - async fn get_opportunities( - &self, - chain_id: ChainId, - permission_key: Option, - from_time: Option, - ) -> Result, RestError>; - async fn remove_opportunities( - &self, - permission_key: PermissionKey, - chain_id: ChainId, - reason: OpportunityRemovalReason, - ) -> anyhow::Result<()>; - async fn remove_opportunity( - &self, - opportunity: &T::Opportunity, - reason: OpportunityRemovalReason, - ) -> anyhow::Result<()>; -} -#[async_trait] -impl OpportunityTable for DB { - async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { - let metadata = opportunity.get_models_metadata(); - let chain_type = ::ModelMetadata::get_chain_type(); - sqlx::query!("INSERT INTO opportunity (id, - creation_time, - permission_key, - chain_id, - chain_type, - metadata, - sell_tokens, - buy_tokens) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", - opportunity.id, - PrimitiveDateTime::new(opportunity.creation_time.date(), opportunity.creation_time.time()), - opportunity.permission_key.to_vec(), - opportunity.chain_id, - chain_type as _, - serde_json::to_value(metadata).expect("Failed to serialize metadata"), - serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"), - serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens")) - .execute(self) - .instrument(info_span!("db_add_opportunity")) - .await - .map_err(|e| { - tracing::error!("DB: Failed to insert opportunity: {}", e); - RestError::TemporarilyUnavailable - })?; - Ok(()) - } - - async fn get_opportunities( - &self, - chain_id: ChainId, - permission_key: Option, - from_time: Option, - ) -> Result::Opportunity>, RestError> { - let mut query = QueryBuilder::new("SELECT * from opportunity WHERE chain_type = "); - query.push_bind( - <::ModelMetadata>::get_chain_type(), - ); - query.push(" AND chain_id = "); - query.push_bind(chain_id.clone()); - if let Some(permission_key) = permission_key.clone() { - query.push(" AND permission_key = "); - query.push_bind(permission_key.to_vec()); - } - if let Some(from_time) = from_time { - query.push(" AND creation_time >= "); - query.push_bind(from_time); - } - query.push(" ORDER BY creation_time ASC LIMIT "); - query.push_bind(super::OPPORTUNITY_PAGE_SIZE_CAP as i64); - let opps: Vec::ModelMetadata>> = query - .build_query_as() - .fetch_all(self) - .instrument(info_span!("db_get_opportunities")) - .await - .map_err(|e| { - tracing::error!( - "DB: Failed to fetch opportunities: {} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}", - e, - chain_id, - permission_key, - from_time, - ); - RestError::TemporarilyUnavailable - })?; - - opps.into_iter().map(|opp| opp.clone().try_into().map_err( - |_| { - tracing::error!( - "Failed to convert database opportunity to entity opportunity: {:?} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}", - opp, - chain_id, - permission_key, - from_time, - ); - RestError::TemporarilyUnavailable - } - )).collect() - } - - async fn remove_opportunities( - &self, - permission_key: PermissionKey, - chain_id: ChainId, - reason: OpportunityRemovalReason, - ) -> anyhow::Result<()> { - let now = OffsetDateTime::now_utc(); - sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") - .bind(PrimitiveDateTime::new(now.date(), now.time())) - .bind(reason) - .bind(permission_key.as_ref()) - .bind(chain_id) - .execute(self) - .instrument(info_span!("db_remove_opportunities")) - .await?; - Ok(()) - } - - async fn remove_opportunity( - &self, - opportunity: &T::Opportunity, - reason: OpportunityRemovalReason, - ) -> anyhow::Result<()> { - let now = OffsetDateTime::now_utc(); - sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") - .bind(PrimitiveDateTime::new(now.date(), now.time())) - .bind(reason) - .bind(opportunity.id) - .execute(self) - .instrument(info_span!("db_remove_opportunity")) - .await?; - Ok(()) - } -} diff --git a/auction-server/src/opportunity/repository/mod.rs b/auction-server/src/opportunity/repository/mod.rs index 60ded0e1..adf0034b 100644 --- a/auction-server/src/opportunity/repository/mod.rs +++ b/auction-server/src/opportunity/repository/mod.rs @@ -13,7 +13,6 @@ use { mod add_opportunity; mod add_spoof_info; -mod db; mod get_express_relay_metadata; mod get_in_memory_opportunities; mod get_in_memory_opportunities_by_key; @@ -26,17 +25,14 @@ mod refresh_in_memory_opportunity; mod remove_opportunities; mod remove_opportunity; -pub use { - db::*, - models::*, -}; +pub use models::*; pub const OPPORTUNITY_PAGE_SIZE_CAP: usize = 100; #[derive(Debug)] pub struct Repository { pub in_memory_store: T, - pub db: Box>, + pub db: Box>, } pub trait InMemoryStore: @@ -109,7 +105,7 @@ impl Deref for InMemoryStoreSvm { } impl Repository { - pub fn new(db: impl OpportunityTable) -> Self { + pub fn new(db: impl Database) -> Self { Self { in_memory_store: T::new(), db: Box::new(db), diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index 6599bed0..03ace6ef 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -1,8 +1,27 @@ +#[cfg(test)] +use mockall::automock; use { + super::{ + entities, + models, + InMemoryStore, + }, crate::{ + api::RestError, + kernel::{ + db::DB, + entities::{ + ChainId, + PermissionKey, + }, + }, models::ChainType, - opportunity::entities::FeeToken, + opportunity::entities::{ + FeeToken, + Opportunity as OpportunityTrait, + }, }, + axum::async_trait, ethers::types::{ Address, Bytes, @@ -25,12 +44,20 @@ use { sqlx::{ prelude::FromRow, types::{ - time::PrimitiveDateTime, Json, JsonValue, }, + QueryBuilder, }, std::fmt::Debug, + time::{ + OffsetDateTime, + PrimitiveDateTime, + }, + tracing::{ + info_span, + Instrument, + }, uuid::Uuid, }; @@ -123,3 +150,143 @@ pub struct Opportunity { pub removal_reason: Option, pub metadata: Json, } + +#[cfg_attr(test, automock)] +#[async_trait] +pub trait Database: Debug + Send + Sync + 'static { + async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError>; + async fn get_opportunities( + &self, + chain_id: ChainId, + permission_key: Option, + from_time: Option, + ) -> Result, RestError>; + async fn remove_opportunities( + &self, + permission_key: PermissionKey, + chain_id: ChainId, + reason: OpportunityRemovalReason, + ) -> anyhow::Result<()>; + async fn remove_opportunity( + &self, + opportunity: &T::Opportunity, + reason: OpportunityRemovalReason, + ) -> anyhow::Result<()>; +} +#[async_trait] +impl Database for DB { + async fn add_opportunity(&self, opportunity: &T::Opportunity) -> Result<(), RestError> { + let metadata = opportunity.get_models_metadata(); + let chain_type = ::ModelMetadata::get_chain_type(); + sqlx::query!("INSERT INTO opportunity (id, + creation_time, + permission_key, + chain_id, + chain_type, + metadata, + sell_tokens, + buy_tokens) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + opportunity.id, + PrimitiveDateTime::new(opportunity.creation_time.date(), opportunity.creation_time.time()), + opportunity.permission_key.to_vec(), + opportunity.chain_id, + chain_type as _, + serde_json::to_value(metadata).expect("Failed to serialize metadata"), + serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"), + serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens")) + .execute(self) + .instrument(info_span!("db_add_opportunity")) + .await + .map_err(|e| { + tracing::error!("DB: Failed to insert opportunity: {}", e); + RestError::TemporarilyUnavailable + })?; + Ok(()) + } + + async fn get_opportunities( + &self, + chain_id: ChainId, + permission_key: Option, + from_time: Option, + ) -> Result::Opportunity>, RestError> { + let mut query = QueryBuilder::new("SELECT * from opportunity WHERE chain_type = "); + query.push_bind( + <::ModelMetadata>::get_chain_type(), + ); + query.push(" AND chain_id = "); + query.push_bind(chain_id.clone()); + if let Some(permission_key) = permission_key.clone() { + query.push(" AND permission_key = "); + query.push_bind(permission_key.to_vec()); + } + if let Some(from_time) = from_time { + query.push(" AND creation_time >= "); + query.push_bind(from_time); + } + query.push(" ORDER BY creation_time ASC LIMIT "); + query.push_bind(super::OPPORTUNITY_PAGE_SIZE_CAP as i64); + let opps: Vec::ModelMetadata>> = query + .build_query_as() + .fetch_all(self) + .instrument(info_span!("db_get_opportunities")) + .await + .map_err(|e| { + tracing::error!( + "DB: Failed to fetch opportunities: {} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}", + e, + chain_id, + permission_key, + from_time, + ); + RestError::TemporarilyUnavailable + })?; + + opps.into_iter().map(|opp| opp.clone().try_into().map_err( + |_| { + tracing::error!( + "Failed to convert database opportunity to entity opportunity: {:?} - chain_id: {:?} - permission_key: {:?} - from_time: {:?}", + opp, + chain_id, + permission_key, + from_time, + ); + RestError::TemporarilyUnavailable + } + )).collect() + } + + async fn remove_opportunities( + &self, + permission_key: PermissionKey, + chain_id: ChainId, + reason: OpportunityRemovalReason, + ) -> anyhow::Result<()> { + let now = OffsetDateTime::now_utc(); + sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE permission_key = $3 AND chain_id = $4 and removal_time IS NULL") + .bind(PrimitiveDateTime::new(now.date(), now.time())) + .bind(reason) + .bind(permission_key.as_ref()) + .bind(chain_id) + .execute(self) + .instrument(info_span!("db_remove_opportunities")) + .await?; + Ok(()) + } + + async fn remove_opportunity( + &self, + opportunity: &T::Opportunity, + reason: OpportunityRemovalReason, + ) -> anyhow::Result<()> { + let now = OffsetDateTime::now_utc(); + sqlx::query("UPDATE opportunity SET removal_time = $1, removal_reason = $2 WHERE id = $3 AND removal_time IS NULL") + .bind(PrimitiveDateTime::new(now.date(), now.time())) + .bind(reason) + .bind(opportunity.id) + .execute(self) + .instrument(info_span!("db_remove_opportunity")) + .await?; + Ok(()) + } +} diff --git a/auction-server/src/opportunity/service/add_opportunity.rs b/auction-server/src/opportunity/service/add_opportunity.rs index 2edcc726..ab657317 100644 --- a/auction-server/src/opportunity/service/add_opportunity.rs +++ b/auction-server/src/opportunity/service/add_opportunity.rs @@ -131,7 +131,7 @@ mod tests { OpportunitySvmProgramLimo, TokenAmountSvm, }, - repository::MockOpportunityTable, + repository::MockDatabase, service::{ add_opportunity::AddOpportunityInput, ChainTypeSvm, @@ -150,7 +150,7 @@ mod tests { async fn test_add_opportunity() { let chain_id = "solana".to_string(); let rpc_client = MockRpcClient::default(); - let mut mock_db = MockOpportunityTable::default(); + let mut mock_db = MockDatabase::default(); mock_db.expect_add_opportunity().returning(|_| Ok(())); diff --git a/auction-server/src/opportunity/service/mod.rs b/auction-server/src/opportunity/service/mod.rs index 31a33a44..0760fd1a 100644 --- a/auction-server/src/opportunity/service/mod.rs +++ b/auction-server/src/opportunity/service/mod.rs @@ -1,11 +1,9 @@ -#[cfg(test)] -use mockall::mock; use { super::repository::{ + Database, InMemoryStore, InMemoryStoreEvm, InMemoryStoreSvm, - OpportunityTable, Repository, }, crate::{ @@ -14,7 +12,6 @@ use { }, kernel::{ contracts::AdapterFactory, - db::DB, entities::{ ChainId, ChainType as ChainTypeEnum, @@ -50,6 +47,11 @@ use { tokio::sync::RwLock, tokio_util::task::TaskTracker, }; +#[cfg(test)] +use { + crate::kernel::db::DB, + mockall::mock, +}; pub mod add_opportunity; pub mod get_config; @@ -272,7 +274,7 @@ impl Service { pub fn new( store: Arc, task_tracker: TaskTracker, - db: impl OpportunityTable, + db: impl Database, config: HashMap, ) -> Self { Self { @@ -297,7 +299,7 @@ pub mod tests { UpdateEvent, }, kernel::traced_sender_svm::tests::MockRpcClient, - opportunity::repository::MockOpportunityTable, + opportunity::repository::MockDatabase, server::setup_metrics_recorder, }, std::sync::atomic::AtomicUsize, @@ -307,7 +309,7 @@ pub mod tests { impl Service { pub fn new_with_mocks_svm( chain_id: ChainId, - db: MockOpportunityTable, + db: MockDatabase, rpc_client: MockRpcClient, ) -> (Self, Receiver) { let config_svm = crate::opportunity::service::ConfigSvm {