Skip to content

Commit

Permalink
feat(builder): add more builder metrics for latency tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Feb 13, 2025
1 parent 1988f58 commit e980b64
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 27 deletions.
6 changes: 5 additions & 1 deletion crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use async_trait::async_trait;
use futures::future;
use futures_util::TryFutureExt;
use linked_hash_map::LinkedHashMap;
use metrics::Histogram;
use metrics::{Counter, Histogram};
use metrics_derive::Metrics;
#[cfg(test)]
use mockall::automock;
Expand Down Expand Up @@ -315,6 +315,8 @@ where
entity_updates: context.entity_updates.into_values().collect(),
});
}

self.metric.bundle_simulation_failures.increment(1);
info!("Bundle gas estimation failed. Retrying after removing rejected op(s).");
}

Expand All @@ -334,6 +336,8 @@ struct BuilderProposerMetric {
bundle_build_ms: Histogram,
#[metric(describe = "the distribution of op simulation time of a bundle.")]
op_simulation_ms: Histogram,
#[metric(describe = "the number of bundle simulation failures.")]
bundle_simulation_failures: Counter,
}

impl<EP, BP> BundleProposerImpl<EP, BP>
Expand Down
54 changes: 35 additions & 19 deletions crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::Stream;
use futures_util::StreamExt;
use metrics::{Counter, Histogram};
use metrics::Counter;
use metrics_derive::Metrics;
#[cfg(test)]
use mockall::automock;
Expand Down Expand Up @@ -557,9 +557,16 @@ where
fee_increase_count: u64,
) -> anyhow::Result<SendBundleAttemptResult> {
let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?;
let _timer_guard = rundler_utils::guard_timer::CustomTimerGuard::new(
self.metrics.bundle_build_time_ms.clone(),
);

let mut update_idle = |metrics: &BuilderMetric| {
let block_number = state.block_number();
if let Some(last_idle_block) = state.last_idle_block {
metrics
.builder_idle_blocks
.increment(block_number - last_idle_block);
}
state.last_idle_block = Some(block_number);
};

let bundle = match self
.proposer
Expand All @@ -568,9 +575,11 @@ where
{
Ok(bundle) => bundle,
Err(BundleProposerError::NoOperationsInitially) => {
update_idle(&self.metrics);
return Ok(SendBundleAttemptResult::NoOperationsInitially);
}
Err(BundleProposerError::NoOperationsAfterFeeFilter) => {
update_idle(&self.metrics);
return Ok(SendBundleAttemptResult::NoOperationsAfterFeeFilter);
}
Err(e) => bail!("Failed to make bundle: {e:?}"),
Expand All @@ -584,20 +593,23 @@ where
fee_increase_count,
required_fees,
));
update_idle(&self.metrics);
return Ok(SendBundleAttemptResult::NoOperationsAfterSimulation);
};

state.last_idle_block = None;

let BundleTx {
tx,
expected_storage,
op_hashes,
} = bundle_tx;

self.metrics.bundle_txns_sent.increment(1);

let send_result = state
.transaction_tracker
.send_transaction(tx.clone(), &expected_storage)
.send_transaction(tx.clone(), &expected_storage, state.block_number())
.await;
self.metrics.bundle_txns_sent.increment(1);

match send_result {
Ok(tx_hash) => {
Expand Down Expand Up @@ -749,6 +761,7 @@ struct SenderMachineState<T, TRIG> {
send_bundle_response: Option<oneshot::Sender<SendBundleResult>>,
inner: InnerState,
requires_reset: bool,
pub last_idle_block: Option<u64>,
}

impl<T: TransactionTracker, TRIG: Trigger> SenderMachineState<T, TRIG> {
Expand All @@ -759,6 +772,7 @@ impl<T: TransactionTracker, TRIG: Trigger> SenderMachineState<T, TRIG> {
send_bundle_response: None,
inner: InnerState::new(),
requires_reset: false,
last_idle_block: None,
}
}

Expand Down Expand Up @@ -1258,8 +1272,8 @@ struct BuilderMetric {
cancellation_txns_failed: Counter,
#[metric(describe = "the count of state machine errors.")]
state_machine_errors: Counter,
#[metric(describe = "the timespan a bundle is build.")]
bundle_build_time_ms: Histogram,
#[metric(describe = "the count of blocks this builder has been idle.")]
builder_idle_blocks: Counter,
}

impl BuilderMetric {
Expand Down Expand Up @@ -1383,7 +1397,7 @@ mod tests {
// should send the bundle txn
mock_tracker
.expect_send_transaction()
.returning(|_, _| Box::pin(async { Ok(B256::ZERO) }));
.returning(|_, _, _| Box::pin(async { Ok(B256::ZERO) }));

let mut sender = new_sender(mock_proposer, mock_entry_point);

Expand Down Expand Up @@ -1464,6 +1478,7 @@ mod tests {
fee_increase_count: 0,
}),
requires_reset: false,
last_idle_block: None,
};

// first step has no update
Expand Down Expand Up @@ -1516,6 +1531,7 @@ mod tests {
fee_increase_count: 0,
}),
requires_reset: false,
last_idle_block: None,
};

