Skip to content

Commit

Permalink
unify timeout configuration in SendExtrinsicOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
snowmead committed Feb 28, 2025
1 parent 99e40f7 commit 0dc3eb6
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 163 deletions.
18 changes: 14 additions & 4 deletions client/blockchain-service/src/capacity_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{collections::VecDeque, time::Duration};

use anyhow::anyhow;
use log::{debug, error};
Expand All @@ -11,7 +11,7 @@ use shc_forest_manager::traits::ForestStorageHandler;
use sp_api::ProvideRuntimeApi;
use sp_core::H256;

use crate::{transaction::SubmittedTransaction, BlockchainService};
use crate::{transaction::SubmittedTransaction, types::SendExtrinsicOptions, BlockchainService};

const LOG_TARGET: &str = "blockchain-service-capacity-manager";

Expand Down Expand Up @@ -304,13 +304,23 @@ where
pallet_storage_providers::Call::change_capacity { new_capacity },
);

let extrinsic_retry_timeout = Duration::from_secs(self.config.extrinsic_retry_timeout);

// Send extrinsic to increase capacity
match self.send_extrinsic(call, Default::default()).await {
match self
.send_extrinsic(call, &SendExtrinsicOptions::new(extrinsic_retry_timeout))
.await
{
Ok(output) => {
// Add all pending requests to the list of requests waiting for inclusion.
if let Some(capacity_manager) = self.capacity_manager.as_mut() {
capacity_manager.add_pending_requests_to_waiting_for_inclusion(
SubmittedTransaction::new(output.receiver, output.hash, output.nonce),
SubmittedTransaction::new(
output.receiver,
output.hash,
output.nonce,
extrinsic_retry_timeout,
),
);
} else {
error!(target: LOG_TARGET, "Capacity manager not initialized");
Expand Down
9 changes: 4 additions & 5 deletions client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ pub trait BlockchainServiceInterface {
async fn submit_extrinsic_with_retry(
&self,
call: impl Into<storage_hub_runtime::RuntimeCall> + Send,
options: SendExtrinsicOptions,
retry_strategy: RetryStrategy,
with_events: bool,
) -> Result<Option<StorageHubEventsVec>>;
Expand Down Expand Up @@ -849,6 +850,7 @@ where
async fn submit_extrinsic_with_retry(
&self,
call: impl Into<storage_hub_runtime::RuntimeCall> + Send,
options: SendExtrinsicOptions,
retry_strategy: RetryStrategy,
with_events: bool,
) -> Result<Option<StorageHubEventsVec>> {
Expand All @@ -861,14 +863,11 @@ where
for retry_count in 0..=retry_strategy.max_retries {
debug!(target: LOG_TARGET, "Submitting transaction {:?} with tip {}", call, tip);

let extrinsic_options = SendExtrinsicOptions::new()
let extrinsic_options = SendExtrinsicOptions::new(options.timeout())
.with_tip(tip as u128)
.with_nonce(nonce);

let mut transaction = self
.send_extrinsic(call.clone(), extrinsic_options)
.await?
.with_timeout(retry_strategy.timeout);
let mut transaction = self.send_extrinsic(call.clone(), extrinsic_options).await?;

let result: Result<Option<StorageHubEventsVec>, _> = if with_events {
transaction
Expand Down
7 changes: 6 additions & 1 deletion client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub struct BlockchainService<FSH>
where
FSH: ForestStorageHandler + Clone + Send + Sync + 'static,
{
/// The configuration for the BlockchainService.
pub(crate) config: BlockchainServiceConfig,
/// The event bus provider.
pub(crate) event_bus_provider: BlockchainServiceEventBusProvider,
/// The parachain client. Used to interact with the runtime.
Expand Down Expand Up @@ -279,13 +281,14 @@ where
call,
options,
callback,
} => match self.send_extrinsic(call, options).await {
} => match self.send_extrinsic(call, &options).await {
Ok(output) => {
debug!(target: LOG_TARGET, "Extrinsic sent successfully: {:?}", output);
match callback.send(Ok(SubmittedTransaction::new(
output.receiver,
output.hash,
output.nonce,
options.timeout(),
))) {
Ok(_) => {
trace!(target: LOG_TARGET, "Receiver sent successfully");
Expand Down Expand Up @@ -1100,6 +1103,7 @@ where
{
/// Create a new [`BlockchainService`].
pub fn new(
config: BlockchainServiceConfig,
client: Arc<ParachainClient>,
keystore: KeystorePtr,
rpc_handlers: Arc<RpcHandlers>,
Expand All @@ -1109,6 +1113,7 @@ where
capacity_request_queue: Option<CapacityRequestQueue>,
) -> Self {
Self {
config,
event_bus_provider: BlockchainServiceEventBusProvider::new(),
client,
keystore,
Expand Down
3 changes: 3 additions & 0 deletions client/blockchain-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod utils;
use std::{path::PathBuf, sync::Arc};

use capacity_manager::{CapacityConfig, CapacityRequestQueue};
use handler::BlockchainServiceConfig;
use sc_service::RpcHandlers;
use sp_keystore::KeystorePtr;

Expand All @@ -21,6 +22,7 @@ pub use self::handler::BlockchainService;

pub async fn spawn_blockchain_service<FSH>(
task_spawner: &TaskSpawner,
config: BlockchainServiceConfig,
client: Arc<ParachainClient>,
keystore: KeystorePtr,
rpc_handlers: Arc<RpcHandlers>,
Expand All @@ -37,6 +39,7 @@ where
.with_group("network");

let blockchain_service = BlockchainService::<FSH>::new(
config,
client,
keystore,
rpc_handlers,
Expand Down
55 changes: 15 additions & 40 deletions client/blockchain-service/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::{Duration, Instant},
};

use log::{debug, error, info, warn};
use log::{debug, error, info};
use shc_actors_framework::actor::ActorHandle;
use shc_common::types::StorageHubEventsVec;
use shc_forest_manager::traits::ForestStorageHandler;
Expand All @@ -30,21 +30,18 @@ pub struct SubmittedTransaction {
watcher: Receiver<String>,
/// The hash of the transaction.
hash: ExtrinsicHash,
/// The maximum amount of time to wait for the transaction to either be successful or fail.
timeout: Option<Duration>,
/// The nonce of the transaction.
nonce: u32,
/// The maximum amount of time to wait for the transaction to either be successful or fail.
timeout: Duration,
}

/// TODO: CONSTANTS
const DEFAULT_WATCH_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(60);

impl SubmittedTransaction {
pub fn new(watcher: Receiver<String>, hash: H256, nonce: u32) -> Self {
pub fn new(watcher: Receiver<String>, hash: H256, nonce: u32, timeout: Duration) -> Self {
Self {
watcher,
hash,
timeout: None,
timeout,
nonce,
}
}
Expand All @@ -59,15 +56,6 @@ impl SubmittedTransaction {
self.nonce
}

/// Sets the timeout for the transaction.
///
/// If the transaction is not successful within the specified timeout, it will be considered
/// failed and an error will be returned.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}

/// Handles the lifecycle of a submitted transaction.
///
/// Waits for the transaction to be included in a block AND the checks the transaction is successful.
Expand Down Expand Up @@ -168,18 +156,14 @@ impl SubmittedTransaction {
// Get the elapsed time since submit.
let elapsed = start_time.elapsed();
// Calculate the remaining time to wait.
let remaining = match self.timeout {
Some(timeout) => {
// Check if the timeout has been reached.
if elapsed > timeout {
error!(target: LOG_TARGET, "Timeout waiting for transaction {} to be included in a block", self.hash);
return Err(WatchTransactionError::Timeout);
}

timeout - elapsed
}
None => DEFAULT_WATCH_TRANSACTION_TIMEOUT,
};
// Check if the timeout has been reached.
if elapsed > self.timeout {
error!(target: LOG_TARGET, "Timeout waiting for transaction {} to be included in a block", self.hash);
return Err(WatchTransactionError::Timeout);
}

let remaining = self.timeout - elapsed;

// Wait for either a new message from the watcher, or the timeout to be reached.
let result = match tokio::time::timeout(remaining, self.watcher.recv()).await {
Expand All @@ -192,20 +176,11 @@ impl SubmittedTransaction {
},
Err(_) => {
// Timeout reached, exit the loop.
match self.timeout {
Some(_) => {
error!(target: LOG_TARGET, "Timeout waiting for transaction to be included in a block");
return Err(WatchTransactionError::Timeout);
}
None => {
// No timeout set, continue waiting.
warn!(target: LOG_TARGET, "No timeout set and {:?} elapsed, continuing to wait for transaction to be included in a block.", DEFAULT_WATCH_TRANSACTION_TIMEOUT);

continue;
}
}
error!(target: LOG_TARGET, "Timeout waiting for transaction to be included in a block");
return Err(WatchTransactionError::Timeout);
}
};

// Parse the JSONRPC string. The strings sent by the RPC wacher should be valid JSONRPC strings.
let json: serde_json::Value = serde_json::from_str(&result).map_err(|_| {
let err_msg = format!(
Expand Down
29 changes: 14 additions & 15 deletions client/blockchain-service/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,17 @@ pub struct SendExtrinsicOptions {
tip: Tip,
/// Optionally override the nonce to use when sending the transaction.
nonce: Option<u32>,
/// Maximum time to wait for a response before assuming the extrinsic submission has failed.
timeout: Duration,
}

impl SendExtrinsicOptions {
pub fn new() -> Self {
Self::default()
pub fn new(timeout: Duration) -> Self {
Self {
tip: Tip::from(0),
nonce: None,
timeout,
}
}

pub fn with_tip(mut self, tip: u128) -> Self {
Expand All @@ -261,13 +267,18 @@ impl SendExtrinsicOptions {
pub fn nonce(&self) -> Option<u32> {
self.nonce
}

pub fn timeout(&self) -> Duration {
self.timeout
}
}

impl Default for SendExtrinsicOptions {
fn default() -> Self {
Self {
tip: Tip::from(0),
nonce: None,
timeout: Duration::from_secs(60),
}
}
}
Expand All @@ -287,8 +298,6 @@ impl Default for SendExtrinsicOptions {
pub struct RetryStrategy {
/// Maximum number of retries after which the extrinsic submission will be considered failed.
pub max_retries: u32,
/// Maximum time to wait for a response before assuming the extrinsic submission has failed.
pub timeout: Duration,
/// Maximum tip to be paid for the extrinsic submission. The progression follows an exponential
/// backoff strategy.
pub max_tip: f64,
Expand All @@ -310,10 +319,9 @@ pub struct RetryStrategy {

impl RetryStrategy {
/// Creates a new `RetryStrategy` instance.
pub fn new(max_retries: u32, timeout: Duration, max_tip: f64, base_multiplier: f64) -> Self {
pub fn new(max_retries: u32, max_tip: f64, base_multiplier: f64) -> Self {
Self {
max_retries,
timeout,
max_tip,
base_multiplier,
should_retry: None,
Expand All @@ -326,14 +334,6 @@ impl RetryStrategy {
self
}

/// Set the timeout for the extrinsic.
///
/// After this timeout, the extrinsic will be retried (if applicable) or fail.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

/// Set the maximum tip for the extrinsic.
///
/// As the number of times the extrinsic is retried increases, the tip will increase
Expand Down Expand Up @@ -413,7 +413,6 @@ impl Default for RetryStrategy {
fn default() -> Self {
Self {
max_retries: 5,
timeout: Duration::from_secs(30),
max_tip: 0.0,
base_multiplier: 2.0,
should_retry: None,
Expand Down
2 changes: 1 addition & 1 deletion client/blockchain-service/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ where
pub(crate) async fn send_extrinsic(
&mut self,
call: impl Into<storage_hub_runtime::RuntimeCall>,
options: SendExtrinsicOptions,
options: &SendExtrinsicOptions,
) -> Result<RpcExtrinsicOutput> {
debug!(target: LOG_TARGET, "Sending extrinsic to the runtime");

Expand Down
3 changes: 3 additions & 0 deletions node/src/services/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ where

let capacity_config = self.capacity_config.clone();

let blockchain_service_config = self.blockchain_service_config.clone().unwrap_or_default();

let blockchain_service_handle = spawn_blockchain_service::<<(R, S) as ShNodeType>::FSH>(
self.task_spawner
.as_ref()
.expect("Task spawner is not set."),
blockchain_service_config,
client.clone(),
keystore.clone(),
rpc_handlers.clone(),
Expand Down
20 changes: 10 additions & 10 deletions node/src/tasks/bsp_charge_fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use shc_blockchain_service::{
LastChargeableInfoUpdated, ProcessStopStoringForInsolventUserRequest,
SpStopStoringInsolventUser, UserWithoutFunds,
},
types::StopStoringForInsolventUserRequest,
types::{SendExtrinsicOptions, StopStoringForInsolventUserRequest},
};
use shc_common::{consts::CURRENT_FOREST_KEY, types::MaxUsersToCharge};
use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};
Expand Down Expand Up @@ -317,15 +317,15 @@ where
// continue only if it is successful.
self.storage_hub_handler
.blockchain
.send_extrinsic(stop_storing_for_insolvent_user_call, Default::default())
.await?
.with_timeout(Duration::from_secs(
self.storage_hub_handler
.provider_config
.blockchain_service
.extrinsic_retry_timeout,
))
.watch_for_success(&self.storage_hub_handler.blockchain)
.send_extrinsic(
stop_storing_for_insolvent_user_call,
SendExtrinsicOptions::new(Duration::from_secs(
self.storage_hub_handler
.provider_config
.blockchain_service
.extrinsic_retry_timeout,
)),
)
.await?;

trace!(target: LOG_TARGET, "Stop storing submitted successfully");
Expand Down
Loading

0 comments on commit 0dc3eb6

Please sign in to comment.