Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unified configurations #385

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
47 changes: 47 additions & 0 deletions bsp_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[provider]
# BSP provider type
provider_type = "bsp"
# Using memory storage layer for simplicity
storage_layer = "memory"
# Maximum storage capacity in bytes (4GB)
max_storage_capacity = 4294967295
# Jump capacity in bytes (1GB)
jump_capacity = 1073741824
# Extrinsic retry timeout in seconds
extrinsic_retry_timeout = 60
# Node key for identity
node_key = "0x2e6e3670c96202a2d6f5a58b7ac9092c5a51e0250f324eec2111ca94f5e568be"
# Path to keystores
keystore_path = "./docker/dev-keystores"

# Configuration for the BSP Upload File task
[provider.bsp_upload_file]
# Maximum number of times to retry file upload operations
max_try_count = 5
# Maximum tip amount to use when submitting file upload extrinsics
max_tip = 100.0

# Configuration for the BSP Move Bucket task
[provider.bsp_move_bucket]
# Grace period in seconds to accept download requests after a bucket move is accepted
move_bucket_accepted_grace_period = 14400

# Configuration for the BSP Charge Fees task
[provider.bsp_charge_fees]
# Minimum debt threshold for charging users
min_debt = 0

# Configuration for the BSP Submit Proof task
[provider.bsp_submit_proof]
# Maximum number of attempts to submit a proof
max_submission_attempts = 5

# Configuration for the Blockchain Service
[provider.blockchain_service]
extrinsic_retry_timeout = 60

# Optional indexer configuration
[indexer]
# Set to true to enable the indexer
indexer = true
database_url = "postgresql://postgres:postgres@docker-sh-postgres-1:5432/storage_hub"
2 changes: 2 additions & 0 deletions client/actors-framework/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// TODO CONSTANTS
pub const MAX_PENDING_EVENTS: usize = 2000;
pub const MAX_TASKS_SPAWNED_PER_QUEUE: usize = 2000;

// TODO CONSTANTS
Comment on lines +1 to +5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's up here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for spotting, marked them as constants to remove to make them cli parameters

pub const DEFAULT_ACTOR_COMMAND_QUEUE_WARNING_SIZE: usize = 2000;
23 changes: 18 additions & 5 deletions client/blockchain-service/src/capacity_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{collections::VecDeque, time::Duration};

use anyhow::anyhow;
use log::{debug, error};
Expand All @@ -11,7 +11,10 @@ use shc_forest_manager::traits::ForestStorageHandler;
use sp_api::ProvideRuntimeApi;
use sp_core::H256;

use crate::{transaction::SubmittedTransaction, types::ManagedProvider, BlockchainService};
use crate::{
transaction::SubmittedTransaction, types::ManagedProvider, types::SendExtrinsicOptions,
BlockchainService,
};

const LOG_TARGET: &str = "blockchain-service-capacity-manager";

Expand Down Expand Up @@ -154,7 +157,7 @@ impl CapacityRequestQueue {
}

/// Configuration parameters determining values for capacity increases.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CapacityConfig {
/// Maximum storage capacity of the provider in bytes.
///
Expand Down Expand Up @@ -304,13 +307,23 @@ where
pallet_storage_providers::Call::change_capacity { new_capacity },
);

let extrinsic_retry_timeout = Duration::from_secs(self.config.extrinsic_retry_timeout);

// Send extrinsic to increase capacity
match self.send_extrinsic(call, Default::default()).await {
match self
.send_extrinsic(call, &SendExtrinsicOptions::new(extrinsic_retry_timeout))
.await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to be useful at all if we're not calling watch_transaction (which we shouldn't)