// first and second step has no update
Expand Down Expand Up @@ -1580,6 +1596,7 @@ mod tests {
}),
}),
requires_reset: false,
last_idle_block: None,
};

// step state, block number should trigger move to cancellation
Expand Down Expand Up @@ -1624,6 +1641,7 @@ mod tests {
fee_increase_count: 0,
}),
requires_reset: false,
last_idle_block: None,
};

let mut sender = new_sender(mock_proposer, mock_entry_point);
Expand Down Expand Up @@ -1666,6 +1684,7 @@ mod tests {
fee_increase_count: 0,
}),
requires_reset: false,
last_idle_block: None,
};

let mut sender = new_sender(mock_proposer, mock_entry_point);
Expand Down Expand Up @@ -1721,7 +1740,7 @@ mod tests {
// should send the bundle txn, returns condition not met
mock_tracker
.expect_send_transaction()
.returning(|_, _| Box::pin(async { Err(TransactionTrackerError::ConditionNotMet) }));
.returning(|_, _, _| Box::pin(async { Err(TransactionTrackerError::ConditionNotMet) }));

// should notify proposer that condition was not met
mock_proposer
Expand All @@ -1739,6 +1758,7 @@ mod tests {
underpriced_info: None,
}),
requires_reset: false,
last_idle_block: None,
};

let mut sender = new_sender(mock_proposer, mock_entry_point);
Expand Down Expand Up @@ -1820,14 +1840,10 @@ mod tests {
mock_tracker
.expect_check_for_update()
.returning(|| Box::pin(async { Ok(None) }));
mock_trigger
.expect_last_block()
.once()
.in_sequence(seq)
.return_const(NewHead {
block_number,
block_hash: B256::ZERO,
});
mock_trigger.expect_last_block().return_const(NewHead {
block_number,
block_hash: B256::ZERO,
});
}

fn add_trigger_wait_for_block_last_block(
Expand Down
42 changes: 35 additions & 7 deletions crates/builder/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ use alloy_consensus::Transaction;
use alloy_primitives::B256;
use anyhow::{bail, Context};
use async_trait::async_trait;
use metrics::Gauge;
use metrics::{Gauge, Histogram};
use metrics_derive::Metrics;
#[cfg(test)]
use mockall::automock;
use rundler_provider::{EvmProvider, TransactionRequest};
use rundler_sim::ExpectedStorage;
use rundler_types::GasFees;
use tokio::time::Instant;
use tracing::{info, warn};

use crate::sender::{TransactionSender, TxSenderError, TxStatus};
Expand Down Expand Up @@ -49,6 +50,7 @@ pub(crate) trait TransactionTracker: Send + Sync {
&mut self,
tx: TransactionRequest,
expected_stroage: &ExpectedStorage,
block_number: u64,
) -> TransactionTrackerResult<B256>;

/// Cancel the abandoned transaction in the tracker.
Expand Down Expand Up @@ -143,6 +145,8 @@ struct PendingTransaction {
tx_hash: B256,
gas_fees: GasFees,
attempt_number: u64,
sent_at_block: Option<u64>,
sent_at_time: Option<Instant>,
}

impl<P, T> TransactionTrackerImpl<P, T>
Expand Down Expand Up @@ -282,6 +286,7 @@ where
&mut self,
tx: TransactionRequest,
expected_storage: &ExpectedStorage,
block_number: u64,
) -> TransactionTrackerResult<B256> {
self.validate_transaction(&tx)?;
let gas_fees = GasFees {
Expand All @@ -308,6 +313,8 @@ where
tx_hash: sent_tx.tx_hash,
gas_fees,
attempt_number: self.attempt_count,
sent_at_block: Some(block_number),
sent_at_time: Some(Instant::now()),
});
self.has_abandoned = false;
self.attempt_count += 1;
Expand All @@ -332,6 +339,8 @@ where
tx_hash: B256::ZERO,
gas_fees,
attempt_number: self.attempt_count,
sent_at_block: None,
sent_at_time: None,
});
};

