Skip to content

Commit

Permalink
fatxpool: do not use individual transaction listeners (#7316)
Browse files Browse the repository at this point in the history
#### Description
During the 2s-block-time investigation it turned out that the
[ForkAwareTxPool::register_listeners](https://github.com/paritytech/polkadot-sdk/blob/master/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs#L1036)
call takes a significant amount of time.
```
register_listeners: at HashAndNumber { number: 12, hash: 0xe9a1...0b1d2 } took 200.041933ms
register_listeners: at HashAndNumber { number: 13, hash: 0x5eb8...a87c6 } took 264.487414ms
register_listeners: at HashAndNumber { number: 14, hash: 0x30cb...2e6ec } took 340.525566ms
register_listeners: at HashAndNumber { number: 15, hash: 0x0450...4f05c } took 405.686659ms
register_listeners: at HashAndNumber { number: 16, hash: 0xfa6f...16c20 } took 477.977836ms
register_listeners: at HashAndNumber { number: 17, hash: 0x5474...5d0c1 } took 483.046029ms
register_listeners: at HashAndNumber { number: 18, hash: 0x3ca5...37b78 } took 482.715468ms
register_listeners: at HashAndNumber { number: 19, hash: 0xbfcc...df254 } took 484.206999ms
register_listeners: at HashAndNumber { number: 20, hash: 0xd748...7f027 } took 414.635236ms
register_listeners: at HashAndNumber { number: 21, hash: 0x2baa...f66b5 } took 418.015897ms
register_listeners: at HashAndNumber { number: 22, hash: 0x5f1d...282b5 } took 423.342397ms
register_listeners: at HashAndNumber { number: 23, hash: 0x7a18...f2d03 } took 472.742939ms
register_listeners: at HashAndNumber { number: 24, hash: 0xc381...3fd07 } took 489.625557ms
```

This PR implements the idea outlined in #7071. Instead of having a
separate listener for every transaction in each view, we now use a
single stream of aggregated events per view, with each stream providing
events for all transactions in that view. Each event is represented as a
tuple: (transaction-hash, transaction-status). This significantly reduces
the time required for `maintain`.

#### Review Notes
- a single aggregated stream, provided by each individual view, delivers
events in the form of `(transaction-hash, transaction-status)`,
- `MultiViewListener` now has a task. This task is responsible for:
- polling the stream map (which consists of individual view's aggregated
streams) and the `controller_receiver` which provides side-channel
[commands](https://github.com/paritytech/polkadot-sdk/blob/2b18e080cfcd6b56ee638c729f891154e566e52e/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs#L68-L95)
(like `AddView` or `FinalizeTransaction`) sent from the _transaction
pool_.
- dispatching individual transaction statuses and control commands into
the external (created via API, e.g. over RPC) listeners of individual
transactions,
- the external listener is responsible for the status-handling _logic_ (e.g.
deduplication of events, or ignoring some of them) and for emitting
statuses to the external world (_this was not changed_).
- the level of debug messages was adjusted (per-transaction messages shall be
_trace_).

Closes #7071

---------

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
michalkucharczyk and skunert authored Feb 4, 2025
1 parent 37446fc commit aa42deb
Show file tree
Hide file tree
Showing 14 changed files with 740 additions and 606 deletions.
4 changes: 2 additions & 2 deletions substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::{
use codec::{Decode, Encode};
use futures::{pin_mut, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::{debug, error, warn};
use log::{debug, error, trace, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::{
config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
Expand Down Expand Up @@ -538,7 +538,7 @@ where
{
Ok(_) => {
let elapsed = start.elapsed();
debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
TransactionImport::NewGood
},
Err(e) => match e.into_pool_error() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub enum DroppedReason<Hash> {
}

/// Dropped-logic related event from the single view.
pub type ViewStreamEvent<C> = crate::graph::DroppedByLimitsEvent<ExtrinsicHash<C>, BlockHash<C>>;
pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;

/// Dropped-logic stream of events coming from the single view.
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{
import_notification_sink::MultiViewImportNotificationSink,
metrics::MetricsLink as PrometheusMetrics,
multi_view_listener::MultiViewListener,
tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
view::View,
view_store::ViewStore,
};
Expand Down Expand Up @@ -193,7 +193,9 @@ where
future_limits: crate::PoolLimit,
mempool_max_transactions_count: usize,
) -> (Self, ForkAwareTxPoolTask) {
let listener = Arc::from(MultiViewListener::new());
let (listener, listener_task) = MultiViewListener::new_with_worker();
let listener = Arc::new(listener);

let (import_notification_sink, import_notification_sink_task) =
MultiViewImportNotificationSink::new_with_worker();

Expand All @@ -220,6 +222,7 @@ where

let combined_tasks = async move {
tokio::select! {
_ = listener_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
}
Expand Down Expand Up @@ -279,14 +282,7 @@ where
match dropped.reason {
DroppedReason::Usurped(new_tx_hash) => {
if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
view_store
.replace_transaction(
new_tx.source(),
new_tx.tx(),
tx_hash,
new_tx.is_watched(),
)
.await;
view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
} else {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -318,7 +314,10 @@ where
finalized_hash: Block::Hash,
) -> Self {
let metrics = PrometheusMetrics::new(prometheus);
let listener = Arc::from(MultiViewListener::new());

let (listener, listener_task) = MultiViewListener::new_with_worker();
let listener = Arc::new(listener);

let (revalidation_queue, revalidation_task) =
revalidation_worker::RevalidationQueue::new_with_worker();

Expand Down Expand Up @@ -347,6 +346,7 @@ where

let combined_tasks = async move {
tokio::select! {
_ = listener_task => {}
_ = revalidation_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
Expand Down Expand Up @@ -1077,6 +1077,7 @@ where
)
};

let start = Instant::now();
// 1. Capture all import notification from the very beginning, so first register all
//the listeners.
self.import_notification_sink.add_view(
Expand All @@ -1089,39 +1090,38 @@ where
view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
);

let start = Instant::now();
let watched_xts = self.register_listeners(&mut view).await;
let duration = start.elapsed();
self.view_store.listener.add_view_aggregated_stream(
view.at.hash,
view.pool.validated_pool().create_aggregated_stream().boxed(),
);
// sync the transactions statuses and referencing views in all the listeners with newly
// cloned view.
view.pool.validated_pool().retrigger_notifications();
debug!(
target: LOG_TARGET,
?at,
?duration,
duration = ?start.elapsed(),
"register_listeners"
);

// 2. Handle transactions from the tree route. Pruning transactions from the view first
// will make some space for mempool transactions in case we are at the view's limits.
let start = Instant::now();
self.update_view_with_fork(&view, tree_route, at.clone()).await;
let duration = start.elapsed();
debug!(
target: LOG_TARGET,
?at,
?duration,
duration = ?start.elapsed(),
"update_view_with_fork"
);

// 3. Finally, submit transactions from the mempool.
let start = Instant::now();
self.update_view_with_mempool(&mut view, watched_xts).await;
let duration = start.elapsed();
self.update_view_with_mempool(&mut view).await;
debug!(
target: LOG_TARGET,
?at,
?duration,
duration= ?start.elapsed(),
"update_view_with_mempool"
);
let view = Arc::from(view);
Expand Down Expand Up @@ -1173,53 +1173,6 @@ where
all_extrinsics
}

/// For every watched transaction in the mempool registers a transaction listener in the view.
///
/// The transaction listener for a given view is also added to multi-view listener. This allows
/// to track aggreagated progress of the transaction within the transaction pool.
///
/// Function returns a list of currently watched transactions in the mempool.
async fn register_listeners(
&self,
view: &View<ChainApi>,
) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
debug!(
target: LOG_TARGET,
view_at = ?view.at,
xts_count = ?self.mempool.unwatched_and_watched_count(),
active_views_count = self.active_views_count(),
"register_listeners"
);

//todo [#5495]: maybe we don't need to register listener in view? We could use
// multi_view_listener.transaction_in_block
let results = self
.mempool
.clone_watched()
.into_iter()
.map(|(tx_hash, tx)| {
let watcher = view.create_watcher(tx_hash);
let at = view.at.clone();
async move {
trace!(
target: LOG_TARGET,
?tx_hash,
at = ?at.hash,
"adding watcher"
);
self.view_store.listener.add_view_watcher_for_tx(
tx_hash,
at.hash,
watcher.into_stream().boxed(),
);
(tx_hash, tx)
}
})
.collect::<Vec<_>>();

future::join_all(results).await
}

/// Updates the given view with the transactions from the internal mempol.
///
/// All transactions from the mempool (excluding those which are either already imported or
Expand All @@ -1229,15 +1182,7 @@ where
/// If there are no views, and mempool transaction is reported as invalid for the given view,
/// the transaction is reported as invalid and removed from the mempool. This does not apply to
/// stale and temporarily banned transactions.
///
/// As the listeners for watched transactions were registered at the very beginning of maintain
/// procedure (`register_listeners`), this function accepts the list of watched transactions
/// from the mempool for which listener was actually registered to avoid submit/maintain races.
async fn update_view_with_mempool(
&self,
view: &View<ChainApi>,
watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
) {
async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
debug!(
target: LOG_TARGET,
view_at = ?view.at,
Expand All @@ -1247,15 +1192,16 @@ where
);
let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;

let (hashes, xts_filtered): (Vec<_>, Vec<_>) = watched_xts
let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
.mempool
.clone_transactions()
.into_iter()
.chain(self.mempool.clone_unwatched().into_iter())
.filter(|(hash, _)| !view.is_imported(hash))
.filter(|(hash, _)| !included_xts.contains(&hash))
.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
.unzip();

let watched_results = view
let results = view
.submit_many(xts_filtered)
.await
.into_iter()
Expand All @@ -1267,7 +1213,7 @@ where
})
.collect::<Vec<_>>();

let submitted_count = watched_results.len();
let submitted_count = results.len();

debug!(
target: LOG_TARGET,
Expand All @@ -1283,9 +1229,9 @@ where
// if there are no views yet, and a single newly created view is reporting error, just send
// out the invalid event, and remove transaction.
if self.view_store.is_empty() {
for result in watched_results {
for result in results {
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.view_store.listener.transactions_invalidated(&[tx_hash]);
self.mempool.remove_transaction(&tx_hash);
}
}
Expand Down Expand Up @@ -1619,9 +1565,9 @@ where

info!(
target: LOG_TARGET,
mempool_len = format!("{:?}", self.mempool_len()),
txs = ?self.mempool_len(),
active_views_count = self.active_views_count(),
views_stats = ?self.views_stats(),
views = ?self.views_stats(),
?event,
?duration,
"maintain"
Expand Down
Loading

0 comments on commit aa42deb

Please sign in to comment.