Skip to content

Commit

Permalink
Move settlement queue to the driver (#3129)
Browse files Browse the repository at this point in the history
# Description
This is a follow-up to
#3116, which addresses a
[suggestion](#3116 (review))
of moving the settlement queue to the driver. This should help avoid
increasing the block deadline since it can still be calculated starting
from the simulation block.

# Changes

- [ ] Move the settlement queue to the driver. Each solver has its own
driver endpoint and, now, its own settlement queue.
- [ ] Calculate the deadline starting from the simulation block(revert
to the previous version).
- [ ] Do not send a settle request to the solver once the deadline is
reached.
- [ ] A new driver's config to configure the max queue size.

cowprotocol/infrastructure#2241 should be
reverted.

## How to test
Existing tests. More driver tests will be implemented once the approach
is approved.
  • Loading branch information
squadgazzz authored Dec 23, 2024
1 parent d331509 commit 59091c1
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 66 deletions.
52 changes: 5 additions & 47 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
database::order_events::OrderEventLabel,
ethcontract::U256,
ethrpc::block_stream::BlockInfo,
futures::{future::BoxFuture, FutureExt, TryFutureExt},
futures::{FutureExt, TryFutureExt},
itertools::Itertools,
model::solver_competition::{
CompetitionAuction,
Expand All @@ -38,7 +38,7 @@ use {
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::{mpsc, Mutex},
tokio::sync::Mutex,
tracing::Instrument,
};

Expand Down Expand Up @@ -66,9 +66,6 @@ pub struct RunLoop {
/// Maintenance tasks that should run before every runloop to have
/// the most recent data available.
maintenance: Arc<Maintenance>,
/// Queues by solver for executing settle futures one by one guaranteeing
/// FIFO execution order.
settlement_queues: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<BoxFuture<'static, ()>>>>>,
}

impl RunLoop {
Expand All @@ -93,7 +90,6 @@ impl RunLoop {
in_flight_orders: Default::default(),
liveness,
maintenance,
settlement_queues: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -238,7 +234,7 @@ impl RunLoop {
}

let competition_simulation_block = self.eth.current_block().borrow().number;
let block_deadline = auction.block + self.config.submission_deadline;
let block_deadline = competition_simulation_block + self.config.submission_deadline;

// Post-processing should not be executed asynchronously since it includes steps
// of storing all the competition/auction-related data to the DB.
Expand Down Expand Up @@ -345,45 +341,7 @@ impl RunLoop {
}
.instrument(tracing::Span::current());

let sender = self.get_settlement_queue_sender(&driver.name).await;
if let Err(err) = sender.send(Box::pin(settle_fut)) {
tracing::warn!(driver = %driver.name, ?err, "failed to send settle future to queue");
}
}

/// Retrieves or creates the settlement queue sender for a given driver.
///
/// This function ensures that there is a settlement execution queue
/// associated with the specified `driver_name`. If a queue already
/// exists, it returns the existing sender. If not, it creates a new
/// queue, starts a background task to process settlement futures,
/// guaranteeing FIFO execution order for settlements per driver, and
/// returns the new sender.
async fn get_settlement_queue_sender(
self: &Arc<Self>,
driver_name: &str,
) -> mpsc::UnboundedSender<BoxFuture<'static, ()>> {
let mut settlement_queues = self.settlement_queues.lock().await;
match settlement_queues.get(driver_name) {
Some(sender) => sender.clone(),
None => {
let (tx, mut rx) = mpsc::unbounded_channel::<BoxFuture<'static, ()>>();
let driver_name = driver_name.to_string();
let self_ = self.clone();

settlement_queues.insert(driver_name.clone(), tx.clone());

tokio::spawn(async move {
while let Some(fut) = rx.recv().await {
fut.await;
}

tracing::info!(driver = %driver_name, "settlement execution queue stopped");
self_.settlement_queues.lock().await.remove(&driver_name);
});
tx
}
}
tokio::spawn(settle_fut);
}

async fn post_processing(
Expand Down Expand Up @@ -801,7 +759,7 @@ impl RunLoop {
let current_block = self.eth.current_block().borrow().number;
anyhow::ensure!(
current_block < submission_deadline_latest_block,
"submission deadline was missed while waiting for the settlement queue"
"submission deadline was missed"
);

let request = settle::Request {
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ impl From<eth::U256> for Price {
/// All auction prices
pub type Prices = HashMap<eth::TokenAddress, Price>;

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Id(pub i64);

impl Id {
Expand Down
128 changes: 124 additions & 4 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
Mempools,
},
crate::{
domain::{competition::solution::Settlement, eth},
domain::{competition::solution::Settlement, eth, time::DeadlineExceeded},
infra::{
self,
blockchain::Ethereum,
Expand All @@ -25,6 +25,8 @@ use {
sync::{Arc, Mutex},
},
tap::TapFallible,
tokio::sync::{mpsc, oneshot},
tracing::Instrument,
};

pub mod auction;
Expand All @@ -39,6 +41,8 @@ pub use {
solution::Solution,
};

use crate::domain::BlockNo;

/// An ongoing competition. There is one competition going on per solver at any
/// time. The competition stores settlements to solutions generated by the
/// driver, and allows them to be executed onchain when requested later. The
Expand All @@ -54,9 +58,41 @@ pub struct Competition {
/// Cached solutions with the most recent solutions at the front.
pub settlements: Mutex<VecDeque<Settlement>>,
pub bad_tokens: Arc<bad_tokens::Detector>,
settle_queue: mpsc::Sender<SettleRequest>,
}

impl Competition {
pub fn new(
solver: Solver,
eth: Ethereum,
liquidity: infra::liquidity::Fetcher,
simulator: Simulator,
mempools: Mempools,
bad_tokens: Arc<bad_tokens::Detector>,
) -> Arc<Self> {
let (settle_sender, settle_receiver) = mpsc::channel(solver.settle_queue_size());

let competition = Arc::new(Self {
solver,
eth,
liquidity,
simulator,
mempools,
settlements: Default::default(),
settle_queue: settle_sender,
bad_tokens,
});

let competition_clone = Arc::clone(&competition);
tokio::spawn(async move {
competition_clone
.process_settle_requests(settle_receiver)
.await;
});

competition
}

/// Solve an auction as part of this competition.
pub async fn solve(&self, auction: Auction) -> Result<Option<Solved>, Error> {
let auction = &self
Expand Down Expand Up @@ -311,17 +347,92 @@ impl Competition {
/// [`Competition::solve`] to generate the solution.
pub async fn settle(
&self,
auction_id: Option<i64>,
auction_id: Option<auction::Id>,
solution_id: u64,
submission_deadline: BlockNo,
) -> Result<Settled, Error> {
let (response_sender, response_receiver) = oneshot::channel();

let request = SettleRequest {
auction_id,
solution_id,
submission_deadline,
response_sender,
};

self.settle_queue.try_send(request).map_err(|err| {
tracing::warn!(?err, "Failed to enqueue /settle request");
Error::SubmissionError
})?;

response_receiver.await.map_err(|err| {
tracing::error!(?err, "Failed to dequeue /settle response");
Error::SubmissionError
})?
}

pub fn ensure_settle_queue_capacity(&self) -> Result<(), Error> {
if self.settle_queue.capacity() == 0 {
tracing::warn!("settlement queue is full; auction is rejected");
Err(Error::SettlementQueueIsFull)
} else {
Ok(())
}
}

async fn process_settle_requests(
self: Arc<Self>,
mut settle_receiver: mpsc::Receiver<SettleRequest>,
) {
while let Some(request) = settle_receiver.recv().await {
let SettleRequest {
auction_id,
solution_id,
submission_deadline,
response_sender,
} = request;
let solver = self.solver.name().as_str();
async {
if self.eth.current_block().borrow().number >= submission_deadline {
if let Err(err) = response_sender.send(Err(DeadlineExceeded.into())) {
tracing::error!(
?err,
"settle deadline exceeded. unable to return a response"
);
}
return;
}

observe::settling();
let result = self
.process_settle_request(auction_id, solution_id, submission_deadline)
.await;
observe::settled(self.solver.name(), &result);

if let Err(err) = response_sender.send(result) {
tracing::error!(?err, "Failed to send /settle response");
}
}
.instrument(
tracing::info_span!("/settle", solver, auction_id = ?auction_id.map(|id| id.0)),
)
.await
}
}

async fn process_settle_request(
&self,
auction_id: Option<auction::Id>,
solution_id: u64,
submission_deadline: u64,
submission_deadline: BlockNo,
) -> Result<Settled, Error> {
let settlement = {
let mut lock = self.settlements.lock().unwrap();
let index = lock
.iter()
.position(|s| {
s.solution().get() == solution_id
&& auction_id.is_none_or(|id| s.auction_id.0 == id)
&& auction_id.is_none_or(|id| s.auction_id == id)
})
.ok_or(Error::SolutionNotAvailable)?;
// remove settlement to ensure we can't settle it twice by accident
Expand Down Expand Up @@ -425,6 +536,13 @@ fn merge(solutions: impl Iterator<Item = Solution>, auction: &Auction) -> Vec<So
merged
}

struct SettleRequest {
auction_id: Option<auction::Id>,
solution_id: u64,
submission_deadline: BlockNo,
response_sender: oneshot::Sender<Result<Settled, Error>>,
}

/// Solution information sent to the protocol by the driver before the solution
/// ranking happens.
#[derive(Debug)]
Expand Down Expand Up @@ -492,4 +610,6 @@ pub enum Error {
Solver(#[from] solver::Error),
#[error("failed to submit the solution")]
SubmissionError,
#[error("too many pending settlements for the same solver")]
SettlementQueueIsFull,
}
1 change: 1 addition & 0 deletions crates/driver/src/infra/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl From<competition::Error> for (hyper::StatusCode, axum::Json<Error>) {
competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded,
competition::Error::Solver(_) => Kind::SolverFailed,
competition::Error::SubmissionError => Kind::FailedToSubmit,
competition::Error::SettlementQueueIsFull => Kind::SolverFailed,
};
error.into()
}
Expand Down
17 changes: 8 additions & 9 deletions crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,14 @@ impl Api {
let router = router.with_state(State(Arc::new(Inner {
eth: self.eth.clone(),
solver: solver.clone(),
competition: domain::Competition {
competition: domain::Competition::new(
solver,
eth: self.eth.clone(),
liquidity: self.liquidity.clone(),
simulator: self.simulator.clone(),
mempools: self.mempools.clone(),
settlements: Default::default(),
bad_tokens: Arc::new(bad_tokens),
},
self.eth.clone(),
self.liquidity.clone(),
self.simulator.clone(),
self.mempools.clone(),
Arc::new(bad_tokens),
),
liquidity: self.liquidity.clone(),
tokens: tokens.clone(),
pre_processor: pre_processor.clone(),
Expand Down Expand Up @@ -162,7 +161,7 @@ impl State {
struct Inner {
eth: Ethereum,
solver: Solver,
competition: domain::Competition,
competition: Arc<domain::Competition>,
liquidity: liquidity::Fetcher,
tokens: tokens::Fetcher,
pre_processor: domain::competition::AuctionProcessor,
Expand Down
14 changes: 9 additions & 5 deletions crates/driver/src/infra/api/routes/settle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ mod dto;

use {
crate::{
domain::competition,
domain::{competition, competition::auction},
infra::{
api::{Error, State},
api::{self, Error, State},
observe,
},
},
Expand All @@ -19,23 +19,27 @@ async fn route(
state: axum::extract::State<State>,
req: axum::Json<dto::SettleRequest>,
) -> Result<(), (hyper::StatusCode, axum::Json<Error>)> {
let auction_id = req.auction_id;
let auction_id = req
.auction_id
.map(auction::Id::try_from)
.transpose()
.map_err(Into::<api::routes::AuctionError>::into)?;
let solver = state.solver().name().to_string();

let handle_request = async move {
observe::settling();
let result = state
.competition()
.settle(
req.auction_id,
auction_id,
req.solution_id,
req.submission_deadline_latest_block,
)
.await;
observe::settled(state.solver().name(), &result);
result.map(|_| ()).map_err(Into::into)
}
.instrument(tracing::info_span!("/settle", solver, auction_id));
.instrument(tracing::info_span!("/settle", solver, auction_id = ?auction_id.map(|id| id.0)));

// Handle `/settle` call in a background task to ensure that we correctly
// submit the settlement (or cancellation) on-chain even if the server
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/api/routes/solve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async fn route(
.prioritize(auction, &competition.solver.account().address())
.await;
let result = competition.solve(auction).await;
competition.ensure_settle_queue_capacity()?;
observe::solved(state.solver().name(), &result);
Ok(axum::Json(dto::SolveResponse::new(
result?,
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub async fn load(chain: chain::Id, path: &Path) -> infra::Config {
.bad_token_detection
.metrics_strategy_required_measurements,
},
settle_queue_size: config.settle_queue_size,
}
}))
.await,
Expand Down
Loading

0 comments on commit 59091c1

Please sign in to comment.