Skip to content

Commit

Permalink
feat: Metric collection for opportunities (#402)
Browse files Browse the repository at this point in the history
* feat: Metric collection for opportunities
  • Loading branch information
m30m authored Feb 25, 2025
1 parent c71998b commit 51706ed
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
2 changes: 1 addition & 1 deletion auction-server/src/auction/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<T: ChainTrait> Repository<T> {
chain_id,
}
}
pub async fn update_metrics(&self) {
pub(super) async fn update_metrics(&self) {
let label = [("chain_id", self.chain_id.to_string())];
let store = &self.in_memory_store;
metrics::gauge!("in_memory_auctions", &label).set(store.auctions.read().await.len() as f64);
Expand Down
16 changes: 2 additions & 14 deletions auction-server/src/auction/service/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl Service<Evm> {
}

const GET_LATEST_BLOCKHASH_INTERVAL_SVM: Duration = Duration::from_secs(5);
const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1);
impl Service<Svm> {
pub async fn conclude_auction_for_log(
&self,
Expand Down Expand Up @@ -257,19 +256,8 @@ impl Service<Svm> {
Ok(())
}

pub async fn run_metric_collector_loop(&self) -> Result<()> {
let mut exit_check_interval = tokio::time::interval(EXIT_CHECK_INTERVAL);
let mut metric_interval = tokio::time::interval(METRIC_COLLECTION_INTERVAL);
while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::select! {
_ = metric_interval.tick() => {
self.repo.update_metrics().await;
}
_ = exit_check_interval.tick() => {}
}
}
tracing::info!("Shutting down metric collector svm...");
Ok(())
pub async fn update_metrics(&self) {
self.repo.update_metrics().await;
}

pub async fn run_watcher_loop(&self) -> Result<()> {
Expand Down
6 changes: 6 additions & 0 deletions auction-server/src/opportunity/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
super::entities,
crate::kernel::db::DB,
axum_prometheus::metrics,
ethers::types::Address,
express_relay::state::ExpressRelayMetadata,
solana_sdk::pubkey::Pubkey,
Expand Down Expand Up @@ -115,4 +116,9 @@ impl<T: InMemoryStore, U: OpportunityTable<T>> Repository<T, U> {
db,
}
}
pub(super) async fn update_metrics(&self) {
let store = &self.in_memory_store;
metrics::gauge!("in_memory_opportunities")
.set(store.opportunities.read().await.len() as f64);
}
}
3 changes: 3 additions & 0 deletions auction-server/src/opportunity/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ impl<T: ChainType, U: OpportunityTable<T::InMemoryStore>> Service<T, U> {
config,
}
}
pub async fn update_metrics(&self) {
self.repo.update_metrics().await;
}
}

#[cfg(test)]
Expand Down
29 changes: 26 additions & 3 deletions auction-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ where
}
}

async fn metric_collector<F, Fut>(service_name: String, update_metrics: F)
where
F: Fn() -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let mut exit_check_interval = tokio::time::interval(EXIT_CHECK_INTERVAL);
let mut metric_interval = tokio::time::interval(METRIC_COLLECTION_INTERVAL);
while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::select! {
_ = metric_interval.tick() => {
update_metrics().await;
}
_ = exit_check_interval.tick() => {}
}
}
tracing::info!("Shutting down metric collector for {}...", service_name);
}

async fn fetch_access_tokens(db: &PgPool) -> HashMap<models::AccessTokenToken, models::Profile> {
let access_tokens = sqlx::query_as!(
models::AccessToken,
Expand Down Expand Up @@ -534,11 +552,11 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
async {
let metric_loops = auction_services.iter().filter_map(|(chain_id, service)| {
if let auction_service::ServiceEnum::Svm(service) = service {
Some(fault_tolerant_handler(
format!("metric loop for chain {}", chain_id.clone()),
Some(metric_collector(
format!("auction service for chain {}", chain_id.clone()),
|| {
let service = service.clone();
async move { service.run_metric_collector_loop().await }
async move { service.update_metrics().await }
},
))
} else {
Expand Down Expand Up @@ -569,6 +587,10 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
fault_tolerant_handler("svm verification loop".to_string(), || {
run_verification_loop(store_new.opportunity_service_svm.clone())
}),
metric_collector("opportunity store".to_string(), || {
let service = store_new.opportunity_service_svm.clone();
async move { service.update_metrics().await }
}),
fault_tolerant_handler("start api".to_string(), || api::start_api(
run_options.clone(),
store_new.clone(),
Expand Down Expand Up @@ -615,3 +637,4 @@ fn setup_chain_store_svm(config_map: ConfigMap) -> Result<HashMap<ChainId, Chain
// we don't rely on global state for anything else.
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1);

0 comments on commit 51706ed

Please sign in to comment.