Skip to content

Commit

Permalink
Add better conclusion loop (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
danimhr authored Feb 25, 2025
1 parent 53f0f23 commit c71998b
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 69 deletions.
1 change: 1 addition & 0 deletions auction-server/src/auction/repository/add_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl<T: ChainTrait> Repository<T> {
.insert(auction.id, auction);
}

// NOTE: Do not call this function directly. Instead call `add_auction` from `Service`.
#[tracing::instrument(skip_all, name = "add_auction_repo", fields(auction_id))]
pub async fn add_auction(
&self,
Expand Down
17 changes: 14 additions & 3 deletions auction-server/src/auction/service/add_auction.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
super::{
auction_manager::AuctionManager,
ChainTrait,
Service,
},
Expand All @@ -13,14 +14,24 @@ pub struct AddAuctionInput<T: ChainTrait> {
pub auction: entities::Auction<T>,
}

impl<T: ChainTrait> Service<T> {
impl<T: ChainTrait> Service<T>
where
Service<T>: AuctionManager<T>,
{
pub async fn add_auction(
&self,
input: AddAuctionInput<T>,
) -> Result<entities::Auction<T>, RestError> {
self.repo.add_auction(input.auction).await.map_err(|e| {
let auction = self.repo.add_auction(input.auction).await.map_err(|e| {
tracing::error!(error = ?e, "Failed to add auction");
RestError::TemporarilyUnavailable
})
})?;
self.task_tracker.spawn({
let service = self.clone();
async move {
service.conclude_auction_loop(auction.id).await;
}
});
Ok(auction)
}
}
25 changes: 11 additions & 14 deletions auction-server/src/auction/service/auction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ pub trait AuctionManager<T: ChainTrait> {
async fn get_ws_client(&self) -> Result<Self::WsClient>;
/// Get the trigger stream for the ws client to subscribe to new triggers.
async fn get_trigger_stream<'a>(client: &'a Self::WsClient) -> Result<Self::TriggerStream<'a>>;
/// Check if the auction is ready to be concluded based on the trigger.
fn is_ready_to_conclude(trigger: Self::Trigger) -> bool;

/// Get the winner bids for the auction. Sorting bids by bid amount and simulating the bids to determine the winner bids.
async fn get_winner_bids(
Expand Down Expand Up @@ -140,6 +138,9 @@ pub trait AuctionManager<T: ChainTrait> {

/// Check if the auction is expired based on the creation time of the auction.
fn is_auction_expired(auction: &entities::Auction<T>) -> bool;

/// Get the conclusion interval for the auction.
fn get_conclusion_interval() -> Interval;
}

// While we are submitting bids together, increasing this number will have the following effects:
Expand Down Expand Up @@ -169,10 +170,6 @@ impl AuctionManager<Evm> for Service<Evm> {
Ok(block_stream)
}

fn is_ready_to_conclude(_trigger: Self::Trigger) -> bool {
true
}

#[tracing::instrument(skip_all, fields(auction_id, bid_ids, simulation_result))]
async fn get_winner_bids(
&self,
Expand Down Expand Up @@ -331,11 +328,12 @@ impl AuctionManager<Evm> for Service<Evm> {
fn is_auction_expired(auction: &entities::Auction<Evm>) -> bool {
auction.creation_time + BID_MAXIMUM_LIFE_TIME_EVM < OffsetDateTime::now_utc()
}

fn get_conclusion_interval() -> Interval {
interval(Duration::from_secs(4))
}
}

/// This is to make sure we are not missing any transaction.
/// We run this once every minute (150 * 0.4).
const CONCLUSION_TRIGGER_INTERVAL_SVM: u64 = 150;
const BID_MAXIMUM_LIFE_TIME_SVM: Duration = Duration::from_secs(120);
const TRIGGER_DURATION_SVM: Duration = Duration::from_millis(400);

Expand Down Expand Up @@ -391,11 +389,6 @@ impl AuctionManager<Svm> for Service<Svm> {
Ok(TriggerStreamSvm::new(interval(TRIGGER_DURATION_SVM)))
}

fn is_ready_to_conclude(trigger: Self::Trigger) -> bool {
// To make sure we run it once at the beginning
trigger % CONCLUSION_TRIGGER_INTERVAL_SVM == 1
}

#[tracing::instrument(skip_all, fields(auction_id, bid_ids))]
async fn get_winner_bids(
&self,
Expand Down Expand Up @@ -605,6 +598,10 @@ impl AuctionManager<Svm> for Service<Svm> {
fn is_auction_expired(auction: &entities::Auction<Svm>) -> bool {
auction.creation_time + BID_MAXIMUM_LIFE_TIME_SVM * 2 < OffsetDateTime::now_utc()
}

fn get_conclusion_interval() -> Interval {
interval(Duration::from_secs(60))
}
}

const SEND_TRANSACTION_RETRY_COUNT_SVM: i32 = 30;
Expand Down
19 changes: 18 additions & 1 deletion auction-server/src/auction/service/conclude_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where

/// Concludes an auction by getting the auction transaction status from the chain.
#[tracing::instrument(skip_all)]
pub async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
async fn conclude_auction(&self, input: ConcludeAuctionInput<T>) -> anyhow::Result<()> {
let auction = input.auction;
tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction");
if let Some(tx_hash) = auction.tx_hash.clone() {
Expand Down Expand Up @@ -121,4 +121,21 @@ where
}
Ok(())
}

pub async fn conclude_auction_loop(&self, auction_id: entities::AuctionId) {
let mut interval = Self::get_conclusion_interval();
loop {
interval.tick().await;
if let Some(auction) = self.repo.get_in_memory_auction_by_id(auction_id).await {
if let Err(e) = self
.conclude_auction(ConcludeAuctionInput { auction })
.await
{
tracing::error!(error = ?e, "Failed to conclude auction");
}
} else {
break;
}
}
}
}
37 changes: 0 additions & 37 deletions auction-server/src/auction/service/conclude_auctions.rs

This file was deleted.

18 changes: 14 additions & 4 deletions auction-server/src/auction/service/handle_auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use {
self,
BidStatus,
},
service::update_bid_status::UpdateBidStatusInput,
service::{
add_auction::AddAuctionInput,
update_bid_status::UpdateBidStatusInput,
},
},
futures::future::join_all,
time::OffsetDateTime,
Expand Down Expand Up @@ -61,7 +64,14 @@ where
return Ok(());
}

let auction = self.repo.add_auction(auction).await?;
let auction = self
.add_auction(AddAuctionInput { auction })
.await
.map_err(|err| {
tracing::error!(error = ?err, "Failed to add auction");
anyhow::anyhow!("Failed to add auction")
})?;

tracing::info!(
auction = ?auction,
chain_id = self.config.chain_id,
Expand All @@ -73,7 +83,7 @@ where
.await
{
Ok(tx_hash) => {
tracing::debug!("Submitted transaction: {:?}", tx_hash);
tracing::debug!(tx_hash = ?tx_hash, "Submitted transaction");
let auction = self.repo.submit_auction(auction, tx_hash.clone()).await?;
join_all(auction.bids.iter().map(|bid| {
self.update_bid_status(UpdateBidStatusInput {
Expand All @@ -92,7 +102,7 @@ where
.await;
}
Err(err) => {
tracing::error!("Transaction failed to submit: {:?}", err);
tracing::error!(error = ?err, "Transaction failed to submit");
}
};
Ok(())
Expand Down
1 change: 0 additions & 1 deletion auction-server/src/auction/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub mod add_auction;
pub mod auction_manager;
pub mod cancel_bid;
pub mod conclude_auction;
pub mod conclude_auctions;
pub mod get_auction_by_id;
pub mod get_bid;
pub mod get_bids;
Expand Down
9 changes: 0 additions & 9 deletions auction-server/src/auction/service/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@ where
service.handle_auctions().await;
}
});

if Service::is_ready_to_conclude(trigger) {
self.task_tracker.spawn({
let service = self.clone();
async move {
service.conclude_auctions().await;
}
});
}
}
_ = exit_check_interval.tick() => {}
}
Expand Down

0 comments on commit c71998b

Please sign in to comment.