From c71998b846ef406084d6d160e52bdb0667c698a6 Mon Sep 17 00:00:00 2001 From: Dani Mehrjerdi Date: Tue, 25 Feb 2025 13:16:05 +0100 Subject: [PATCH] Add better conclusion loop (#401) --- .../src/auction/repository/add_auction.rs | 1 + .../src/auction/service/add_auction.rs | 17 +++++++-- .../src/auction/service/auction_manager.rs | 25 ++++++------- .../src/auction/service/conclude_auction.rs | 19 +++++++++- .../src/auction/service/conclude_auctions.rs | 37 ------------------- .../src/auction/service/handle_auction.rs | 18 +++++++-- auction-server/src/auction/service/mod.rs | 1 - auction-server/src/auction/service/workers.rs | 9 ----- 8 files changed, 58 insertions(+), 69 deletions(-) delete mode 100644 auction-server/src/auction/service/conclude_auctions.rs diff --git a/auction-server/src/auction/repository/add_auction.rs b/auction-server/src/auction/repository/add_auction.rs index 088c07549..ac54b2a22 100644 --- a/auction-server/src/auction/repository/add_auction.rs +++ b/auction-server/src/auction/repository/add_auction.rs @@ -24,6 +24,7 @@ impl Repository { .insert(auction.id, auction); } + // NOTE: Do not call this function directly. Instead call `add_auction` from `Service`. #[tracing::instrument(skip_all, name = "add_auction_repo", fields(auction_id))] pub async fn add_auction( &self, diff --git a/auction-server/src/auction/service/add_auction.rs b/auction-server/src/auction/service/add_auction.rs index acc8ae1a4..c41b6ed1d 100644 --- a/auction-server/src/auction/service/add_auction.rs +++ b/auction-server/src/auction/service/add_auction.rs @@ -1,5 +1,6 @@ use { super::{ + auction_manager::AuctionManager, ChainTrait, Service, }, @@ -13,14 +14,24 @@ pub struct AddAuctionInput { pub auction: entities::Auction, } -impl Service { +impl Service +where + Service: AuctionManager, +{ pub async fn add_auction( &self, input: AddAuctionInput, ) -> Result, RestError> { - self.repo.add_auction(input.auction).await.map_err(|e| { + let auction = self.repo.add_auction(input.auction).await.map_err(|e| { tracing::error!(error = ?e, "Failed to add auction"); RestError::TemporarilyUnavailable - }) + })?; + self.task_tracker.spawn({ + let service = self.clone(); + async move { + service.conclude_auction_loop(auction.id).await; + } + }); + Ok(auction) } } diff --git a/auction-server/src/auction/service/auction_manager.rs b/auction-server/src/auction/service/auction_manager.rs index e597494d9..255092bff 100644 --- a/auction-server/src/auction/service/auction_manager.rs +++ b/auction-server/src/auction/service/auction_manager.rs @@ -101,8 +101,6 @@ pub trait AuctionManager { async fn get_ws_client(&self) -> Result; /// Get the trigger stream for the ws client to subscribe to new triggers. async fn get_trigger_stream<'a>(client: &'a Self::WsClient) -> Result>; - /// Check if the auction is ready to be concluded based on the trigger. - fn is_ready_to_conclude(trigger: Self::Trigger) -> bool; /// Get the winner bids for the auction. Sorting bids by bid amount and simulating the bids to determine the winner bids. async fn get_winner_bids( @@ -140,6 +138,9 @@ pub trait AuctionManager { /// Check if the auction is expired based on the creation time of the auction. fn is_auction_expired(auction: &entities::Auction) -> bool; + + /// Get the conclusion interval for the auction. + fn get_conclusion_interval() -> Interval; } // While we are submitting bids together, increasing this number will have the following effects: @@ -169,10 +170,6 @@ impl AuctionManager for Service { Ok(block_stream) } - fn is_ready_to_conclude(_trigger: Self::Trigger) -> bool { - true - } - #[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))] async fn get_winner_bids( &self, @@ -331,11 +328,12 @@ impl AuctionManager for Service { fn is_auction_expired(auction: &entities::Auction) -> bool { auction.creation_time + BID_MAXIMUM_LIFE_TIME_EVM < OffsetDateTime::now_utc() } + + fn get_conclusion_interval() -> Interval { + interval(Duration::from_secs(4)) + } } -/// This is to make sure we are not missing any transaction. -/// We run this once every minute (150 * 0.4). -const CONCLUSION_TRIGGER_INTERVAL_SVM: u64 = 150; const BID_MAXIMUM_LIFE_TIME_SVM: Duration = Duration::from_secs(120); const TRIGGER_DURATION_SVM: Duration = Duration::from_millis(400); @@ -391,11 +389,6 @@ impl AuctionManager for Service { Ok(TriggerStreamSvm::new(interval(TRIGGER_DURATION_SVM))) } - fn is_ready_to_conclude(trigger: Self::Trigger) -> bool { - // To make sure we run it once at the beginning - trigger % CONCLUSION_TRIGGER_INTERVAL_SVM == 1 - } - #[tracing::instrument(skip_all, fields(auction_id, bid_ids))] async fn get_winner_bids( &self, @@ -605,6 +598,10 @@ impl AuctionManager for Service { fn is_auction_expired(auction: &entities::Auction) -> bool { auction.creation_time + BID_MAXIMUM_LIFE_TIME_SVM * 2 < OffsetDateTime::now_utc() } + + fn get_conclusion_interval() -> Interval { + interval(Duration::from_secs(60)) + } } const SEND_TRANSACTION_RETRY_COUNT_SVM: i32 = 30; diff --git a/auction-server/src/auction/service/conclude_auction.rs b/auction-server/src/auction/service/conclude_auction.rs index 671199b12..67e4051fe 100644 --- a/auction-server/src/auction/service/conclude_auction.rs +++ b/auction-server/src/auction/service/conclude_auction.rs @@ -75,7 +75,7 @@ where /// Concludes an auction by getting the auction transaction status from the chain. #[tracing::instrument(skip_all)] - pub async fn conclude_auction(&self, input: ConcludeAuctionInput) -> anyhow::Result<()> { + async fn conclude_auction(&self, input: ConcludeAuctionInput) -> anyhow::Result<()> { let auction = input.auction; tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction"); if let Some(tx_hash) = auction.tx_hash.clone() { @@ -121,4 +121,21 @@ where } Ok(()) } + + pub async fn conclude_auction_loop(&self, auction_id: entities::AuctionId) { + let mut interval = Self::get_conclusion_interval(); + loop { + interval.tick().await; + if let Some(auction) = self.repo.get_in_memory_auction_by_id(auction_id).await { + if let Err(e) = self + .conclude_auction(ConcludeAuctionInput { auction }) + .await + { + tracing::error!(error = ?e, "Failed to conclude auction"); + } + } else { + break; + } + } + } } diff --git a/auction-server/src/auction/service/conclude_auctions.rs b/auction-server/src/auction/service/conclude_auctions.rs deleted file mode 100644 index 7796c21ae..000000000 --- a/auction-server/src/auction/service/conclude_auctions.rs +++ /dev/null @@ -1,37 +0,0 @@ -use { - super::{ - auction_manager::AuctionManager, - ChainTrait, - Service, - }, - crate::auction::service::conclude_auction::ConcludeAuctionInput, -}; - -impl Service -where - Service: AuctionManager, -{ - pub async fn conclude_auctions(&self) { - let auctions = self.repo.get_in_memory_auctions().await; - for auction in auctions { - self.task_tracker.spawn({ - let service = self.clone(); - async move { - let result = service - .conclude_auction(ConcludeAuctionInput { - auction: auction.clone(), - }) - .await; - if let Err(err) = result { - tracing::error!( - error = ?err, - chain_id = service.config.chain_id, - auction_id = ?auction.id, - "Failed to conclude auction", - ); - } - } - }); - } - } -} diff --git a/auction-server/src/auction/service/handle_auction.rs b/auction-server/src/auction/service/handle_auction.rs index 631411a34..6ec654baa 100644 --- a/auction-server/src/auction/service/handle_auction.rs +++ b/auction-server/src/auction/service/handle_auction.rs @@ -9,7 +9,10 @@ use { self, BidStatus, }, - service::update_bid_status::UpdateBidStatusInput, + service::{ + add_auction::AddAuctionInput, + update_bid_status::UpdateBidStatusInput, + }, }, futures::future::join_all, time::OffsetDateTime, @@ -61,7 +64,14 @@ where return Ok(()); } - let auction = self.repo.add_auction(auction).await?; + let auction = self + .add_auction(AddAuctionInput { auction }) + .await + .map_err(|err| { + tracing::error!(error = ?err, "Failed to add auction"); + anyhow::anyhow!("Failed to add auction") + })?; + tracing::info!( auction = ?auction, chain_id = self.config.chain_id, @@ -73,7 +83,7 @@ where .await { Ok(tx_hash) => { - tracing::debug!("Submitted transaction: {:?}", tx_hash); + tracing::debug!(tx_hash = ?tx_hash, "Submitted transaction"); let auction = self.repo.submit_auction(auction, tx_hash.clone()).await?; join_all(auction.bids.iter().map(|bid| { self.update_bid_status(UpdateBidStatusInput { @@ -92,7 +102,7 @@ where .await; } Err(err) => { - tracing::error!("Transaction failed to submit: {:?}", err); + tracing::error!(error = ?err, "Transaction failed to submit"); } }; Ok(()) diff --git a/auction-server/src/auction/service/mod.rs b/auction-server/src/auction/service/mod.rs index 8d96bede5..ed2f3800b 100644 --- a/auction-server/src/auction/service/mod.rs +++ b/auction-server/src/auction/service/mod.rs @@ -70,7 +70,6 @@ pub mod add_auction; pub mod auction_manager; pub mod cancel_bid; pub mod conclude_auction; -pub mod conclude_auctions; pub mod get_auction_by_id; pub mod get_bid; pub mod get_bids; diff --git a/auction-server/src/auction/service/workers.rs b/auction-server/src/auction/service/workers.rs index d3eebc96d..b5338ea7c 100644 --- a/auction-server/src/auction/service/workers.rs +++ b/auction-server/src/auction/service/workers.rs @@ -72,15 +72,6 @@ where service.handle_auctions().await; } }); - - if Service::is_ready_to_conclude(trigger) { - self.task_tracker.spawn({ - let service = self.clone(); - async move { - service.conclude_auctions().await; - } - }); - } } _ = exit_check_interval.tick() => {} }