{
Ok(output) => {
// Add all pending requests to the list of requests waiting for inclusion.
if let Some(capacity_manager) = self.capacity_manager.as_mut() {
capacity_manager.add_pending_requests_to_waiting_for_inclusion(
SubmittedTransaction::new(output.receiver, output.hash, output.nonce),
SubmittedTransaction::new(
output.receiver,
output.hash,
output.nonce,
extrinsic_retry_timeout,
),
);
} else {
error!(target: LOG_TARGET, "Capacity manager not initialized");
Expand Down
9 changes: 4 additions & 5 deletions client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ pub trait BlockchainServiceInterface {
async fn submit_extrinsic_with_retry(
&self,
call: impl Into<storage_hub_runtime::RuntimeCall> + Send,
options: SendExtrinsicOptions,
retry_strategy: RetryStrategy,
with_events: bool,
) -> Result<Option<StorageHubEventsVec>>;
Expand Down Expand Up @@ -849,6 +850,7 @@ where
async fn submit_extrinsic_with_retry(
&self,
call: impl Into<storage_hub_runtime::RuntimeCall> + Send,
options: SendExtrinsicOptions,
retry_strategy: RetryStrategy,
with_events: bool,
) -> Result<Option<StorageHubEventsVec>> {
Expand All @@ -861,14 +863,11 @@ where
for retry_count in 0..=retry_strategy.max_retries {
debug!(target: LOG_TARGET, "Submitting transaction {:?} with tip {}", call, tip);

let extrinsic_options = SendExtrinsicOptions::new()
let extrinsic_options = SendExtrinsicOptions::new(options.timeout())
.with_tip(tip as u128)
.with_nonce(nonce);

let mut transaction = self
.send_extrinsic(call.clone(), extrinsic_options)
.await?
.with_timeout(retry_strategy.timeout);
let mut transaction = self.send_extrinsic(call.clone(), extrinsic_options).await?;

let result: Result<Option<StorageHubEventsVec>, _> = if with_events {
transaction
Expand Down
25 changes: 24 additions & 1 deletion client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use sc_client_api::{
};
use sc_service::RpcHandlers;
use sc_tracing::tracing::{debug, error, info, trace, warn};
use serde::Deserialize;
use shc_forest_manager::traits::ForestStorageHandler;
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_blockchain::TreeRoute;
Expand Down Expand Up @@ -60,12 +61,14 @@ pub(crate) const LOG_TARGET: &str = "blockchain-service";
///
/// TODO: Define properly the number of blocks to come out of sync mode
/// TODO: Make this configurable in the config file
/// TODO: CONSTANTS
pub(crate) const SYNC_MODE_MIN_BLOCKS_BEHIND: BlockNumber = 5;

/// On blocks that are multiples of this number, the blockchain service will trigger the catch
/// up of proofs (see [`BlockchainService::proof_submission_catch_up`]).
///
/// TODO: Make this configurable in the config file
/// TODO: CONSTANTS
pub(crate) const CHECK_FOR_PENDING_PROOFS_PERIOD: BlockNumber = 4;

/// The maximum number of blocks from the past that will be processed for catching up the root
Expand All @@ -74,6 +77,7 @@ pub(crate) const CHECK_FOR_PENDING_PROOFS_PERIOD: BlockNumber = 4;
/// variant.
///
/// TODO: Make this configurable in the config file
/// TODO: CONSTANTS
pub(crate) const MAX_BLOCKS_BEHIND_TO_CATCH_UP_ROOT_CHANGES: BlockNumber = 10;

/// The BlockchainService actor.
Expand All @@ -85,6 +89,8 @@ pub struct BlockchainService<FSH>
where
FSH: ForestStorageHandler + Clone + Send + Sync + 'static,
{
/// The configuration for the BlockchainService.
pub(crate) config: BlockchainServiceConfig,
/// The event bus provider.
pub(crate) event_bus_provider: BlockchainServiceEventBusProvider,
/// The parachain client. Used to interact with the runtime.
Expand Down Expand Up @@ -128,6 +134,20 @@ where
pub(crate) capacity_manager: Option<CapacityRequestQueue>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct BlockchainServiceConfig {
/// Extrinsic retry timeout in seconds.
pub extrinsic_retry_timeout: u64,
}

impl Default for BlockchainServiceConfig {
fn default() -> Self {
Self {
extrinsic_retry_timeout: 60,
}
}
}

/// Event loop for the BlockchainService actor.
pub struct BlockchainServiceEventLoop<FSH>
where
Expand Down Expand Up @@ -221,13 +241,14 @@ where
call,
options,
callback,
} => match self.send_extrinsic(call, options).await {
} => match self.send_extrinsic(call, &options).await {
Ok(output) => {
debug!(target: LOG_TARGET, "Extrinsic sent successfully: {:?}", output);
match callback.send(Ok(SubmittedTransaction::new(
output.receiver,
output.hash,
output.nonce,
options.timeout(),
))) {
Ok(_) => {
trace!(target: LOG_TARGET, "Receiver sent successfully");
Expand Down Expand Up @@ -1108,6 +1129,7 @@ where
{
/// Create a new [`BlockchainService`].
pub fn new(
config: BlockchainServiceConfig,
client: Arc<ParachainClient>,
keystore: KeystorePtr,
rpc_handlers: Arc<RpcHandlers>,
Expand All @@ -1117,6 +1139,7 @@ where
capacity_request_queue: Option<CapacityRequestQueue>,
) -> Self {
Self {
config,
event_bus_provider: BlockchainServiceEventBusProvider::new(),
client,
keystore,
Expand Down
1 change: 1 addition & 0 deletions client/blockchain-service/src/handler_msp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
};

// TODO: Make this configurable in the config file
// TODO: CONSTANTS
const MAX_BATCH_MSP_RESPOND_STORE_REQUESTS: u32 = 100;

impl<FSH> BlockchainService<FSH>
Expand Down
3 changes: 3 additions & 0 deletions client/blockchain-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod utils;
use std::{path::PathBuf, sync::Arc};

use capacity_manager::{CapacityConfig, CapacityRequestQueue};
use handler::BlockchainServiceConfig;
use sc_service::RpcHandlers;
use sp_keystore::KeystorePtr;

Expand All @@ -23,6 +24,7 @@ pub use self::handler::BlockchainService;

pub async fn spawn_blockchain_service<FSH>(
task_spawner: &TaskSpawner,
config: BlockchainServiceConfig,
client: Arc<ParachainClient>,
keystore: KeystorePtr,
rpc_handlers: Arc<RpcHandlers>,
Expand All @@ -39,6 +41,7 @@ where
.with_group("network");

let blockchain_service = BlockchainService::<FSH>::new(
config,
client,
keystore,
rpc_handlers,
Expand Down
54 changes: 15 additions & 39 deletions client/blockchain-service/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::{Duration, Instant},
};

use log::{debug, error, info, warn};
use log::{debug, error, info};
use shc_actors_framework::actor::ActorHandle;
use shc_common::types::StorageHubEventsVec;
use shc_forest_manager::traits::ForestStorageHandler;
Expand All @@ -30,20 +30,18 @@ pub struct SubmittedTransaction {
watcher: Receiver<String>,
/// The hash of the transaction.
hash: ExtrinsicHash,
/// The maximum amount of time to wait for the transaction to either be successful or fail.
timeout: Option<Duration>,
/// The nonce of the transaction.
nonce: u32,
/// The maximum amount of time to wait for the transaction to either be successful or fail.
timeout: Duration,
}

const NO_TIMEOUT_INTERVAL_WARNING: Duration = Duration::from_secs(60);

impl SubmittedTransaction {
pub fn new(watcher: Receiver<String>, hash: H256, nonce: u32) -> Self {
pub fn new(watcher: Receiver<String>, hash: H256, nonce: u32, timeout: Duration) -> Self {
Self {
watcher,
hash,
timeout: None,
timeout,
nonce,
}
}
Expand All @@ -58,15 +56,6 @@ impl SubmittedTransaction {
self.nonce
}

/// Sets the timeout for the transaction.
///
/// If the transaction is not successful within the specified timeout, it will be considered
/// failed and an error will be returned.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}

/// Handles the lifecycle of a submitted transaction.
///
/// Waits for the transaction to be included in a block AND the checks the transaction is successful.
Expand Down Expand Up @@ -167,18 +156,14 @@ impl SubmittedTransaction {
// Get the elapsed time since submit.
let elapsed = start_time.elapsed();
// Calculate the remaining time to wait.
let remaining = match self.timeout {
Some(timeout) => {
// Check if the timeout has been reached.
if elapsed > timeout {
error!(target: LOG_TARGET, "Timeout waiting for transaction {} to be included in a block", self.hash);
return Err(WatchTransactionError::Timeout);
}

timeout - elapsed
}
None => NO_TIMEOUT_INTERVAL_WARNING,
};
// Check if the timeout has been reached.
if elapsed > self.timeout {
error!(target: LOG_TARGET, "Timeout waiting for transaction {} to be included in a block", self.hash);
return Err(WatchTransactionError::Timeout);
}

let remaining = self.timeout - elapsed;

// Wait for either a new message from the watcher, or the timeout to be reached.
let result = match tokio::time::timeout(remaining, self.watcher.recv()).await {
Expand All @@ -191,20 +176,11 @@ impl SubmittedTransaction {
},
Err(_) => {
// Timeout reached, exit the loop.
match self.timeout {
Some(_) => {
error!(target: LOG_TARGET, "Timeout waiting for transaction to be included in a block");
return Err(WatchTransactionError::Timeout);
}
None => {
// No timeout set, continue waiting.
warn!(target: LOG_TARGET, "No timeout set and {:?} elapsed, continuing to wait for transaction to be included in a block.", NO_TIMEOUT_INTERVAL_WARNING);

continue;
}
}
error!(target: LOG_TARGET, "Timeout waiting for transaction to be included in a block");
return Err(WatchTransactionError::Timeout);
}
};

// Parse the JSONRPC string. The strings sent by the RPC wacher should be valid JSONRPC strings.
let json: serde_json::Value = serde_json::from_str(&result).map_err(|_| {
let err_msg = format!(
Expand Down
Loading
Loading