Solver participation guard (#3257)
# Description
From the original issue:
> When a solver repeatedly wins consecutive auctions but fails to settle
its solutions on-chain, it can lead to system downtime. To prevent this,
the autopilot must have the capability to temporarily exclude such
solvers from participating in competitions. This ensures no single
solver can disrupt the system's operations.

This PR implements it by introducing a new struct that checks whether a
solver is allowed to participate in the next competition, using two
different strategies:

1. The existing `Authenticator`'s `is_solver` on-chain call, moved into
the new struct.
2. A new strategy that finds non-settling solvers using a SQL query. It
selects the last 3 auctions (configurable) whose deadline is not later
than the current block, to avoid selecting pending settlements, and
checks whether all of them were won by the same solver(s) (in case of
multiple winners) without any of them being settled. This strategy
caches the results to avoid redundant DB queries. The query relies on
the `auction_id` column from the settlements table, which gets updated
separately by the `Observer` struct, so the cache gets updated only once
the `Observer` has some result.
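
The selection rule of the DB-based strategy can be illustrated in memory. This is a minimal sketch and not the PR's SQL query: the `Auction` record, its fields, and the solver names are hypothetical, and the rule for multiple winners (a solver must be among the winners of every auction in the window) is an assumption.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory view of one auction (not the PR's DB schema).
struct Auction {
    id: u64,
    deadline_block: u64,
    /// Solvers that won the auction (there can be several winners).
    winners: Vec<&'static str>,
    /// Winners whose settlement was observed on-chain.
    settled_by: Vec<&'static str>,
}

/// Returns the solvers that won each of the last `n` auctions whose deadline
/// has passed, but settled none of them.
fn find_non_settling_solvers(
    auctions: &[Auction],
    n: usize,
    current_block: u64,
) -> Vec<&'static str> {
    // Skip auctions that may still have a pending settlement.
    let mut finished: Vec<&Auction> = auctions
        .iter()
        .filter(|a| a.deadline_block <= current_block)
        .collect();
    // Newest first, then keep only the last `n`.
    finished.sort_by(|a, b| b.id.cmp(&a.id));
    finished.truncate(n);
    if n == 0 || finished.len() < n {
        return Vec::new();
    }

    // Start from the winners of the newest auction and keep only those that
    // also won every other auction in the window.
    let mut candidates: HashSet<&'static str> =
        finished[0].winners.iter().copied().collect();
    for auction in &finished[1..] {
        candidates.retain(|solver| auction.winners.contains(solver));
    }
    // Drop anyone who settled at least one of these auctions.
    candidates.retain(|solver| finished.iter().all(|a| !a.settled_by.contains(solver)));

    let mut result: Vec<&'static str> = candidates.into_iter().collect();
    result.sort();
    result
}
```

Filtering on the deadline before taking the last `n` auctions is what keeps auctions with a possibly pending settlement out of the window.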

These validators are called sequentially to avoid redundant RPC calls to
`Authenticator`: the DB-based validator's cache is checked first and,
only then, the RPC call is sent.

Once one of the strategies reports that the solver is not allowed to
participate, the solver gets deny-listed for 5m (configurable).

Each validator can be enabled/disabled separately in case of any issue.

## Metrics

Added a metric that gets populated by the DB-based validator once a
solver is marked as banned. The idea is to create an alert that fires
if there are more than 4 such occurrences within the last 30 minutes for
the same solver, meaning that disabling the solver should be considered.
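
The alert condition amounts to counting ban events inside a sliding window. The sketch below only illustrates that arithmetic: the function name and its inputs are hypothetical, and the real alert would presumably be defined in the monitoring stack rather than in Rust.

```rust
use std::time::Duration;

/// Returns true if more than `threshold` ban events for one solver fall inside
/// the window that ends at `now`. Timestamps are offsets from an arbitrary
/// starting point.
fn should_alert(
    ban_events: &[Duration],
    now: Duration,
    window: Duration,
    threshold: usize,
) -> bool {
    let window_start = now.saturating_sub(window);
    let recent = ban_events
        .iter()
        .filter(|&&t| t >= window_start && t <= now)
        .count();
    recent > threshold
}
```

With `window` set to 30 minutes and `threshold` set to 4, five bans of the same solver within half an hour trigger the alert, while four do not.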

# Open discussions

1. Since the current SQL query filters out auctions whose deadline has
not been reached, the following case is possible: the solver gets
banned while it still has a pending settlement. If that settlement
lands on-chain, the solver remains banned. While this is a niche case,
it would be better to unblock the solver before the cache TTL expires.
This has not been implemented in the current PR since some refactoring
is required in the `Observer` struct. If this is approved, it can be
implemented quickly.

2. Whether it makes sense to introduce a metrics-based strategy similar
to the bad token detector's, where the solver gets banned if >95% (or
similar) of settlements fail.

## How to test
A new SQL query test. Existing e2e tests.

## Related Issues

Fixes #3221 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- New Features
 - Introduced advanced solver participation controls with configurable
eligibility checks, integrating both on-chain and database validations.
 - Enabled asynchronous real-time notifications for settlement updates,
enhancing system responsiveness.
 - Added metrics tracking to monitor auction participation and
performance.

- Chores
 - Updated internal dependencies and restructured driver configuration.
 - Reorganized the database schema to support improved auction and
settlement processing.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
squadgazzz authored Feb 26, 2025
1 parent 97ddf0c commit 59a1010
Showing 14 changed files with 628 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions crates/autopilot/Cargo.toml
@@ -25,6 +25,7 @@ chrono = { workspace = true }
clap = { workspace = true }
contracts = { path = "../contracts" }
cow-amm = { path = "../cow-amm" }
dashmap = { workspace = true }
database = { path = "../database" }
derive_more = { workspace = true }
ethcontract = { workspace = true }
90 changes: 86 additions & 4 deletions crates/autopilot/src/arguments.rs
@@ -245,6 +245,32 @@ pub struct Arguments {
    /// Archive node URL used to index CoW AMM
    #[clap(long, env)]
    pub archive_node_url: Option<Url>,

    /// Configuration for the solver participation guard.
    #[clap(flatten)]
    pub db_based_solver_participation_guard: DbBasedSolverParticipationGuardConfig,
}

#[derive(Debug, clap::Parser)]
pub struct DbBasedSolverParticipationGuardConfig {
    /// Enables or disables the solver participation guard
    #[clap(
        id = "db_enabled",
        long = "db-based-solver-participation-guard-enabled",
        env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED",
        default_value = "true"
    )]
    pub enabled: bool,

    /// Sets the duration for which the solver remains blacklisted.
    /// Technically, the time-to-live for the solver participation blacklist
    /// cache.
    #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)]
    pub solver_blacklist_cache_ttl: Duration,

    /// The number of last auctions to check solver participation eligibility.
    #[clap(long, env, default_value = "3")]
    pub solver_last_auctions_participation_count: u32,
}

impl std::fmt::Display for Arguments {
@@ -290,6 +316,7 @@ impl std::fmt::Display for Arguments {
            max_winners_per_auction,
            archive_node_url,
            max_solutions_per_solver,
            db_based_solver_participation_guard,
        } = self;

        write!(f, "{}", shared)?;
@@ -373,6 +400,11 @@ impl std::fmt::Display for Arguments {
            "max_solutions_per_solver: {:?}",
            max_solutions_per_solver
        )?;
        writeln!(
            f,
            "db_based_solver_participation_guard: {:?}",
            db_based_solver_participation_guard
        )?;
        Ok(())
    }
}
@@ -384,6 +416,7 @@ pub struct Solver {
    pub url: Url,
    pub submission_account: Account,
    pub fairness_threshold: Option<U256>,
    pub requested_timeout_on_problems: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -431,18 +464,31 @@ impl FromStr for Solver {
             _ => Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?),
         };

-        let fairness_threshold = match parts.get(3) {
-            Some(value) => {
-                Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?)
+        let mut fairness_threshold: Option<U256> = Default::default();
+        let mut requested_timeout_on_problems = false;
+
+        if let Some(value) = parts.get(3) {
+            match U256::from_dec_str(value) {
+                Ok(parsed_fairness_threshold) => {
+                    fairness_threshold = Some(parsed_fairness_threshold);
+                }
+                Err(_) => {
+                    requested_timeout_on_problems =
+                        value.to_lowercase() == "requested_timeout_on_problems";
+                }
             }
-            None => None,
         };

+        if let Some(value) = parts.get(4) {
+            requested_timeout_on_problems = value.to_lowercase() == "requested_timeout_on_problems";
+        }
+
         Ok(Self {
             name: name.to_owned(),
             url,
             fairness_threshold,
             submission_account,
+            requested_timeout_on_problems,
         })
     }
 }
@@ -641,6 +687,7 @@ mod test {
            name: "name1".into(),
            url: Url::parse("http://localhost:8080").unwrap(),
            fairness_threshold: None,
            requested_timeout_on_problems: false,
            submission_account: Account::Address(H160::from_slice(&hex!(
                "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
            ))),
@@ -656,6 +703,7 @@ mod test {
            name: "name1".into(),
            url: Url::parse("http://localhost:8080").unwrap(),
            fairness_threshold: None,
            requested_timeout_on_problems: false,
            submission_account: Account::Kms(
                Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(),
            ),
@@ -674,6 +722,40 @@ mod test {
                "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
            ))),
            fairness_threshold: Some(U256::exp10(18)),
            requested_timeout_on_problems: false,
        };
        assert_eq!(driver, expected);
    }

    #[test]
    fn parse_driver_with_accepts_unsettled_blocking_flag() {
        let argument =
            "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|requested_timeout_on_problems";
        let driver = Solver::from_str(argument).unwrap();
        let expected = Solver {
            name: "name1".into(),
            url: Url::parse("http://localhost:8080").unwrap(),
            submission_account: Account::Address(H160::from_slice(&hex!(
                "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
            ))),
            fairness_threshold: None,
            requested_timeout_on_problems: true,
        };
        assert_eq!(driver, expected);
    }

    #[test]
    fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() {
        let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|requested_timeout_on_problems";
        let driver = Solver::from_str(argument).unwrap();
        let expected = Solver {
            name: "name1".into(),
            url: Url::parse("http://localhost:8080").unwrap(),
            submission_account: Account::Address(H160::from_slice(&hex!(
                "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
            ))),
            fairness_threshold: Some(U256::exp10(18)),
            requested_timeout_on_problems: true,
        };
        assert_eq!(driver, expected);
    }
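
The handling of the optional trailing fields in `Solver::from_str` can be reproduced with the standard library alone. This is a simplified sketch under stated assumptions: `SolverOptions` and `parse_solver_options` are hypothetical names, `u128` stands in for `U256`, and the name, URL and account fields are not validated.

```rust
/// Optional trailing fields of a `name|url|account|...` solver argument.
#[derive(Debug, PartialEq)]
struct SolverOptions {
    /// Stand-in for the `U256` threshold used by the real struct.
    fairness_threshold: Option<u128>,
    requested_timeout_on_problems: bool,
}

const FLAG: &str = "requested_timeout_on_problems";

fn parse_solver_options(argument: &str) -> SolverOptions {
    let parts: Vec<&str> = argument.split('|').collect();
    let mut options = SolverOptions {
        fairness_threshold: None,
        requested_timeout_on_problems: false,
    };

    // The 4th field is either a decimal threshold or the flag itself.
    if let Some(value) = parts.get(3) {
        match value.parse::<u128>() {
            Ok(threshold) => options.fairness_threshold = Some(threshold),
            Err(_) => options.requested_timeout_on_problems = value.to_lowercase() == FLAG,
        }
    }
    // The 5th field, when present, can only be the flag.
    if let Some(value) = parts.get(4) {
        options.requested_timeout_on_problems = value.to_lowercase() == FLAG;
    }
    options
}
```

As in the diff, a fourth field that is neither a number nor the flag is silently ignored rather than rejected.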
6 changes: 5 additions & 1 deletion crates/autopilot/src/domain/competition/mod.rs
@@ -6,8 +6,12 @@ use {
 };

 mod participant;
+mod participation_guard;

-pub use participant::{Participant, Ranked, Unranked};
+pub use {
+    participant::{Participant, Ranked, Unranked},
+    participation_guard::SolverParticipationGuard,
+};

 type SolutionId = u64;
111 changes: 111 additions & 0 deletions crates/autopilot/src/domain/competition/participation_guard/db.rs
@@ -0,0 +1,111 @@
use {
    crate::{
        domain::{Metrics, eth},
        infra,
    },
    ethrpc::block_stream::CurrentBlockWatcher,
    std::{
        collections::HashMap,
        sync::Arc,
        time::{Duration, Instant},
    },
};

/// Checks the DB by searching for solvers that won the last N consecutive
/// auctions but never settled any of them.
#[derive(Clone)]
pub(super) struct Validator(Arc<Inner>);

struct Inner {
    persistence: infra::Persistence,
    banned_solvers: dashmap::DashMap<eth::Address, Instant>,
    ttl: Duration,
    last_auctions_count: u32,
    drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
}

impl Validator {
    pub fn new(
        persistence: infra::Persistence,
        current_block: CurrentBlockWatcher,
        competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
        ttl: Duration,
        last_auctions_count: u32,
        drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
    ) -> Self {
        let self_ = Self(Arc::new(Inner {
            persistence,
            banned_solvers: Default::default(),
            ttl,
            last_auctions_count,
            drivers_by_address,
        }));

        self_.start_maintenance(competition_updates_receiver, current_block);

        self_
    }

    /// Update the internal cache only once the competition auctions table is
    /// updated to avoid redundant DB queries on each block or any other
    /// timeout.
    fn start_maintenance(
        &self,
        mut competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
        current_block: CurrentBlockWatcher,
    ) {
        let self_ = self.clone();
        tokio::spawn(async move {
            while competition_updates_receiver.recv().await.is_some() {
                let current_block = current_block.borrow().number;
                let non_settling_solvers = match self_
                    .0
                    .persistence
                    .find_non_settling_solvers(self_.0.last_auctions_count, current_block)
                    .await
                {
                    Ok(non_settling_solvers) => non_settling_solvers,
                    Err(err) => {
                        tracing::warn!(?err, "error while searching for non-settling solvers");
                        continue;
                    }
                };

                let now = Instant::now();
                let non_settling_solver_names: Vec<&str> = non_settling_solvers
                    .iter()
                    .filter_map(|solver| self_.0.drivers_by_address.get(solver))
                    .map(|driver| {
                        // `with_label_values` alone only selects the labeled series;
                        // `inc()` records the occurrence (assuming the metric is a
                        // counter vector).
                        Metrics::get()
                            .non_settling_solver
                            .with_label_values(&[&driver.name])
                            .inc();
                        // Check if solver accepted this feature. This should be removed once the
                        // CIP making this mandatory has been approved.
                        if driver.requested_timeout_on_problems {
                            tracing::debug!(solver = ?driver.name, "disabling solver temporarily");
                            self_
                                .0
                                .banned_solvers
                                .insert(driver.submission_address, now);
                        }
                        driver.name.as_ref()
                    })
                    .collect();

                tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers");
            }
            tracing::error!("stream of settlement updates terminated unexpectedly");
        });
    }
}

#[async_trait::async_trait]
impl super::Validator for Validator {
    async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
        if let Some(entry) = self.0.banned_solvers.get(solver) {
            return Ok(entry.elapsed() >= self.0.ttl);
        }

        Ok(true)
    }
}
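
The TTL check in `is_allowed` can be tried in isolation with the standard library. This is a minimal sketch, not the PR's implementation: `BanList` is a hypothetical type, a `Mutex<HashMap>` replaces `DashMap`, and solvers are identified by name instead of `eth::Address`.

```rust
use std::{
    collections::HashMap,
    sync::Mutex,
    time::{Duration, Instant},
};

/// Simplified stand-in for the DB-based validator's in-memory cache.
struct BanList {
    banned: Mutex<HashMap<String, Instant>>,
    ttl: Duration,
}

impl BanList {
    fn new(ttl: Duration) -> Self {
        Self {
            banned: Mutex::new(HashMap::new()),
            ttl,
        }
    }

    /// Records (or refreshes) a ban that starts now.
    fn ban(&self, solver: &str) {
        self.banned
            .lock()
            .unwrap()
            .insert(solver.to_owned(), Instant::now());
    }

    /// A solver is allowed if it was never banned or if its ban is at least
    /// `ttl` old.
    fn is_allowed(&self, solver: &str) -> bool {
        let banned = self.banned.lock().unwrap();
        banned
            .get(solver)
            .map_or(true, |banned_at| banned_at.elapsed() >= self.ttl)
    }
}
```

As in the diff, expired entries are not removed from the map; they are simply treated as allowed, and a later ban overwrites the stored timestamp.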
74 changes: 74 additions & 0 deletions crates/autopilot/src/domain/competition/participation_guard/mod.rs
@@ -0,0 +1,74 @@
mod db;
mod onchain;

use {
    crate::{
        arguments::DbBasedSolverParticipationGuardConfig,
        domain::eth,
        infra::{self, Ethereum},
    },
    std::sync::Arc,
};

/// This struct checks whether a solver can participate in the competition by
/// using different validators.
#[derive(Clone)]
pub struct SolverParticipationGuard(Arc<Inner>);

struct Inner {
    /// Stores the validators in the order they will be called.
    validators: Vec<Box<dyn Validator + Send + Sync>>,
}

impl SolverParticipationGuard {
    pub fn new(
        eth: Ethereum,
        persistence: infra::Persistence,
        competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
        db_based_validator_config: DbBasedSolverParticipationGuardConfig,
        drivers: impl IntoIterator<Item = Arc<infra::Driver>>,
    ) -> Self {
        let mut validators: Vec<Box<dyn Validator + Send + Sync>> = Vec::new();

        if db_based_validator_config.enabled {
            let current_block = eth.current_block().clone();
            let database_solver_participation_validator = db::Validator::new(
                persistence,
                current_block,
                competition_updates_receiver,
                db_based_validator_config.solver_blacklist_cache_ttl,
                db_based_validator_config.solver_last_auctions_participation_count,
                drivers
                    .into_iter()
                    .map(|driver| (driver.submission_address, driver.clone()))
                    .collect(),
            );
            validators.push(Box::new(database_solver_participation_validator));
        }

        let onchain_solver_participation_validator = onchain::Validator { eth };
        validators.push(Box::new(onchain_solver_participation_validator));

        Self(Arc::new(Inner { validators }))
    }

    /// Checks if a solver can participate in the competition.
    /// Sequentially asks internal validators to avoid redundant RPC calls in
    /// the following order:
    /// 1. DB-based validator: operates fast since it uses in-memory cache.
    /// 2. Onchain-based validator: only then calls the Authenticator contract.
    pub async fn can_participate(&self, solver: &eth::Address) -> anyhow::Result<bool> {
        for validator in &self.0.validators {
            if !validator.is_allowed(solver).await? {
                return Ok(false);
            }
        }

        Ok(true)
    }
}

#[async_trait::async_trait]
trait Validator: Send + Sync {
    async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool>;
}
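
The short-circuiting order of `can_participate` can be demonstrated without async or RPC. In the sketch below everything is a hypothetical stand-in: the trait is synchronous, `CachedDenyList` plays the role of the DB-based validator, and `ExpensiveAllowList` plays the role of the on-chain `is_solver` call while counting how often it is invoked.

```rust
use std::cell::Cell;

/// Synchronous stand-in for the async `Validator` trait in the diff.
trait Validator {
    fn is_allowed(&self, solver: &str) -> Result<bool, String>;
}

/// Cheap check: rejects solvers found on an in-memory deny list.
struct CachedDenyList {
    denied: Vec<&'static str>,
}

impl Validator for CachedDenyList {
    fn is_allowed(&self, solver: &str) -> Result<bool, String> {
        Ok(!self.denied.iter().any(|d| *d == solver))
    }
}

/// Expensive check: accepts only registered solvers and counts its calls.
struct ExpensiveAllowList {
    allowed: Vec<&'static str>,
    calls: Cell<u32>,
}

impl Validator for ExpensiveAllowList {
    fn is_allowed(&self, solver: &str) -> Result<bool, String> {
        self.calls.set(self.calls.get() + 1);
        Ok(self.allowed.iter().any(|s| *s == solver))
    }
}

/// Asks the validators in order and stops at the first rejection or error.
fn can_participate(validators: &[&dyn Validator], solver: &str) -> Result<bool, String> {
    for validator in validators {
        if !validator.is_allowed(solver)? {
            return Ok(false);
        }
    }
    Ok(true)
}
```

Placing the cheap validator first means a solver rejected by the cache never reaches the expensive check, which is the reason for the ordering documented on `can_participate`.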
20 changes: 20 additions & 0 deletions crates/autopilot/src/domain/competition/participation_guard/onchain.rs
@@ -0,0 +1,20 @@
use crate::{domain::eth, infra::Ethereum};

/// Calls the Authenticator contract to check if a solver has sufficient
/// permission.
pub(super) struct Validator {
    pub eth: Ethereum,
}

#[async_trait::async_trait]
impl super::Validator for Validator {
    async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
        Ok(self
            .eth
            .contracts()
            .authenticator()
            .is_solver(solver.0)
            .call()
            .await?)
    }
}