diff --git a/auction-server/src/api.rs b/auction-server/src/api.rs index e3603e9b..fd1b1827 100644 --- a/auction-server/src/api.rs +++ b/auction-server/src/api.rs @@ -113,6 +113,8 @@ pub enum RestError { ProfileNotFound, /// The quote was not found. QuoteNotFound, + /// Duplicate opportunity. + DuplicateOpportunity, } impl RestError { @@ -159,6 +161,10 @@ impl RestError { StatusCode::NOT_FOUND, "No quote is currently available".to_string(), ), + RestError::DuplicateOpportunity => ( + StatusCode::BAD_REQUEST, + "Same opportunity is submitted recently".to_string(), + ), } } } diff --git a/auction-server/src/auction/service/cancel_bid.rs b/auction-server/src/auction/service/cancel_bid.rs index 90f23b2b..55c2e70b 100644 --- a/auction-server/src/auction/service/cancel_bid.rs +++ b/auction-server/src/auction/service/cancel_bid.rs @@ -48,7 +48,8 @@ impl Service { }, }, }) - .await + .await?; + Ok(()) } _ => Err(RestError::BadParameters( "Bid is only cancellable in awaiting_signature state".to_string(), diff --git a/auction-server/src/auction/service/update_bid_status.rs b/auction-server/src/auction/service/update_bid_status.rs index 85d1a910..8a0d99ae 100644 --- a/auction-server/src/auction/service/update_bid_status.rs +++ b/auction-server/src/auction/service/update_bid_status.rs @@ -20,7 +20,10 @@ pub struct UpdateBidStatusInput { impl Service { #[tracing::instrument(skip_all, fields(bid_id, status))] - pub async fn update_bid_status(&self, input: UpdateBidStatusInput) -> Result<(), RestError> { + pub async fn update_bid_status( + &self, + input: UpdateBidStatusInput, + ) -> Result { tracing::Span::current().record("bid_id", input.bid.id.to_string()); tracing::Span::current().record("status", format!("{:?}", input.new_status)); @@ -51,6 +54,6 @@ impl Service { tracing::error!(error = e.to_string(), "Failed to send update event"); } } - Ok(()) + Ok(is_updated) } } diff --git a/auction-server/src/opportunity/service/add_opportunity.rs b/auction-server/src/opportunity/service/add_opportunity.rs index fb6040d1..b93ef402 100644 --- a/auction-server/src/opportunity/service/add_opportunity.rs +++ b/auction-server/src/opportunity/service/add_opportunity.rs @@ -69,9 +69,7 @@ where let action = self.assess_action(&opportunity_create).await; if let OpportunityAction::Ignore = action { tracing::info!("Submitted opportunity ignored: {:?}", opportunity_create); - return Err(RestError::BadParameters( - "Same opportunity is submitted recently".to_string(), - )); + return Err(RestError::DuplicateOpportunity); } self.verify_opportunity(VerifyOpportunityInput { diff --git a/auction-server/src/opportunity/service/get_quote.rs b/auction-server/src/opportunity/service/get_quote.rs index 8013ae69..f78a3899 100644 --- a/auction-server/src/opportunity/service/get_quote.rs +++ b/auction-server/src/opportunity/service/get_quote.rs @@ -10,13 +10,11 @@ use { entities::{ Auction, BidPaymentInstructionType, - BidStatus, BidStatusAuction, }, service::{ add_auction::AddAuctionInput, auction_manager::AuctionManager, - get_auction_by_id::GetAuctionByIdInput, get_pending_bids::GetLiveBidsInput, update_bid_status::UpdateBidStatusInput, Service as AuctionService, @@ -41,7 +39,6 @@ use { }, axum_prometheus::metrics, express_relay_api_types::opportunity::ProgramSvm, - futures::future::join_all, solana_sdk::pubkey::Pubkey, spl_associated_token_account::get_associated_token_address_with_program_id, std::{ @@ -446,10 +443,39 @@ impl Service { self.remove_quote_opportunity(opportunity.clone()).await; let signature = winner_bid.chain_data.transaction.signatures[0]; - join_all(auction.bids.iter().map(|bid| { - auction_service.update_bid_status(UpdateBidStatusInput { + // Update the status of all bids in the auction except the winner bid + let auction_bids = auction.bids.clone(); + auction_bids.into_iter().for_each(|bid| { + if bid.id != winner_bid.id { + self.task_tracker.spawn({ + let (auction_service, winner_bid) = + (auction_service.clone(), winner_bid.clone()); + async move { + auction_service + .update_bid_status(UpdateBidStatusInput { + new_status: AuctionService::get_new_status( + &bid, + &vec![winner_bid], + BidStatusAuction { + tx_hash: signature, + id: auction.id, + }, + false, + ), + bid, + }) + .await + } + }); + } + }); + + // We check if the winner bid status is successfully updated. + // This is important for the submit_quote function to work correctly. + if !auction_service + .update_bid_status(UpdateBidStatusInput { new_status: AuctionService::get_new_status( - bid, + winner_bid, &vec![winner_bid.clone()], BidStatusAuction { tx_hash: signature, @@ -457,36 +483,14 @@ impl Service { }, false, ), - bid: bid.clone(), + bid: winner_bid.clone(), }) - })) - .await; - - // We check if the winner bid status is successfully updated. - // This is important for the submit_quote function to work correctly. - match auction_service - .get_auction_by_id(GetAuctionByIdInput { - auction_id: auction.id, - }) - .await + .await? { - Some(auction) => match auction.bids.iter().find(|bid| bid.id == winner_bid.id) { - Some(bid) => { - if !bid.status.is_awaiting_signature() { - tracing::error!(winner_bid = ?winner_bid, opportunity = ?opportunity, "Failed to update winner bid status"); - return Err(RestError::TemporarilyUnavailable); - } - } - None => { - tracing::error!(auction = ?auction, winner_bid = ?winner_bid, "Failed to winner bid from auction"); - return Err(RestError::TemporarilyUnavailable); - } - }, - None => { - tracing::error!(auction = ?auction, opportunity = ?opportunity, winner_bid = ?winner_bid, "Failed to get auction by id"); - return Err(RestError::TemporarilyUnavailable); - } - }; + // This can only happen if the bid is already updated by another auction for another get_quote request + // TODO We should handle this case more gracefully + return Err(RestError::DuplicateOpportunity); + } let metadata = self .get_express_relay_metadata(GetExpressRelayMetadata { diff --git a/auction-server/src/opportunity/service/mod.rs b/auction-server/src/opportunity/service/mod.rs index f0456e1f..45466951 100644 --- a/auction-server/src/opportunity/service/mod.rs +++ b/auction-server/src/opportunity/service/mod.rs @@ -46,6 +46,7 @@ use { sync::Arc, }, tokio::sync::RwLock, + tokio_util::task::TaskTracker, }; pub mod add_opportunity; @@ -258,18 +259,25 @@ impl ChainType for ChainTypeSvm { // TODO maybe just create a service per chain_id? pub struct Service = DB> { - store: Arc, + store: Arc, // TODO maybe after adding state for opportunity we can remove the arc - repo: Arc>, - config: HashMap, + repo: Arc>, + config: HashMap, + task_tracker: TaskTracker, } impl> Service { - pub fn new(store: Arc, db: U, config: HashMap) -> Self { + pub fn new( + store: Arc, + task_tracker: TaskTracker, + db: U, + config: HashMap, + ) -> Self { Self { store, repo: Arc::new(Repository::new(db)), config, + task_tracker, } } pub async fn update_metrics(&self) { @@ -333,6 +341,7 @@ pub mod tests { let service = Service::>::new( store.clone(), + TaskTracker::new(), db, chains_svm, ); diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index f46fde14..1484d11e 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -331,6 +331,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { opportunity_service::ChainTypeEvm, >::new( store.clone(), + task_tracker.clone(), pool.clone(), config_opportunity_service_evm, )); @@ -338,6 +339,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { opportunity_service::ChainTypeSvm, >::new( store.clone(), + task_tracker.clone(), pool.clone(), config_opportunity_service_svm, ));