Expand Down Expand Up @@ -388,6 +397,8 @@ where
tx_hash: cancel_info.tx_hash,
gas_fees,
attempt_number: self.attempt_count,
sent_at_block: None,
sent_at_time: None,
});

self.attempt_count += 1;
Expand All @@ -410,6 +421,8 @@ where
tx_hash: B256::ZERO,
gas_fees,
attempt_number: self.attempt_count,
sent_at_block: None,
sent_at_time: None,
});
};

Expand Down Expand Up @@ -450,6 +463,17 @@ where
gas_price,
is_success,
};

if let Some(sent_at_time) = tx.sent_at_time {
let elapsed = Instant::now().duration_since(sent_at_time);
self.metrics.txn_time_to_mine.record(elapsed.as_secs_f64());
}
if let Some(sent_at_block) = tx.sent_at_block {
self.metrics
.txn_blocks_to_mine
.record((block_number - sent_at_block) as f64);
}

break;
}
}
Expand Down Expand Up @@ -546,6 +570,10 @@ struct TransactionTrackerMetrics {
current_max_fee_per_gas: Gauge,
#[metric(describe = "the maximum priority fee per gas of current transaction.")]
max_priority_fee_per_gas: Gauge,
#[metric(describe = "the blocks it takes for a transaction to mine.")]
txn_blocks_to_mine: Histogram,
#[metric(describe = "the time it takes for a transaction to mine.")]
txn_time_to_mine: Histogram,
}

#[cfg(test)]
Expand Down Expand Up @@ -613,7 +641,7 @@ mod tests {
let exp = ExpectedStorage::default();

// send dummy transaction
let _sent = tracker.send_transaction(tx, &exp).await;
let _sent = tracker.send_transaction(tx, &exp, 0).await;
let nonce_and_fees = tracker.get_nonce_and_required_fees().unwrap();

assert_eq!(
Expand Down Expand Up @@ -659,7 +687,7 @@ mod tests {
let exp = ExpectedStorage::default();

// send dummy transaction
let _sent = tracker.send_transaction(tx, &exp).await;
let _sent = tracker.send_transaction(tx, &exp, 0).await;
let _tracker_update = tracker.check_for_update().await.unwrap();

tracker.abandon();
Expand Down Expand Up @@ -690,7 +718,7 @@ mod tests {

let tx = TransactionRequest::default();
let exp = ExpectedStorage::default();
let sent_transaction = tracker.send_transaction(tx, &exp).await;
let sent_transaction = tracker.send_transaction(tx, &exp, 0).await;

assert!(sent_transaction.is_err());
}
Expand All @@ -717,7 +745,7 @@ mod tests {

let tx = TransactionRequest::default().nonce(0);
let exp = ExpectedStorage::default();
let sent_transaction = tracker.send_transaction(tx, &exp).await;
let sent_transaction = tracker.send_transaction(tx, &exp, 0).await;

assert!(sent_transaction.is_err());
}
Expand All @@ -743,7 +771,7 @@ mod tests {

let tx = TransactionRequest::default().nonce(0);
let exp = ExpectedStorage::default();
tracker.send_transaction(tx, &exp).await.unwrap();
tracker.send_transaction(tx, &exp, 0).await.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -842,7 +870,7 @@ mod tests {
let exp = ExpectedStorage::default();

// send dummy transaction
let _sent = tracker.send_transaction(tx, &exp).await;
let _sent = tracker.send_transaction(tx, &exp, 0).await;
let tracker_update = tracker.check_for_update().await.unwrap().unwrap();

assert!(matches!(tracker_update, TrackerUpdate::Mined { .. }));
Expand Down

0 comments on commit e980b64

Please sign in to comment.