From 9a4f923ce7959b66b9ae7cfb7f809f53dbd66c95 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Fri, 21 Feb 2025 16:31:53 -0500 Subject: [PATCH 1/7] implement capacity manager --- .../src/capacity_manager.rs | 417 ++++++++++++++++++ client/blockchain-service/src/commands.rs | 23 + client/blockchain-service/src/handler.rs | 25 +- client/blockchain-service/src/lib.rs | 4 + client/blockchain-service/src/utils.rs | 104 ++++- node/src/service.rs | 7 +- node/src/services/builder.rs | 46 +- node/src/services/handler.rs | 12 +- node/src/tasks/bsp_charge_fees.rs | 11 +- node/src/tasks/bsp_upload_file.rs | 140 +----- node/src/tasks/mock_bsp_volunteer.rs | 3 +- node/src/tasks/mock_sp_react_to_event.rs | 3 +- node/src/tasks/msp_charge_fees.rs | 6 +- node/src/tasks/msp_move_bucket.rs | 114 ++--- node/src/tasks/msp_upload_file.rs | 104 +---- node/src/tasks/sp_slash_provider.rs | 6 +- test/util/bspNet/block.ts | 12 +- test/util/bspNet/consts.ts | 1 + test/util/bspNet/test-api.ts | 7 +- 19 files changed, 686 insertions(+), 359 deletions(-) create mode 100644 client/blockchain-service/src/capacity_manager.rs diff --git a/client/blockchain-service/src/capacity_manager.rs b/client/blockchain-service/src/capacity_manager.rs new file mode 100644 index 000000000..b607bf339 --- /dev/null +++ b/client/blockchain-service/src/capacity_manager.rs @@ -0,0 +1,417 @@ +use std::collections::VecDeque; + +use anyhow::anyhow; +use log::{error, info}; +use pallet_storage_providers_runtime_api::{ + QueryEarliestChangeCapacityBlockError, QueryStorageProviderCapacityError, StorageProvidersApi, +}; +use sc_client_api::HeaderBackend; +use shc_common::types::{BlockNumber, StorageData, StorageProviderId}; +use shc_forest_manager::traits::ForestStorageHandler; +use sp_api::ProvideRuntimeApi; +use sp_core::H256; + +use crate::{transaction::SubmittedTransaction, BlockchainService}; + +const LOG_TARGET: &str = "blockchain-service-capacity-manager"; + +/// Queue of capacity requests for batching capacity increases in a single transaction. +pub struct CapacityRequestQueue { + /// Configuration parameters determining values for capacity increases. + capacity_config: CapacityConfig, + /// Pending capacity requests which have yet to be part of a transaction. + pending_requests: VecDeque, + /// Capacity requests bundled in a single transaction waiting to be included in a block. + /// + /// All requesters will be notified via the callback when the `CapacityChanged` event is processed + /// in the block important notification pipeline. This list will be cleared subsequently. + requests_waiting_for_inclusion: Vec, + /// Total required capacity. + total_required: StorageData, + /// The last submitted transaction which `requests_waiting_for_inclusion` is waiting for. + last_submitted_transaction: Option, +} + +impl CapacityRequestQueue { + pub fn new(capacity_config: CapacityConfig) -> Self { + Self { + capacity_config, + pending_requests: VecDeque::new(), + requests_waiting_for_inclusion: Vec::new(), + total_required: 0, + last_submitted_transaction: None, + } + } + + /// Get the last submitted transaction. + pub fn last_submitted_transaction(&self) -> Option<&SubmittedTransaction> { + self.last_submitted_transaction.as_ref() + } + + /// Get the configured maximum capacity allowed. + /// + /// Capacity requests will be rejected if the current provider capacity is at this limit. + pub fn max_capacity_allowed(&self) -> StorageData { + self.capacity_config.max_capacity + } + + /// Queue a capacity request. 
+ /// + /// This will check for overflow and maximum capacity reached. + /// If the request cannot be queued, the error will be sent back to the caller. + pub fn queue_capacity_request( + &mut self, + request: CapacityRequest, + current_capacity: StorageData, + ) { + let Some(new_total_required) = self.total_required.checked_add(request.data.required) + else { + request.send_result(Err(anyhow!("Capacity overflow"))); + return; + }; + + if new_total_required > self.max_capacity_diff(current_capacity) { + request.send_result(Err(anyhow!("Maximum capacity reached"))); + return; + } + + self.total_required = new_total_required; + + self.pending_requests.push_back(request); + } + + /// Calculate the maximum capacity difference that can be requested. + fn max_capacity_diff(&self, current_capacity: StorageData) -> StorageData { + self.capacity_config + .max_capacity + .saturating_sub(current_capacity) + } + + /// Calculate the new capacity needed based on the total required capacity + pub fn calculate_new_capacity( + &self, + current_capacity: StorageData, + total_required: StorageData, + ) -> StorageData { + // Calculate how many jumps we need to cover the required capacity + let jumps_needed = (total_required + self.capacity_config.jump_capacity - 1) + / self.capacity_config.jump_capacity; + let total_jump_capacity = jumps_needed * self.capacity_config.jump_capacity; + + // Calculate new total capacity + let new_capacity = current_capacity.saturating_add(total_jump_capacity); + + // Ensure we don't exceed max capacity + new_capacity.min(self.capacity_config.max_capacity) + } + + /// Check if there are any pending requests + pub fn has_pending_requests(&self) -> bool { + !self.pending_requests.is_empty() + } + + /// Check if there are any requests waiting for inclusion + pub fn has_requests_waiting_for_inclusion(&self) -> bool { + !self.requests_waiting_for_inclusion.is_empty() + } + + /// Add all pending requests to the list of requests waiting for inclusion of the [`SubmittedTransaction`]. + pub fn add_pending_requests_to_waiting_for_inclusion( + &mut self, + submitted_transaction: SubmittedTransaction, + ) { + self.requests_waiting_for_inclusion + .extend(self.pending_requests.drain(..)); + self.last_submitted_transaction = Some(submitted_transaction); + } + + /// Complete all requests waiting for inclusion, notifying the callers of success. + /// + /// The `requests_waiting_for_inclusion` list is cleared after the requests are notified. + pub fn complete_requests_waiting_for_inclusion(&mut self) { + // Notify all callers of success + while let Some(request) = self.requests_waiting_for_inclusion.pop() { + request.send_result(Ok(())); + } + + // Clear the last submitted transaction + self.last_submitted_transaction = None; + } + + /// Fail all requests waiting for inclusion with an error message + /// + /// The `requests_waiting_for_inclusion` list is cleared after the requests are notified. + pub fn fail_requests_waiting_for_inclusion(&mut self, error_msg: String) { + while let Some(request) = self.requests_waiting_for_inclusion.pop() { + request.send_result(Err(anyhow!(error_msg.clone()))); + } + } + + /// Fail all pending requests with an error message + pub fn fail_requests(&mut self, error_msg: String) { + while let Some(request) = self.pending_requests.pop_front() { + request.send_result(Err(anyhow!(error_msg.clone()))); + } + } + + /// Reset the pending requests queue and total required capacity. 
+ pub fn reset_queue(&mut self) { + self.pending_requests.clear(); + self.total_required = 0; + } +} + +/// Configuration parameters determining values for capacity increases. +#[derive(Clone)] +pub struct CapacityConfig { + /// Maximum storage capacity of the provider in bytes. + /// + /// The node will not increase its on-chain capacity above this value. + /// This is meant to reflect the actual physical storage capacity of the node. + max_capacity: StorageData, + /// Capacity increases by this amount in bytes a number of times based on the required capacity calculated + /// by the [`calculate_new_capacity`](CapacityRequestQueue::calculate_new_capacity) method. + /// + /// The jump capacity is the amount of storage that the node will increase in its on-chain + /// capacity by adding more stake. For example, if the jump capacity is set to 1k, and the + /// node needs 100 units of storage more to store a file, the node will automatically increase + /// its on-chain capacity by 1k units. + jump_capacity: StorageData, +} + +impl CapacityConfig { + pub fn new(max_capacity: StorageData, jump_capacity: StorageData) -> Self { + Self { + max_capacity, + jump_capacity, + } + } + + pub fn max_capacity(&self) -> StorageData { + self.max_capacity + } +} + +/// Individual capacity request for every caller. +pub struct CapacityRequest { + /// Data needed to process the capacity request. + data: CapacityRequestData, + /// Callback to notify the caller when the capacity request is processed. + callback: tokio::sync::oneshot::Sender>, +} + +impl CapacityRequest { + pub fn new( + data: CapacityRequestData, + callback: tokio::sync::oneshot::Sender>, + ) -> Self { + Self { data, callback } + } + + pub fn send_result(self, result: Result<(), anyhow::Error>) { + if let Err(e) = self.callback.send(result) { + error!(target: LOG_TARGET, "Failed to send capacity request result: {:?}", e); + } + } +} + +/// Data needed to process a capacity request. +pub struct CapacityRequestData { + /// Capacity requested to be increased. + required: StorageData, +} + +impl CapacityRequestData { + pub fn new(required: StorageData) -> Self { + Self { required } + } +} + +impl BlockchainService +where + FSH: ForestStorageHandler + Clone + Send + Sync + 'static, +{ + /// Helper method to temporarily take ownership of the capacity manager + pub(crate) fn execute_with_capacity_manager(&mut self, f: F) -> Result + where + F: FnOnce(&mut CapacityRequestQueue) -> R, + { + let mut manager = self + .capacity_manager + .take() + .ok_or_else(|| anyhow!("Capacity manager not initialized"))?; + + let result = f(&mut manager); + self.capacity_manager = Some(manager); + + Ok(result) + } + + /// Queue a capacity request. + /// + /// If the capacity request cannot be queued for any reason, the error will be sent back to the caller. 
+ pub(crate) async fn queue_capacity_request(&mut self, capacity_request: CapacityRequest) { + match self.check_capacity_request_conditions().await { + Ok((_, current_capacity, _)) => { + // Wrap in Option to control ownership + let mut request_opt = Some(capacity_request); + + let result = self.execute_with_capacity_manager(|manager| { + // Take ownership from the Option + let request = request_opt.take().unwrap(); + manager.queue_capacity_request(request, current_capacity) + }); + + if let Err(e) = result { + // If we still have the request (manager wasn't initialized), send error + if let Some(request) = request_opt { + request.send_result(Err(e)); + } + // Else: request was successfully queued before an error occurred + } + } + Err(e) => { + // Send the error back to the caller. + capacity_request.send_result(Err(e)); + } + } + } + + /// Process any pending capacity requests. + /// + /// Since the `pending_requests` queue is kept in a valid state by pushing capacity requests that would still amount to a valid + /// `total_required` value not exceeding the `max_capacity_allowed`, we add them all to the `requests_waiting_for_inclusion` list + /// and send the `total_required` value in a single `change_capacity` extrinsic. + pub(crate) async fn process_capacity_requests( + &mut self, + block_number: BlockNumber, + ) -> Result<(), anyhow::Error> { + info!(target: LOG_TARGET, "[process_capacity_requests] Processing capacity requests"); + let (current_block_hash, current_capacity, inner_provider_id) = match self + .check_capacity_request_conditions() + .await + { + Ok(result) => result, + Err(e) => { + error!(target: LOG_TARGET, "Failed to check capacity request conditions: {:?}", e); + return Ok(()); + } + }; + + let capacity_manager_ref = self + .capacity_manager + .as_ref() + .expect("Capacity manager should be initialized; qed"); + + // Skip the process if there are no pending requests. + if !capacity_manager_ref.has_pending_requests() { + info!(target: LOG_TARGET, "[process_capacity_requests] No pending requests, skipping"); + return Ok(()); + } + + // Query earliest block to change capacity + info!(target: LOG_TARGET, "[process_capacity_requests] Querying earliest block to change capacity"); + let earliest_block = self + .client + .runtime_api() + .query_earliest_change_capacity_block(current_block_hash, &inner_provider_id) + .unwrap_or_else(|_| { + error!(target: LOG_TARGET, "Failed to query earliest block to change capacity"); + Err(QueryEarliestChangeCapacityBlockError::InternalError) + }) + .map_err(|e| anyhow!("Failed to query earliest block to change capacity: {:?}", e))?; + + if block_number < earliest_block - 1 { + info!(target: LOG_TARGET, "[process_capacity_requests] Earliest block to change capacity: {:?}", earliest_block); + // Must wait until the earliest block to change capacity. + return Ok(()); + } + + let required_capacity = capacity_manager_ref.total_required; + + // Calculate new capacity based on configuration + let new_capacity = + capacity_manager_ref.calculate_new_capacity(current_capacity, required_capacity); + + // Send the extrinsic to change the provider's capacity and wait for it to succeed. + let call = storage_hub_runtime::RuntimeCall::Providers( + pallet_storage_providers::Call::change_capacity { new_capacity }, + ); + + // Send extrinsic to increase capacity + match self.send_extrinsic(call, Default::default()).await { + Ok(output) => { + // Add all pending requests to the list of requests waiting for inclusion. 
+ if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.add_pending_requests_to_waiting_for_inclusion( + SubmittedTransaction::new(output.receiver, output.hash, output.nonce), + ); + }) { + error!(target: LOG_TARGET, "Failed to add pending requests to waiting for inclusion: {:?}", e); + } + } + Err(e) => { + error!(target: LOG_TARGET, "Failed to send increase capacity extrinsic: {:?}", e); + // Notify all in-flight requests of the error + if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.fail_requests(e.to_string()); + }) { + error!(target: LOG_TARGET, "Failed to notify in-flight requests of the error: {:?}", e); + } + } + }; + + // Ensure the pending requests queue and total required capacity are reset so that + // new capacity requests can be queued and tally up from 0 again. + if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.reset_queue(); + }) { + error!(target: LOG_TARGET, "Failed to reset capacity manager: {:?}", e); + } + + Ok(()) + } + + /// Check if the capacity manager is initialized and if the provider ID is set. + /// + /// Ensure that the current capacity of the provider registered in the runtime is less than the maximum capacity configured + /// by the node operator. + async fn check_capacity_request_conditions( + &mut self, + ) -> Result<(H256, StorageData, H256), anyhow::Error> { + // Any errors in this block is considered a critical error which would not allow processing any capacity requests. + // Only process capacity requests if the capacity manager is initialized + let Some(capacity_manager) = &self.capacity_manager else { + return Err(anyhow!("Capacity manager not initialized")); + }; + + // Get provider ID + let Some(storage_provider_id) = &self.provider_id else { + return Err(anyhow!( + "No provider ID set, cannot process capacity requests" + )); + }; + + let inner_provider_id = match storage_provider_id { + StorageProviderId::MainStorageProvider(id) + | StorageProviderId::BackupStorageProvider(id) => id, + }; + + // Get current block hash + let current_block_hash = self.client.info().best_hash; + + // Query current capacity + let current_capacity = self + .client + .runtime_api() + .query_storage_provider_capacity(current_block_hash, inner_provider_id) + .unwrap_or_else(|_| Err(QueryStorageProviderCapacityError::InternalError)) + .map_err(|e| anyhow!("Failed to query current storage capacity: {:?}", e))?; + + if current_capacity >= capacity_manager.max_capacity_allowed() { + return Err(anyhow!("Provider already at maximum capacity")); + } + + Ok((current_block_hash, current_capacity, *inner_provider_id)) + } +} diff --git a/client/blockchain-service/src/commands.rs b/client/blockchain-service/src/commands.rs index c68382d8f..588ca263c 100644 --- a/client/blockchain-service/src/commands.rs +++ b/client/blockchain-service/src/commands.rs @@ -28,6 +28,7 @@ use shc_common::types::{ use storage_hub_runtime::{AccountId, Balance, StorageDataUnit}; use crate::{ + capacity_manager::CapacityRequestData, handler::BlockchainService, transaction::{SubmittedTransaction, WatchTransactionError}, types::{ @@ -201,6 +202,11 @@ pub enum BlockchainServiceCommand { request: FileDeletionRequest, callback: tokio::sync::oneshot::Sender>, }, + IncreaseCapacity { + request: CapacityRequestData, + callback: + tokio::sync::oneshot::Sender>>, + }, } /// Interface for interacting with the BlockchainService actor. 
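The `IncreaseCapacity` command above uses a nested oneshot channel: the blockchain service replies immediately with a receiver, and the capacity manager resolves that receiver only once the batched change_capacity extrinsic has been included or has failed. A minimal sketch of this two-stage handshake, assuming only the tokio crate (function names and error strings here are illustrative and not part of this patch):

use tokio::sync::oneshot;

// Stage 1: the caller hands over a channel on which it expects to receive
// *another* channel carrying the final result of the capacity change.
async fn caller() -> Result<(), String> {
    let (reply_tx, reply_rx) = oneshot::channel::<oneshot::Receiver<Result<(), String>>>();

    // In the real service this is sent as
    // `BlockchainServiceCommand::IncreaseCapacity { request, callback }`.
    service_side(reply_tx);

    // Stage 2: the service answers right away with a receiver; awaiting it only
    // blocks this caller until the batched extrinsic succeeds or fails.
    let result_rx = reply_rx.await.map_err(|_| "service dropped".to_string())?;
    result_rx.await.map_err(|_| "capacity manager dropped".to_string())?
}

// The service queues the request and keeps `result_tx`, so its event loop is
// never blocked; the capacity manager completes the request later.
fn service_side(reply_tx: oneshot::Sender<oneshot::Receiver<Result<(), String>>>) {
    let (result_tx, result_rx) = oneshot::channel();
    let _ = reply_tx.send(result_rx);
    tokio::spawn(async move {
        // Stand-in for the capacity manager reporting the extrinsic outcome.
        let _ = result_tx.send(Ok(()));
    });
}

#[tokio::main]
async fn main() {
    println!("capacity request outcome: {:?}", caller().await);
}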
@@ -372,6 +378,14 @@ pub trait BlockchainServiceInterface { async fn query_slash_amount_per_max_file_size(&self) -> Result; + /// Queue a CapacityRequest to be processed. + /// + /// Batching capacity requests in a single transaction with other tasks requiring capacity + /// increases. This is potentially a long-running operation since it requires waiting for + /// the block at which the capacity can be increased. This is enforced by the runtime based on the + /// providers pallet configuration parameter `MinBlocksBetweenCapacityChanges`. + async fn increase_capacity(&self, request: CapacityRequestData) -> Result<()>; + /// Helper function to check if an extrinsic failed or succeeded in a block. fn extrinsic_result(extrinsic: Extrinsic) -> Result; @@ -782,6 +796,15 @@ where rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed.") } + async fn increase_capacity(&self, request: CapacityRequestData) -> Result<()> { + let (callback, rx) = tokio::sync::oneshot::channel(); + let message = BlockchainServiceCommand::IncreaseCapacity { request, callback }; + self.send(message).await; + let rx = rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed."); + // This should receive a WatchTransaction so that the blockchain service can fire and forget + rx.await.expect("Failed to wait for capacity increase") + } + fn extrinsic_result(extrinsic: Extrinsic) -> Result { for ev in extrinsic.events { match ev.event { diff --git a/client/blockchain-service/src/handler.rs b/client/blockchain-service/src/handler.rs index b798976fc..d79a89310 100644 --- a/client/blockchain-service/src/handler.rs +++ b/client/blockchain-service/src/handler.rs @@ -21,6 +21,7 @@ use sp_runtime::{ traits::{Header, Zero}, AccountId32, SaturatedConversion, }; +use storage_hub_runtime::RuntimeEvent; use pallet_file_system_runtime_api::{ FileSystemApi, IsStorageRequestOpenToVolunteersError, QueryBspConfirmChunksToProveForFileError, @@ -45,9 +46,9 @@ use shc_common::{ }, }; use shp_file_metadata::FileKey; -use storage_hub_runtime::RuntimeEvent; use crate::{ + capacity_manager::{CapacityRequest, CapacityRequestQueue}, commands::BlockchainServiceCommand, events::{ AcceptedBspVolunteer, BlockchainServiceEventBusProvider, BspConfirmStoppedStoring, @@ -160,6 +161,10 @@ where /// /// This is meant to be used for periodic, low priority tasks. pub(crate) notify_period: Option, + /// Efficiently manages the capacity changes of storage providers. + /// + /// Only required if the node is running as a provider. + pub(crate) capacity_manager: Option, } /// Event loop for the BlockchainService actor. @@ -963,6 +968,19 @@ where } } } + BlockchainServiceCommand::IncreaseCapacity { request, callback } => { + // Create a new channel that will be used to notify completion + let (tx, rx) = tokio::sync::oneshot::channel(); + + // The capacity manager handles sending the result back to the caller so we don't need to do anything here. Whether the transaction failed or succeeded, or if the capacity request was never queued, the result will be sent back through the channel by the capacity manager. 
+ self.queue_capacity_request(CapacityRequest::new(request, tx)) + .await; + + // Send the receiver back through the callback + if let Err(e) = callback.send(rx) { + error!(target: LOG_TARGET, "Failed to send capacity request receiver: {:?}", e); + } + } BlockchainServiceCommand::QueryMspIdOfBucketId { bucket_id, callback, @@ -1046,6 +1064,7 @@ where forest_storage_handler: FSH, rocksdb_root_path: impl Into, notify_period: Option, + capacity_request_queue: Option, ) -> Self { Self { event_bus_provider: BlockchainServiceEventBusProvider::new(), @@ -1063,6 +1082,7 @@ where persistent_state: BlockchainServiceStateStore::new(rocksdb_root_path.into()), pending_submit_proof_requests: BTreeSet::new(), notify_period, + capacity_manager: capacity_request_queue, } } @@ -1233,6 +1253,9 @@ where // It is not guaranteed that the tick number will increase at every block import. self.notify_tick_number(&block_hash); + // Notify the capacity manager that a new block has been imported. + self.notify_capacity_manager(&block_number).await; + // Process pending requests that update the forest root. self.check_pending_forest_root_writes(); diff --git a/client/blockchain-service/src/lib.rs b/client/blockchain-service/src/lib.rs index ed9d5322a..cb1f111d6 100644 --- a/client/blockchain-service/src/lib.rs +++ b/client/blockchain-service/src/lib.rs @@ -1,3 +1,4 @@ +pub mod capacity_manager; pub mod commands; pub mod events; pub mod handler; @@ -9,6 +10,7 @@ pub mod utils; use std::{path::PathBuf, sync::Arc}; +use capacity_manager::{CapacityConfig, CapacityRequestQueue}; use sc_service::RpcHandlers; use sp_keystore::KeystorePtr; @@ -25,6 +27,7 @@ pub async fn spawn_blockchain_service( forest_storage_handler: FSH, rocksdb_root_path: impl Into, notify_period: Option, + capacity_config: Option, ) -> ActorHandle> where FSH: shc_forest_manager::traits::ForestStorageHandler + Clone + Send + Sync + 'static, @@ -40,6 +43,7 @@ where forest_storage_handler, rocksdb_root_path, notify_period, + capacity_config.map(CapacityRequestQueue::new), ); task_spawner.spawn_actor(blockchain_service) diff --git a/client/blockchain-service/src/utils.rs b/client/blockchain-service/src/utils.rs index f1ad50765..f8410bb95 100644 --- a/client/blockchain-service/src/utils.rs +++ b/client/blockchain-service/src/utils.rs @@ -6,7 +6,9 @@ use cumulus_primitives_core::BlockT; use pallet_proofs_dealer_runtime_api::{ GetChallengePeriodError, GetChallengeSeedError, GetProofSubmissionRecordError, ProofsDealerApi, }; -use pallet_storage_providers_runtime_api::StorageProvidersApi; +use pallet_storage_providers_runtime_api::{ + QueryEarliestChangeCapacityBlockError, StorageProvidersApi, +}; use polkadot_runtime_common::BlockHashCount; use sc_client_api::{BlockBackend, BlockImportNotification, HeaderBackend}; use sc_tracing::tracing::{debug, error, info, trace, warn}; @@ -119,6 +121,106 @@ where } } + /// Sends back the result of the submitted transaction for all capacity requests waiting for inclusion if there is one. + /// + /// Begins another batch process of pending capacity requests if there are any and if + /// we are past the block at which the capacity can be increased. 
+ pub(crate) async fn notify_capacity_manager(&mut self, block_number: &BlockNumber) { + if self.capacity_manager.is_none() { + return; + }; + + let current_block_hash = self.client.info().best_hash; + + let provider_id = match self.provider_id { + Some(provider_id) => match provider_id { + StorageProviderId::MainStorageProvider(id) + | StorageProviderId::BackupStorageProvider(id) => id, + }, + None => { + return; + } + }; + + let capacity_manager_ref = self.capacity_manager.as_ref().unwrap(); + + // Send response to all callers waiting for their capacity request to be included in a block. + if capacity_manager_ref.has_requests_waiting_for_inclusion() { + if let Some(last_submitted_transaction) = + capacity_manager_ref.last_submitted_transaction() + { + // Check if extrinsic was included in the current block. + if let Ok(extrinsic) = self + .get_extrinsic_from_block(current_block_hash, last_submitted_transaction.hash()) + .await + .map_err(|e| { + error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to get extrinsic from block: {:?}", e); + anyhow::anyhow!("Failed to get extrinsic from block: {:?}", e) + }) + { + info!(target: LOG_TARGET, "[notify_capacity_manager] Extrinsic found in block, checking if it succeeded or failed"); + // Check if the extrinsic succeeded or failed. + // We notify the callers of the success or failure of the extrinsic. + for event in extrinsic.events.iter() { + match &event.event { + RuntimeEvent::System(system_event) => match system_event { + frame_system::Event::ExtrinsicSuccess { dispatch_info } => { + debug!( + target: LOG_TARGET, + "Extrinsic succeeded: {:?}", dispatch_info + ); + if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.complete_requests_waiting_for_inclusion(); + }) { + error!(target: LOG_TARGET, "Failed to complete requests waiting for inclusion: {:?}", e); + } + } + frame_system::Event::ExtrinsicFailed { dispatch_error, dispatch_info: _ } => { + error!( + target: LOG_TARGET, + "Extrinsic failed: {:?}", dispatch_error + ); + if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.fail_requests_waiting_for_inclusion(format!("Extrinsic failed: {:?}", dispatch_error)); + }) { + error!(target: LOG_TARGET, "Failed to fail requests waiting for inclusion: {:?}", e); + } + } + _ => {} + }, + _ => {} + } + } + } + } + } + + // Query earliest block to change capacity + let earliest_block = match self + .client + .runtime_api() + .query_earliest_change_capacity_block(current_block_hash, &provider_id) + .unwrap_or_else(|_| { + error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to query earliest block to change capacity"); + Err(QueryEarliestChangeCapacityBlockError::InternalError) + }) { + Ok(earliest_block) => { + earliest_block + } + Err(e) => { + error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to query earliest block to change capacity: {:?}", e); + return; + } + }; + + // We can send the transaction 1 block before the earliest block to change capacity since it will be included in the next block. + if *block_number >= earliest_block - 1 { + if let Err(e) = self.process_capacity_requests(*block_number).await { + error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to process capacity requests: {:?}", e); + } + } + } + /// From a [`BlockImportNotification`], gets the imported block, and checks if: /// 1. The block is not the new best block. For example, it could be a block from a non-best fork branch. /// - If so, it returns [`NewBlockNotificationKind::NewNonBestBlock`]. 
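The sizing logic used by `calculate_new_capacity`, and relied on by `notify_capacity_manager` when it triggers a batch, rounds the outstanding requirement up to a whole number of `jump_capacity` increments and clamps the result at `max_capacity`. A standalone sketch of that arithmetic (the concrete numbers are made up for illustration and are not taken from this patch):

/// Mirrors the jump-based sizing: round `total_required` up to whole jumps,
/// add it to the current on-chain capacity, and clamp at the configured maximum.
fn calculate_new_capacity(current: u64, total_required: u64, jump: u64, max: u64) -> u64 {
    // Ceiling division: number of jumps needed to cover the requirement.
    let jumps_needed = (total_required + jump - 1) / jump;
    current.saturating_add(jumps_needed * jump).min(max)
}

fn main() {
    // A 100-byte requirement with a 1_000-byte jump still buys one full jump.
    assert_eq!(calculate_new_capacity(4_000, 100, 1_000, 10_000), 5_000);
    // A 2_500-byte requirement needs three jumps.
    assert_eq!(calculate_new_capacity(4_000, 2_500, 1_000, 10_000), 7_000);
    // The new capacity never exceeds the configured maximum.
    assert_eq!(calculate_new_capacity(9_500, 2_500, 1_000, 10_000), 10_000);
}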
diff --git a/node/src/service.rs b/node/src/service.rs index 3fcfdb8b1..087a1ffc5 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -3,6 +3,7 @@ // std use futures::{Stream, StreamExt}; use log::info; +use shc_blockchain_service::capacity_manager::CapacityConfig; use shc_indexer_db::DbPool; use shc_indexer_service::spawn_indexer_service; use std::{cell::RefCell, env, path::PathBuf, sync::Arc, time::Duration}; @@ -244,8 +245,10 @@ where storage_hub_builder .setup_storage_layer(storage_path.clone()) .with_retry_timeout(*extrinsic_retry_timeout) - .with_max_storage_capacity(*max_storage_capacity) - .with_jump_capacity(*jump_capacity); + .with_capacity_config(Some(CapacityConfig::new( + max_storage_capacity.unwrap_or_default(), + jump_capacity.unwrap_or_default(), + ))); // Setup specific configuration for the MSP node. if *provider_type == ProviderType::Msp { diff --git a/node/src/services/builder.rs b/node/src/services/builder.rs index 101b94e57..9345e3eba 100644 --- a/node/src/services/builder.rs +++ b/node/src/services/builder.rs @@ -4,11 +4,12 @@ use sc_service::RpcHandlers; use shc_indexer_db::DbPool; use sp_keystore::KeystorePtr; use std::{path::PathBuf, sync::Arc}; -use storage_hub_runtime::StorageDataUnit; use tokio::sync::RwLock; use shc_actors_framework::actor::{ActorHandle, TaskSpawner}; -use shc_blockchain_service::{spawn_blockchain_service, BlockchainService}; +use shc_blockchain_service::{ + capacity_manager::CapacityConfig, spawn_blockchain_service, BlockchainService, +}; use shc_common::types::ParachainClient; use shc_file_manager::{in_memory::InMemoryFileStorage, rocksdb::RocksDbFileStorage}; use shc_file_transfer_service::{spawn_file_transfer_service, FileTransferService}; @@ -42,8 +43,7 @@ where storage_path: Option, file_storage: Option::FL>>>, forest_storage_handler: Option<<(R, S) as ShNodeType>::FSH>, - max_storage_capacity: Option, - jump_capacity: Option, + capacity_config: Option, extrinsic_retry_timeout: u64, indexer_db_pool: Option, notify_period: Option, @@ -62,8 +62,7 @@ where storage_path: None, file_storage: None, forest_storage_handler: None, - max_storage_capacity: None, - jump_capacity: None, + capacity_config: None, extrinsic_retry_timeout: DEFAULT_EXTRINSIC_RETRY_TIMEOUT_SECONDS, indexer_db_pool: None, notify_period: None, @@ -95,22 +94,8 @@ where /// /// The node will not increase its on-chain capacity above this value. /// This is meant to reflect the actual physical storage capacity of the node. - pub fn with_max_storage_capacity( - &mut self, - max_storage_capacity: Option, - ) -> &mut Self { - self.max_storage_capacity = max_storage_capacity; - self - } - - /// Set the jump capacity. - /// - /// The jump capacity is the amount of storage that the node will increase in its on-chain - /// capacity by adding more stake. For example, if the jump capacity is set to 1k, and the - /// node needs 100 units of storage more to store a file, the node will automatically increase - /// its on-chain capacity by 1k units. 
- pub fn with_jump_capacity(&mut self, jump_capacity: Option) -> &mut Self { - self.jump_capacity = jump_capacity; + pub fn with_capacity_config(&mut self, capacity_config: Option) -> &mut Self { + self.capacity_config = capacity_config; self } @@ -154,6 +139,9 @@ where .forest_storage_handler .clone() .expect("Just checked that this is not None; qed"); + + let capacity_config = self.capacity_config.clone(); + let blockchain_service_handle = spawn_blockchain_service::<<(R, S) as ShNodeType>::FSH>( self.task_spawner .as_ref() @@ -164,6 +152,7 @@ where forest_storage_handler, rocksdb_root_path, self.notify_period, + capacity_config, ) .await; @@ -311,10 +300,7 @@ where .expect("Forest Storage Handler not set.") .clone(), ProviderConfig { - max_storage_capacity: self - .max_storage_capacity - .expect("Max Storage Capacity not set"), - jump_capacity: self.jump_capacity.expect("Jump Capacity not set"), + capacity_config: self.capacity_config.expect("Capacity Config not set"), extrinsic_retry_timeout: self.extrinsic_retry_timeout, }, self.indexer_db_pool.clone(), @@ -350,10 +336,7 @@ where .expect("Forest Storage Handler not set.") .clone(), ProviderConfig { - max_storage_capacity: self - .max_storage_capacity - .expect("Max Storage Capacity not set"), - jump_capacity: self.jump_capacity.expect("Jump Capacity not set"), + capacity_config: self.capacity_config.expect("Capacity Config not set"), extrinsic_retry_timeout: self.extrinsic_retry_timeout, }, self.indexer_db_pool.clone(), @@ -389,8 +372,7 @@ where <(UserRole, NoStorageLayer) as ShNodeType>::FSH::new(), // Not used by the user role ProviderConfig { - max_storage_capacity: 0, - jump_capacity: 0, + capacity_config: CapacityConfig::new(0, 0, 0.0), extrinsic_retry_timeout: self.extrinsic_retry_timeout, }, self.indexer_db_pool.clone(), diff --git a/node/src/services/handler.rs b/node/src/services/handler.rs index 374bb9777..ff61583f3 100644 --- a/node/src/services/handler.rs +++ b/node/src/services/handler.rs @@ -6,6 +6,7 @@ use shc_actors_framework::{ event_bus::{EventBusListener, EventHandler}, }; use shc_blockchain_service::{ + capacity_manager::CapacityConfig, events::{ AcceptedBspVolunteer, FileDeletionRequest, FinalisedBspConfirmStoppedStoring, FinalisedMspStoppedStoringBucket, FinalisedProofSubmittedForPendingFileDeletionRequest, @@ -24,7 +25,6 @@ use shc_file_transfer_service::{ }; use shc_forest_manager::traits::ForestStorageHandler; use shc_indexer_db::DbPool; -use storage_hub_runtime::StorageDataUnit; use crate::{ services::types::{ @@ -45,14 +45,10 @@ use crate::{ /// Configuration parameters for Storage Providers. #[derive(Clone)] pub struct ProviderConfig { - /// Maximum storage capacity of the provider (bytes). + /// Configuration parameters necessary to run the capacity manager. /// - /// The Storage Provider will not request to increase its storage capacity beyond this value. - pub max_storage_capacity: StorageDataUnit, - /// Jump capacity (bytes). - /// - /// Storage capacity increases in jumps of this size. - pub jump_capacity: StorageDataUnit, + /// This is only required if running as a storage provider node. + pub capacity_config: CapacityConfig, /// The time in seconds to wait before retrying an extrinsic. 
pub extrinsic_retry_timeout: u64, } diff --git a/node/src/tasks/bsp_charge_fees.rs b/node/src/tasks/bsp_charge_fees.rs index 70aa88127..e5cd499dc 100644 --- a/node/src/tasks/bsp_charge_fees.rs +++ b/node/src/tasks/bsp_charge_fees.rs @@ -9,7 +9,7 @@ use shc_blockchain_service::{ LastChargeableInfoUpdated, ProcessStopStoringForInsolventUserRequest, SpStopStoringInsolventUser, UserWithoutFunds, }, - types::{SendExtrinsicOptions, StopStoringForInsolventUserRequest}, + types::StopStoringForInsolventUserRequest, }; use shc_common::{consts::CURRENT_FOREST_KEY, types::MaxUsersToCharge}; use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; @@ -109,7 +109,7 @@ where let charging_result = self .storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await; match charging_result { @@ -302,10 +302,7 @@ where // continue only if it is successful. self.storage_hub_handler .blockchain - .send_extrinsic( - stop_storing_for_insolvent_user_call, - SendExtrinsicOptions::default(), - ) + .send_extrinsic(stop_storing_for_insolvent_user_call, Default::default()) .await? .with_timeout(Duration::from_secs( self.storage_hub_handler @@ -328,7 +325,7 @@ where let charging_result = self .storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await; match charging_result { diff --git a/node/src/tasks/bsp_upload_file.rs b/node/src/tasks/bsp_upload_file.rs index 7aff5264a..03a257ed6 100644 --- a/node/src/tasks/bsp_upload_file.rs +++ b/node/src/tasks/bsp_upload_file.rs @@ -1,4 +1,4 @@ -use std::{cmp::max, collections::HashMap, ops::Add, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, str::FromStr, time::Duration}; use anyhow::anyhow; use frame_support::BoundedVec; @@ -6,13 +6,13 @@ use sc_network::PeerId; use sc_tracing::tracing::*; use sp_core::H256; use sp_runtime::AccountId32; -use tokio::sync::Mutex; use shc_actors_framework::event_bus::EventHandler; use shc_blockchain_service::{ + capacity_manager::CapacityRequestData, commands::BlockchainServiceInterface, events::{NewStorageRequest, ProcessConfirmStoringRequest}, - types::{ConfirmStoringRequest, RetryStrategy, SendExtrinsicOptions}, + types::{ConfirmStoringRequest, RetryStrategy}, }; use shc_common::{ consts::CURRENT_FOREST_KEY, @@ -26,7 +26,7 @@ use shc_file_transfer_service::{ commands::FileTransferServiceInterface, events::RemoteUploadRequest, }; use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; -use storage_hub_runtime::{StorageDataUnit, MILLIUNIT}; +use storage_hub_runtime::MILLIUNIT; use crate::services::{ handler::StorageHubHandler, @@ -61,7 +61,6 @@ where { storage_hub_handler: StorageHubHandler, file_key_cleanup: Option, - capacity_queue: Arc>, } impl Clone for BspUploadFileTask @@ -73,7 +72,6 @@ where Self { storage_hub_handler: self.storage_hub_handler.clone(), file_key_cleanup: self.file_key_cleanup, - capacity_queue: Arc::clone(&self.capacity_queue), } } } @@ -87,7 +85,6 @@ where Self { storage_hub_handler, file_key_cleanup: None, - capacity_queue: Arc::new(Mutex::new(0_u64)), } } @@ -548,10 +545,6 @@ where event.file_key ); - // Note: the logic below is not ideal since it's not efficient (multiple increases in - // capacity might occur when one would suffice if multiple tasks are executing it, for example), - // but it's a temporary solution until we have a better way to handle this. 
- // Check that the BSP has not reached the maximum storage capacity. let current_capacity = self .storage_hub_handler @@ -569,108 +562,34 @@ where let max_storage_capacity = self .storage_hub_handler .provider_config - .max_storage_capacity; + .capacity_config + .max_capacity(); - if max_storage_capacity == current_capacity { + if max_storage_capacity <= current_capacity { let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; - warn!( + error!( target: LOG_TARGET, "{}", err_msg ); return Err(anyhow::anyhow!(err_msg)); } - // Register the capacity change in the queue. - let mut capacity_queue = self.capacity_queue.lock().await; - *capacity_queue = capacity_queue.add(event.size); - drop(capacity_queue); - - // Get the earliest block at which this BSP can change its capacity. - // This is done after registering the capacity increase in the queue in case another task was currently - // increasing the capacity as well, so we have the most up-to-date earliest change capacity block. - let earliest_change_capacity_block = self - .storage_hub_handler - .blockchain - .query_earliest_change_capacity_block(own_bsp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query storage provider capacity: {:?}", e - ); - anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) - })?; - - // Wait for the earliest block where the capacity can be changed. self.storage_hub_handler .blockchain - .wait_for_block(earliest_change_capacity_block) + .increase_capacity(CapacityRequestData::new(event.size)) .await?; - // Read from the queue if there is a capacity change remaining. - let mut capacity_queue = self.capacity_queue.lock().await; - - // If there is, apply it. - if *capacity_queue > 0 { - let size: u64 = *capacity_queue; - - // Get the current capacity of the BSP. This is needed since it could have changed between the time we - // registered the capacity increase in the queue and the time we are actually increasing the capacity. - let current_capacity = self - .storage_hub_handler - .blockchain - .query_storage_provider_capacity(own_bsp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query storage provider capacity: {:?}", e - ); - anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) - })?; - - // Calculate the new capacity that the BSP has to have to be able to volunteer for the storage request. - let new_capacity = self.calculate_capacity(size, current_capacity)?; - - // Send the extrinsic to change this BSP's capacity and wait for it to succeed. - let call = storage_hub_runtime::RuntimeCall::Providers( - pallet_storage_providers::Call::change_capacity { new_capacity }, - ); - - self.storage_hub_handler - .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) - .await? - .with_timeout(Duration::from_secs( - self.storage_hub_handler - .provider_config - .extrinsic_retry_timeout, - )) - .watch_for_success(&self.storage_hub_handler.blockchain) - .await?; - - // Reset the capacity queue. 
- *capacity_queue = 0; - - info!( - target: LOG_TARGET, - "Increased storage capacity to {:?} bytes", - new_capacity - ); - } - - drop(capacity_queue); - let available_capacity = self .storage_hub_handler .blockchain .query_available_storage_capacity(own_bsp_id) .await .map_err(|e| { + let err_msg = format!("Failed to query available storage capacity: {:?}", e); error!( target: LOG_TARGET, - "Failed to query available storage capacity: {:?}", e + err_msg ); - anyhow::anyhow!("Failed to query available storage capacity: {:?}", e) + anyhow::anyhow!(err_msg) })?; // Skip volunteering if the new available capacity is still less than the file size. @@ -776,7 +695,7 @@ where let result = self .storage_hub_handler .blockchain - .send_extrinsic(call.clone(), SendExtrinsicOptions::default()) + .send_extrinsic(call.clone(), Default::default()) .await? .with_timeout(Duration::from_secs( self.storage_hub_handler @@ -805,7 +724,7 @@ where let result = self .storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs( self.storage_hub_handler @@ -992,37 +911,6 @@ where Ok(file_complete) } - /// Calculate the new capacity after adding the required capacity for the file. - /// - /// The new storage capacity will be increased by the jump capacity until it reaches the - /// `max_storage_capacity`. - /// - /// The `max_storage_capacity` is returned if the new capacity exceeds it. - fn calculate_capacity( - &self, - required_additional_capacity: StorageDataUnit, - current_capacity: StorageDataUnit, - ) -> Result { - let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity; - let jumps_needed = required_additional_capacity.div_ceil(jump_capacity); - let jumps = max(jumps_needed, 1); - let bytes_to_add = jumps * jump_capacity; - let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| { - anyhow::anyhow!( - "Reached maximum storage capacity limit. Skipping volunteering for file." - ) - })?; - - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .max_storage_capacity; - - let new_capacity = std::cmp::min(required_capacity, max_storage_capacity); - - Ok(new_capacity) - } - async fn unvolunteer_file(&self, file_key: H256) { warn!(target: LOG_TARGET, "Unvolunteering file {:?}", file_key); diff --git a/node/src/tasks/mock_bsp_volunteer.rs b/node/src/tasks/mock_bsp_volunteer.rs index e581bd84c..6ff1758a7 100644 --- a/node/src/tasks/mock_bsp_volunteer.rs +++ b/node/src/tasks/mock_bsp_volunteer.rs @@ -4,7 +4,6 @@ use std::time::Duration; use log::*; use shc_actors_framework::event_bus::EventHandler; -use shc_blockchain_service::types::SendExtrinsicOptions; use shc_blockchain_service::{commands::BlockchainServiceInterface, events::NewStorageRequest}; use sp_core::H256; @@ -67,7 +66,7 @@ where self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? 
.with_timeout(Duration::from_secs( self.storage_hub_handler diff --git a/node/src/tasks/mock_sp_react_to_event.rs b/node/src/tasks/mock_sp_react_to_event.rs index 0f97d0894..bbd500ecf 100644 --- a/node/src/tasks/mock_sp_react_to_event.rs +++ b/node/src/tasks/mock_sp_react_to_event.rs @@ -6,7 +6,6 @@ use log::*; use shc_actors_framework::event_bus::EventHandler; use shc_blockchain_service::{ commands::BlockchainServiceInterface, events::MultipleNewChallengeSeeds, - types::SendExtrinsicOptions, }; use crate::services::{handler::StorageHubHandler, types::ShNodeType}; @@ -70,7 +69,7 @@ where self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs( self.storage_hub_handler diff --git a/node/src/tasks/msp_charge_fees.rs b/node/src/tasks/msp_charge_fees.rs index e79b56e11..05ee695c9 100644 --- a/node/src/tasks/msp_charge_fees.rs +++ b/node/src/tasks/msp_charge_fees.rs @@ -1,9 +1,7 @@ use anyhow::anyhow; use sc_tracing::tracing::*; use shc_actors_framework::event_bus::EventHandler; -use shc_blockchain_service::{ - commands::BlockchainServiceInterface, events::NotifyPeriod, types::SendExtrinsicOptions, -}; +use shc_blockchain_service::{commands::BlockchainServiceInterface, events::NotifyPeriod}; use shc_common::types::{MaxUsersToCharge, StorageProviderId}; use sp_core::Get; use storage_hub_runtime::Balance; @@ -114,7 +112,7 @@ where let charging_result = self .storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await; match charging_result { diff --git a/node/src/tasks/msp_move_bucket.rs b/node/src/tasks/msp_move_bucket.rs index e19763adc..9b248ea46 100644 --- a/node/src/tasks/msp_move_bucket.rs +++ b/node/src/tasks/msp_move_bucket.rs @@ -3,14 +3,13 @@ use codec::Decode; use rand::seq::SliceRandom; use sc_tracing::tracing::*; use sp_core::H256; -use std::{cmp::max, time::Duration}; +use std::time::Duration; use pallet_file_system::types::BucketMoveRequestResponse; use shc_actors_framework::event_bus::EventHandler; use shc_blockchain_service::{ - commands::BlockchainServiceInterface, - events::MoveBucketRequestedForMsp, - types::{RetryStrategy, SendExtrinsicOptions}, + capacity_manager::CapacityRequestData, commands::BlockchainServiceInterface, + events::MoveBucketRequestedForMsp, types::RetryStrategy, }; use shc_common::types::{ BucketId, FileKeyProof, HashT, ProviderId, StorageProofsMerkleTrieLayout, StorageProviderId, @@ -20,7 +19,6 @@ use shc_file_transfer_service::commands::FileTransferServiceInterface; use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; use shp_constants::FILE_CHUNK_SIZE; use shp_file_metadata::ChunkId; -use storage_hub_runtime::StorageDataUnit; use crate::services::{ handler::StorageHubHandler, @@ -596,126 +594,78 @@ where .await .map_err(|e| { let err_msg = format!("Failed to query available storage capacity: {:?}", e); - error!(target: LOG_TARGET, err_msg); + error!( + target: LOG_TARGET, + err_msg + ); anyhow::anyhow!(err_msg) })?; - // Increase storage capacity if the available capacity is less than the required size + // Increase storage capacity if the available capacity is less than the file size. if available_capacity < required_size { warn!( target: LOG_TARGET, - "Insufficient storage capacity to accept bucket move. Available: {}, Required: {}", - available_capacity, - required_size + "Insufficient storage capacity to move bucket. 
Required: {}, available: {}", + required_size, available_capacity ); + // Check that the BSP has not reached the maximum storage capacity. let current_capacity = self .storage_hub_handler .blockchain .query_storage_provider_capacity(own_msp_id) .await .map_err(|e| { - let err_msg = format!("Failed to query storage provider capacity: {:?}", e); - error!(target: LOG_TARGET, err_msg); - anyhow::anyhow!(err_msg) + error!( + target: LOG_TARGET, + "Failed to query storage provider capacity: {:?}", e + ); + anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) })?; let max_storage_capacity = self .storage_hub_handler .provider_config - .max_storage_capacity; + .capacity_config + .max_capacity(); - if max_storage_capacity == current_capacity { - let err_msg = - "Reached maximum storage capacity limit. Unable to add more storage capacity."; - warn!(target: LOG_TARGET, err_msg); + if max_storage_capacity <= current_capacity { + let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + error!( + target: LOG_TARGET, "{}", err_msg + ); return Err(anyhow::anyhow!(err_msg)); } - let new_capacity = self.calculate_capacity(required_size, current_capacity)?; - - let call = storage_hub_runtime::RuntimeCall::Providers( - pallet_storage_providers::Call::change_capacity { new_capacity }, - ); - - let earliest_change_capacity_block = self - .storage_hub_handler - .blockchain - .query_earliest_change_capacity_block(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query earliest change capacity block: {:?}", e - ); - anyhow::anyhow!("Failed to query earliest change capacity block: {:?}", e) - })?; - - // Wait for the earliest block where the capacity can be changed - self.storage_hub_handler - .blockchain - .wait_for_block(earliest_change_capacity_block) - .await?; - self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) - .await? - .with_timeout(Duration::from_secs(60)) - .watch_for_success(&self.storage_hub_handler.blockchain) + .increase_capacity(CapacityRequestData::new(required_size)) .await?; - info!( - target: LOG_TARGET, - "Increased storage capacity to {:?} bytes", - new_capacity - ); - let available_capacity = self .storage_hub_handler .blockchain .query_available_storage_capacity(own_msp_id) .await .map_err(|e| { + let err_msg = format!("Failed to query available storage capacity: {:?}", e); error!( target: LOG_TARGET, - "Failed to query available storage capacity: {:?}", e + err_msg ); - anyhow::anyhow!("Failed to query available storage capacity: {:?}", e) + anyhow::anyhow!(err_msg) })?; - // Reject bucket move if the new available capacity is still less than required + // Skip volunteering if the new available capacity is still less than the file size. if available_capacity < required_size { - let err_msg = - "Increased storage capacity is still insufficient to accept bucket move."; - warn!(target: LOG_TARGET, "{}", err_msg); + let err_msg = "Increased storage capacity is still insufficient to volunteer for file. 
Skipping volunteering."; + warn!( + target: LOG_TARGET, "{}", err_msg + ); return Err(anyhow::anyhow!(err_msg)); } } Ok(()) } - - fn calculate_capacity( - &self, - required_size: u64, - current_capacity: StorageDataUnit, - ) -> Result { - let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity; - let jumps_needed = (required_size + jump_capacity - 1) / jump_capacity; - let jumps = max(jumps_needed, 1); - let bytes_to_add = jumps * jump_capacity; - let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| { - anyhow::anyhow!("Reached maximum storage capacity limit. Cannot accept bucket move.") - })?; - - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .max_storage_capacity; - - let new_capacity = std::cmp::min(required_capacity, max_storage_capacity); - - Ok(new_capacity) - } } diff --git a/node/src/tasks/msp_upload_file.rs b/node/src/tasks/msp_upload_file.rs index c64a6bcd6..a87297a3f 100644 --- a/node/src/tasks/msp_upload_file.rs +++ b/node/src/tasks/msp_upload_file.rs @@ -1,12 +1,11 @@ -use std::{cmp::max, collections::HashMap, str::FromStr, time::Duration}; +use std::{collections::HashMap, str::FromStr, time::Duration}; use anyhow::anyhow; use pallet_file_system::types::RejectedStorageRequest; use sc_network::PeerId; use sc_tracing::tracing::*; -use shc_blockchain_service::types::{ - MspRespondStorageRequest, RespondStorageRequest, SendExtrinsicOptions, -}; +use shc_blockchain_service::capacity_manager::CapacityRequestData; +use shc_blockchain_service::types::{MspRespondStorageRequest, RespondStorageRequest}; use sp_core::H256; use sp_runtime::AccountId32; @@ -23,7 +22,6 @@ use shc_file_transfer_service::{ commands::FileTransferServiceInterface, events::RemoteUploadRequest, }; use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; -use storage_hub_runtime::StorageDataUnit; use crate::services::types::ShNodeType; use crate::services::{handler::StorageHubHandler, types::MspForestStorageHandlerT}; @@ -321,7 +319,7 @@ where self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs(60)) .watch_for_success(&self.storage_hub_handler.blockchain) @@ -495,87 +493,56 @@ where if available_capacity < event.size { warn!( target: LOG_TARGET, - "Insufficient storage capacity to accept file: {:?}", + "Insufficient storage capacity to volunteer for file key: {:?}", event.file_key ); + // Check that the BSP has not reached the maximum storage capacity. let current_capacity = self .storage_hub_handler .blockchain .query_storage_provider_capacity(own_msp_id) .await .map_err(|e| { - let err_msg = format!("Failed to query storage provider capacity: {:?}", e); error!( target: LOG_TARGET, - err_msg + "Failed to query storage provider capacity: {:?}", e ); - anyhow::anyhow!(err_msg) + anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) })?; let max_storage_capacity = self .storage_hub_handler .provider_config - .max_storage_capacity; + .capacity_config + .max_capacity(); - if max_storage_capacity == current_capacity { + if max_storage_capacity <= current_capacity { let err_msg = "Reached maximum storage capacity limit. 
Unable to add more more storage capacity."; - warn!( - target: LOG_TARGET, err_msg + error!( + target: LOG_TARGET, "{}", err_msg ); return Err(anyhow::anyhow!(err_msg)); } - let new_capacity = self.calculate_capacity(&event, current_capacity)?; - - let call = storage_hub_runtime::RuntimeCall::Providers( - pallet_storage_providers::Call::change_capacity { new_capacity }, - ); - - let earliest_change_capacity_block = self - .storage_hub_handler - .blockchain - .query_earliest_change_capacity_block(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query storage provider capacity: {:?}", e - ); - anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) - })?; - - // Wait for the earliest block where the capacity can be changed. self.storage_hub_handler .blockchain - .wait_for_block(earliest_change_capacity_block) + .increase_capacity(CapacityRequestData::new(event.size)) .await?; - self.storage_hub_handler - .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) - .await? - .with_timeout(Duration::from_secs(60)) - .watch_for_success(&self.storage_hub_handler.blockchain) - .await?; - - info!( - target: LOG_TARGET, - "Increased storage capacity to {:?} bytes", - new_capacity - ); - let available_capacity = self .storage_hub_handler .blockchain .query_available_storage_capacity(own_msp_id) .await .map_err(|e| { + let err_msg = + format!("Failed to query available storage capacity: {:?}", e); error!( target: LOG_TARGET, - "Failed to query available storage capacity: {:?}", e + err_msg ); - anyhow::anyhow!("Failed to query available storage capacity: {:?}", e) + anyhow::anyhow!(err_msg) })?; // Reject storage request if the new available capacity is still less than the file size. @@ -601,7 +568,7 @@ where self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs(60)) .watch_for_success(&self.storage_hub_handler.blockchain) @@ -918,7 +885,7 @@ where self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs(60)) .watch_for_success(&self.storage_hub_handler.blockchain) @@ -930,37 +897,6 @@ where Ok(()) } - /// Calculate the new capacity after adding the required capacity for the file. - /// - /// The new storage capacity will be increased by the jump capacity until it reaches the - /// `max_storage_capacity`. - /// - /// The `max_storage_capacity` is returned if the new capacity exceeds it. - fn calculate_capacity( - &mut self, - event: &NewStorageRequest, - current_capacity: StorageDataUnit, - ) -> Result { - let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity; - let jumps_needed = event.size.div_ceil(jump_capacity); - let jumps = max(jumps_needed, 1); - let bytes_to_add = jumps * jump_capacity; - let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| { - anyhow::anyhow!( - "Reached maximum storage capacity limit. Skipping volunteering for file." 
- ) - })?; - - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .max_storage_capacity; - - let new_capacity = std::cmp::min(required_capacity, max_storage_capacity); - - Ok(new_capacity) - } - async fn unregister_file(&self, file_key: H256) -> anyhow::Result<()> { warn!(target: LOG_TARGET, "Unregistering file {:?}", file_key); diff --git a/node/src/tasks/sp_slash_provider.rs b/node/src/tasks/sp_slash_provider.rs index f425c3836..3015e1e6d 100644 --- a/node/src/tasks/sp_slash_provider.rs +++ b/node/src/tasks/sp_slash_provider.rs @@ -3,9 +3,7 @@ use std::time::Duration; use sc_tracing::tracing::*; use shc_actors_framework::event_bus::EventHandler; -use shc_blockchain_service::{ - commands::BlockchainServiceInterface, events::SlashableProvider, types::SendExtrinsicOptions, -}; +use shc_blockchain_service::{commands::BlockchainServiceInterface, events::SlashableProvider}; use crate::services::{handler::StorageHubHandler, types::ShNodeType}; @@ -79,7 +77,7 @@ where // Send extrinsic and wait for it to be included in the block. self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) + .send_extrinsic(call, Default::default()) .await? .with_timeout(Duration::from_secs( self.storage_hub_handler diff --git a/test/util/bspNet/block.ts b/test/util/bspNet/block.ts index c3b9d3800..5bf3e2296 100644 --- a/test/util/bspNet/block.ts +++ b/test/util/bspNet/block.ts @@ -44,7 +44,11 @@ export interface SealedBlock { */ export const extendFork = async ( api: ApiPromise, - options: { parentBlockHash: string; amountToExtend: number; verbose?: boolean } + options: { + parentBlockHash: string; + amountToExtend: number; + verbose?: boolean; + } ) => { let parentBlockHash: string = options.parentBlockHash; let parentHeight = (await api.rpc.chain.getHeader(parentBlockHash)).number.toNumber(); @@ -135,7 +139,9 @@ export const sealBlock = async ( if (call.isSigned) { hash = await call.send(); } else { - hash = await call.signAndSend(signer || alice, { nonce: nonceToUse + i }); + hash = await call.signAndSend(signer || alice, { + nonce: nonceToUse + i + }); } // Poll for the transaction to be included in the pending extrinsics, or error out in 2 seconds @@ -300,7 +306,7 @@ export const skipBlocksUntilBspCanChangeCapacity: ( `\tSkipping to block #${blockToAdvanceTo} to go beyond MinBlocksBetweenCapacityChanges` ); await advanceToBlock(api, { - blockNumber: blockToAdvanceTo, + blockNumber: blockToAdvanceTo - 1, verbose: false, watchForBspProofs: [bspId.toString()] }); diff --git a/test/util/bspNet/consts.ts b/test/util/bspNet/consts.ts index 2b37123ea..a5819ec77 100644 --- a/test/util/bspNet/consts.ts +++ b/test/util/bspNet/consts.ts @@ -110,4 +110,5 @@ export const TRANSFER_WEIGHT_REF_TIME = 297_297_000; export const TRANSFER_WEIGHT_PROOF_SIZE = 308; export const JUMP_CAPACITY_BSP = 1073741824; +export const CAPACITY_BUFFER_PERCENTILE = 0.2; export const MSP_CHARGING_PERIOD = 12; diff --git a/test/util/bspNet/test-api.ts b/test/util/bspNet/test-api.ts index 2b56730b8..7be186ece 100644 --- a/test/util/bspNet/test-api.ts +++ b/test/util/bspNet/test-api.ts @@ -239,7 +239,10 @@ export class BspNetTestApi implements AsyncDisposable { * @param options.timeoutMs - Optional timeout in milliseconds * @returns A promise that resolves when a BSP has submitted to the tx pool the extrinsic to confirm storing a file. 
*/ - bspStoredInTxPool: (options?: { expectedExts?: number; timeoutMs?: number }) => + bspStoredInTxPool: (options?: { + expectedExts?: number; + timeoutMs?: number; + }) => Waits.waitForBspStoredWithoutSealing(this._api, { checkQuantity: options?.expectedExts, timeout: options?.timeoutMs @@ -506,6 +509,8 @@ export class BspNetTestApi implements AsyncDisposable { ) => BspNetBlock.advanceToBlock(this._api, { ...options, blockNumber }), /** * Skips blocks until the minimum time for capacity changes is reached. + * It will stop at the block before the minimum change time is reached since the capacity + * change extrinsic will be sent and included in the next block. * * @param bspId - The ID of the BSP that the capacity change is for. * @returns A promise that resolves when the minimum change time is reached. From 04bd7d8cc80715791bfdde2764212cceb4c77214 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Fri, 21 Feb 2025 17:04:01 -0500 Subject: [PATCH 2/7] unify complete requests functions into one, skip processing queue if there is a pending transaction --- .../src/capacity_manager.rs | 15 +--- client/blockchain-service/src/utils.rs | 83 +++++++++---------- node/src/services/builder.rs | 2 +- 3 files changed, 45 insertions(+), 55 deletions(-) diff --git a/client/blockchain-service/src/capacity_manager.rs b/client/blockchain-service/src/capacity_manager.rs index b607bf339..2f094527d 100644 --- a/client/blockchain-service/src/capacity_manager.rs +++ b/client/blockchain-service/src/capacity_manager.rs @@ -128,25 +128,16 @@ impl CapacityRequestQueue { /// Complete all requests waiting for inclusion, notifying the callers of success. /// /// The `requests_waiting_for_inclusion` list is cleared after the requests are notified. - pub fn complete_requests_waiting_for_inclusion(&mut self) { - // Notify all callers of success + pub fn complete_requests_waiting_for_inclusion(&mut self, result: Result<(), String>) { + // Notify all callers of result while let Some(request) = self.requests_waiting_for_inclusion.pop() { - request.send_result(Ok(())); + request.send_result(result.clone().map_err(anyhow::Error::msg)); } // Clear the last submitted transaction self.last_submitted_transaction = None; } - /// Fail all requests waiting for inclusion with an error message - /// - /// The `requests_waiting_for_inclusion` list is cleared after the requests are notified. - pub fn fail_requests_waiting_for_inclusion(&mut self, error_msg: String) { - while let Some(request) = self.requests_waiting_for_inclusion.pop() { - request.send_result(Err(anyhow!(error_msg.clone()))); - } - } - /// Fail all pending requests with an error message pub fn fail_requests(&mut self, error_msg: String) { while let Some(request) = self.pending_requests.pop_front() { diff --git a/client/blockchain-service/src/utils.rs b/client/blockchain-service/src/utils.rs index f8410bb95..5bb7de427 100644 --- a/client/blockchain-service/src/utils.rs +++ b/client/blockchain-service/src/utils.rs @@ -152,65 +152,64 @@ where // Check if extrinsic was included in the current block. 
if let Ok(extrinsic) = self .get_extrinsic_from_block(current_block_hash, last_submitted_transaction.hash()) - .await - .map_err(|e| { - error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to get extrinsic from block: {:?}", e); - anyhow::anyhow!("Failed to get extrinsic from block: {:?}", e) - }) + .await + .map_err(|e| anyhow::anyhow!("Failed to get extrinsic from block: {:?}", e)) { - info!(target: LOG_TARGET, "[notify_capacity_manager] Extrinsic found in block, checking if it succeeded or failed"); // Check if the extrinsic succeeded or failed. - // We notify the callers of the success or failure of the extrinsic. - for event in extrinsic.events.iter() { - match &event.event { - RuntimeEvent::System(system_event) => match system_event { - frame_system::Event::ExtrinsicSuccess { dispatch_info } => { - debug!( - target: LOG_TARGET, - "Extrinsic succeeded: {:?}", dispatch_info - ); - if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.complete_requests_waiting_for_inclusion(); - }) { - error!(target: LOG_TARGET, "Failed to complete requests waiting for inclusion: {:?}", e); + let result = extrinsic + .events + .iter() + .find_map(|event| { + if let RuntimeEvent::System(system_event) = &event.event { + match system_event { + frame_system::Event::ExtrinsicSuccess { dispatch_info: _ } => { + Some(Ok(())) } - } - frame_system::Event::ExtrinsicFailed { dispatch_error, dispatch_info: _ } => { - error!( - target: LOG_TARGET, - "Extrinsic failed: {:?}", dispatch_error - ); - if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.fail_requests_waiting_for_inclusion(format!("Extrinsic failed: {:?}", dispatch_error)); - }) { - error!(target: LOG_TARGET, "Failed to fail requests waiting for inclusion: {:?}", e); + frame_system::Event::ExtrinsicFailed { + dispatch_error, + dispatch_info: _, + } => { + Some(Err(format!("Extrinsic failed: {:?}", dispatch_error))) } + _ => None, } - _ => {} - }, - _ => {} - } + } else { + None + } + }) + .unwrap_or(Ok(())); + + // Notify all callers of the result. + if let Err(e) = self.execute_with_capacity_manager(|manager| { + manager.complete_requests_waiting_for_inclusion(result); + }) { + error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to complete requests waiting for inclusion: {:?}", e); } } } } + // We will only attempt to process the next batch of requests in the queue if there are no requests waiting for inclusion. + if self + .capacity_manager + .as_ref() + .unwrap() + .has_requests_waiting_for_inclusion() + { + return; + } + // Query earliest block to change capacity - let earliest_block = match self + let Ok(earliest_block) = self .client .runtime_api() .query_earliest_change_capacity_block(current_block_hash, &provider_id) .unwrap_or_else(|_| { error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to query earliest block to change capacity"); Err(QueryEarliestChangeCapacityBlockError::InternalError) - }) { - Ok(earliest_block) => { - earliest_block - } - Err(e) => { - error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to query earliest block to change capacity: {:?}", e); - return; - } + }) + else { + return; }; // We can send the transaction 1 block before the earliest block to change capacity since it will be included in the next block. 
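
The hunk above keeps the timing rule stated in its final comment: the batched change_capacity extrinsic can be submitted one block before the earliest block at which a capacity change is allowed, so that it is included exactly in that earliest block. A minimal standalone sketch of that check follows; it is illustrative only — `BlockNumber` is assumed to be `u32` here and `should_submit_capacity_change` is a hypothetical helper, not a function from this codebase. The `saturating_sub` guard mirrors the underflow fix a later patch in this series applies to the same comparison.

    type BlockNumber = u32;

    fn should_submit_capacity_change(
        current_block: BlockNumber,
        earliest_change_block: BlockNumber,
    ) -> bool {
        // Submitting at `earliest - 1` means the extrinsic lands in the earliest allowed block;
        // saturating_sub avoids underflow when the earliest allowed block is 0.
        current_block >= earliest_change_block.saturating_sub(1)
    }

    fn main() {
        assert!(!should_submit_capacity_change(8, 10)); // too early: would land in block 9
        assert!(should_submit_capacity_change(9, 10));  // submit now, included in block 10
        assert!(should_submit_capacity_change(10, 10)); // at the earliest allowed block
    }
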
diff --git a/node/src/services/builder.rs b/node/src/services/builder.rs index 9345e3eba..2aba52702 100644 --- a/node/src/services/builder.rs +++ b/node/src/services/builder.rs @@ -372,7 +372,7 @@ where <(UserRole, NoStorageLayer) as ShNodeType>::FSH::new(), // Not used by the user role ProviderConfig { - capacity_config: CapacityConfig::new(0, 0, 0.0), + capacity_config: CapacityConfig::new(0, 0), extrinsic_retry_timeout: self.extrinsic_retry_timeout, }, self.indexer_db_pool.clone(), From a39f2753f64ec23ec0576938fd7669cb9fc8fb31 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Mon, 24 Feb 2025 08:20:09 -0500 Subject: [PATCH 3/7] amend clippy --- .../src/capacity_manager.rs | 3 +-- node/src/tasks/bsp_upload_file.rs | 3 --- node/src/tasks/msp_move_bucket.rs | 20 ++++++------------- node/src/tasks/msp_upload_file.rs | 6 +----- pallets/file-system/src/utils.rs | 4 ++-- pallets/provider-randomness/src/lib.rs | 6 ++---- pallets/providers/src/benchmarking.rs | 2 +- 7 files changed, 13 insertions(+), 31 deletions(-) diff --git a/client/blockchain-service/src/capacity_manager.rs b/client/blockchain-service/src/capacity_manager.rs index 2f094527d..1a1cc2447 100644 --- a/client/blockchain-service/src/capacity_manager.rs +++ b/client/blockchain-service/src/capacity_manager.rs @@ -94,8 +94,7 @@ impl CapacityRequestQueue { total_required: StorageData, ) -> StorageData { // Calculate how many jumps we need to cover the required capacity - let jumps_needed = (total_required + self.capacity_config.jump_capacity - 1) - / self.capacity_config.jump_capacity; + let jumps_needed = total_required.div_ceil(self.capacity_config.jump_capacity); let total_jump_capacity = jumps_needed * self.capacity_config.jump_capacity; // Calculate new total capacity diff --git a/node/src/tasks/bsp_upload_file.rs b/node/src/tasks/bsp_upload_file.rs index 643824b6d..3b0bdb77c 100644 --- a/node/src/tasks/bsp_upload_file.rs +++ b/node/src/tasks/bsp_upload_file.rs @@ -1,9 +1,6 @@ use std::{ - cmp::max, collections::{HashMap, HashSet}, - ops::Add, str::FromStr, - sync::Arc, time::Duration, }; diff --git a/node/src/tasks/msp_move_bucket.rs b/node/src/tasks/msp_move_bucket.rs index 6f648d41a..824756a4e 100644 --- a/node/src/tasks/msp_move_bucket.rs +++ b/node/src/tasks/msp_move_bucket.rs @@ -4,7 +4,6 @@ use ordered_float::OrderedFloat; use priority_queue::PriorityQueue; use rand::{rngs::StdRng, SeedableRng}; use std::{ - cmp::max, collections::{HashMap, HashSet}, sync::{Arc, Mutex}, time::Duration, @@ -33,7 +32,6 @@ use shc_file_transfer_service::{ use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; use shp_constants::FILE_CHUNK_SIZE; use shp_file_metadata::{Chunk, ChunkId, Leaf as ProvenLeaf}; -use storage_hub_runtime::StorageDataUnit; use crate::services::{ handler::StorageHubHandler, @@ -202,7 +200,7 @@ where .file_storage .write() .await - .insert_file(file_key.clone(), file_metadata.clone()) + .insert_file(file_key, file_metadata.clone()) .map_err(|error| { anyhow!( "CRITICAL ❗️❗️❗️: Failed to insert file {:?} into file storage: {:?}", @@ -319,7 +317,7 @@ where Self { storage_hub_handler: self.storage_hub_handler.clone(), file_storage_inserted_file_keys: self.file_storage_inserted_file_keys.clone(), - pending_bucket_id: self.pending_bucket_id.clone(), + pending_bucket_id: self.pending_bucket_id, bsp_peer_manager: self.bsp_peer_manager.clone(), } } @@ -588,12 +586,7 @@ where match self .storage_hub_handler .file_transfer - .download_request( - peer_id, - file_key.into(), - chunk_batch.clone(), - 
Some(bucket.clone()), - ) + .download_request(peer_id, file_key.into(), chunk_batch.clone(), Some(*bucket)) .await { Ok(download_request) => { @@ -710,7 +703,7 @@ where { let mut peer_manager = self.bsp_peer_manager.write().await; for &peer_id in &bsp_peer_ids { - peer_manager.add_peer(peer_id, file_key.clone()); + peer_manager.add_peer(peer_id, file_key); } } @@ -724,8 +717,7 @@ where let semaphore = Arc::clone(&chunk_semaphore); let task = self.clone(); let file_metadata = file_metadata.clone(); - let file_key = file_key.clone(); - let bucket = bucket.clone(); + let bucket = *bucket; let peer_manager = Arc::clone(&peer_manager); tokio::spawn(async move { @@ -1043,7 +1035,7 @@ impl BspPeerManager { .peers .entry(peer_id) .or_insert_with(|| BspPeerStats::new()); - stats.add_file_key(file_key.clone()); + stats.add_file_key(file_key); // Add to the priority queue for this file key let queue = self diff --git a/node/src/tasks/msp_upload_file.rs b/node/src/tasks/msp_upload_file.rs index 25166db5b..581c43dbb 100644 --- a/node/src/tasks/msp_upload_file.rs +++ b/node/src/tasks/msp_upload_file.rs @@ -1,8 +1,5 @@ -use std::{cmp::max, collections::HashMap, str::FromStr, time::Duration}; - use anyhow::anyhow; use std::{ - cmp::max, collections::{HashMap, HashSet}, str::FromStr, time::Duration, @@ -22,14 +19,13 @@ use shc_blockchain_service::{commands::BlockchainServiceInterface, events::NewSt use shc_common::types::{ FileKey, FileKeyWithProof, FileMetadata, HashT, RejectedStorageRequestReason, StorageProofsMerkleTrieLayout, StorageProviderId, StorageRequestMspAcceptedFileKeys, - StorageRequestMspBucketResponse, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, + StorageRequestMspBucketResponse, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, FILE_CHUNK_SIZE, }; use shc_file_manager::traits::{FileStorage, FileStorageWriteError, FileStorageWriteOutcome}; use shc_file_transfer_service::{ commands::FileTransferServiceInterface, events::RemoteUploadRequest, }; use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler}; -use storage_hub_runtime::StorageDataUnit; use crate::services::types::ShNodeType; use crate::services::{handler::StorageHubHandler, types::MspForestStorageHandlerT}; diff --git a/pallets/file-system/src/utils.rs b/pallets/file-system/src/utils.rs index 6a4b53f5f..3784062bb 100644 --- a/pallets/file-system/src/utils.rs +++ b/pallets/file-system/src/utils.rs @@ -2813,9 +2813,9 @@ where } fn do_decode_generic_apply_delta_event_info( - encoded_event_info: &[u8], + mut encoded_event_info: &[u8], ) -> Result, codec::Error> { - BucketIdFor::::decode(&mut encoded_event_info.as_ref()) + BucketIdFor::::decode(&mut encoded_event_info) } } diff --git a/pallets/provider-randomness/src/lib.rs b/pallets/provider-randomness/src/lib.rs index 3219ab186..04eb6b7d7 100644 --- a/pallets/provider-randomness/src/lib.rs +++ b/pallets/provider-randomness/src/lib.rs @@ -576,7 +576,6 @@ pub mod pallet { // The difference between the sets are the Providers that did not submit their seed commitments let missing_providers: Vec> = due_providers_for_current_tick .difference(&providers_that_submitted) - .into_iter() .cloned() .collect(); @@ -620,12 +619,11 @@ pub mod pallet { ProvidersToMarkAsSlashable::::take(current_tick_to_process); // If there are any, process them, consuming the weight used to do so - let should_advance_tick = if providers_to_mark.is_some() { + let should_advance_tick = if let Some(providers_to_mark) = providers_to_mark { Self::process_providers_to_mark_as_slashable( &mut weight_meter, current_tick_to_process, - 
providers_to_mark - .expect("This option is some since we checked it before. qed"), + providers_to_mark, ) } else { true diff --git a/pallets/providers/src/benchmarking.rs b/pallets/providers/src/benchmarking.rs index cf7a4939b..d1e849f77 100644 --- a/pallets/providers/src/benchmarking.rs +++ b/pallets/providers/src/benchmarking.rs @@ -578,7 +578,7 @@ mod benchmarks { /*********** Call the extrinsic to benchmark: ***********/ #[extrinsic_call] - _(RawOrigin::Signed(user_account.clone()), msp_id.clone()); + _(RawOrigin::Signed(user_account.clone()), msp_id); /*********** Post-benchmark checks: ***********/ // Verify that the event of the MSP sign off was emitted From 00300ac6ebfbe0db70e896c3dadad308d8ebe3e1 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Mon, 24 Feb 2025 08:43:42 -0500 Subject: [PATCH 4/7] amend merge changes --- node/src/tasks/msp_move_bucket.rs | 158 ++---------------------------- node/src/tasks/msp_upload_file.rs | 2 +- 2 files changed, 9 insertions(+), 151 deletions(-) diff --git a/node/src/tasks/msp_move_bucket.rs b/node/src/tasks/msp_move_bucket.rs index 3a11ca76f..f9b763e10 100644 --- a/node/src/tasks/msp_move_bucket.rs +++ b/node/src/tasks/msp_move_bucket.rs @@ -564,54 +564,22 @@ where let max_storage_capacity = self .storage_hub_handler .provider_config - .max_storage_capacity; + .capacity_config + .max_capacity(); - if max_storage_capacity == current_capacity { - let err_msg = - "Reached maximum storage capacity limit. Unable to add more storage capacity."; - warn!(target: LOG_TARGET, err_msg); + if max_storage_capacity <= current_capacity { + let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + error!( + target: LOG_TARGET, "{}", err_msg + ); return Err(anyhow::anyhow!(err_msg)); } - let new_capacity = self.calculate_capacity(required_size, current_capacity)?; - - let call = storage_hub_runtime::RuntimeCall::Providers( - pallet_storage_providers::Call::change_capacity { new_capacity }, - ); - - let earliest_change_capacity_block = self - .storage_hub_handler - .blockchain - .query_earliest_change_capacity_block(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query earliest change capacity block: {:?}", e - ); - anyhow::anyhow!("Failed to query earliest change capacity block: {:?}", e) - })?; - - // Wait for the earliest block where the capacity can be changed - self.storage_hub_handler - .blockchain - .wait_for_block(earliest_change_capacity_block) - .await?; - self.storage_hub_handler .blockchain - .send_extrinsic(call, SendExtrinsicOptions::default()) - .await? - .with_timeout(Duration::from_secs(60)) - .watch_for_success(&self.storage_hub_handler.blockchain) + .increase_capacity(CapacityRequestData::new(required_size)) .await?; - info!( - target: LOG_TARGET, - "Increased storage capacity to {:?} bytes", - new_capacity - ); - let available_capacity = self .storage_hub_handler .blockchain @@ -637,29 +605,6 @@ where Ok(()) } - fn calculate_capacity( - &self, - required_size: u64, - current_capacity: StorageDataUnit, - ) -> Result { - let jump_capacity = self.storage_hub_handler.provider_config.jump_capacity; - let jumps_needed = (required_size + jump_capacity - 1) / jump_capacity; - let jumps = max(jumps_needed, 1); - let bytes_to_add = jumps * jump_capacity; - let required_capacity = current_capacity.checked_add(bytes_to_add).ok_or_else(|| { - anyhow::anyhow!("Reached maximum storage capacity limit. 
Cannot accept bucket move.") - })?; - - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .max_storage_capacity; - - let new_capacity = std::cmp::min(required_capacity, max_storage_capacity); - - Ok(new_capacity) - } - /// Processes a single chunk download response async fn process_chunk_download_response( &self, @@ -986,93 +931,6 @@ where Ok(()) } - - async fn check_and_increase_capacity( - &self, - required_size: u64, - own_msp_id: ProviderId, - ) -> anyhow::Result<()> { - let available_capacity = self - .storage_hub_handler - .blockchain - .query_available_storage_capacity(own_msp_id) - .await - .map_err(|e| { - let err_msg = format!("Failed to query available storage capacity: {:?}", e); - error!( - target: LOG_TARGET, - err_msg - ); - anyhow::anyhow!(err_msg) - })?; - - // Increase storage capacity if the available capacity is less than the file size. - if available_capacity < required_size { - warn!( - target: LOG_TARGET, - "Insufficient storage capacity to move bucket. Required: {}, available: {}", - required_size, available_capacity - ); - - // Check that the BSP has not reached the maximum storage capacity. - let current_capacity = self - .storage_hub_handler - .blockchain - .query_storage_provider_capacity(own_msp_id) - .await - .map_err(|e| { - error!( - target: LOG_TARGET, - "Failed to query storage provider capacity: {:?}", e - ); - anyhow::anyhow!("Failed to query storage provider capacity: {:?}", e) - })?; - - let max_storage_capacity = self - .storage_hub_handler - .provider_config - .capacity_config - .max_capacity(); - - if max_storage_capacity <= current_capacity { - let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; - error!( - target: LOG_TARGET, "{}", err_msg - ); - return Err(anyhow::anyhow!(err_msg)); - } - - self.storage_hub_handler - .blockchain - .increase_capacity(CapacityRequestData::new(required_size)) - .await?; - - let available_capacity = self - .storage_hub_handler - .blockchain - .query_available_storage_capacity(own_msp_id) - .await - .map_err(|e| { - let err_msg = format!("Failed to query available storage capacity: {:?}", e); - error!( - target: LOG_TARGET, - err_msg - ); - anyhow::anyhow!(err_msg) - })?; - - // Skip volunteering if the new available capacity is still less than the file size. - if available_capacity < required_size { - let err_msg = "Increased storage capacity is still insufficient to volunteer for file. Skipping volunteering."; - warn!( - target: LOG_TARGET, "{}", err_msg - ); - return Err(anyhow::anyhow!(err_msg)); - } - } - - Ok(()) - } } /// Tracks performance metrics for a BSP peer. 
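
With the per-task `calculate_capacity` and `check_and_increase_capacity` helpers removed above, the jump-based arithmetic lives only in the capacity manager's `calculate_new_capacity`. A standalone sketch of that calculation is shown below under simplifying assumptions: `StorageData` is taken to be `u64`, and `new_capacity` is an illustrative free function rather than the crate's method. It rounds the required amount up to a whole number of capacity jumps (the `div_ceil` form adopted in the clippy patch of this series) and caps the result at the configured maximum.

    type StorageData = u64;

    fn new_capacity(
        current: StorageData,
        required: StorageData,
        jump: StorageData,
        max: StorageData,
    ) -> StorageData {
        // Round the extra capacity up to a whole number of jumps...
        let jumps_needed = required.div_ceil(jump);
        let added = jumps_needed.saturating_mul(jump);
        // ...then cap the new total at the configured maximum capacity.
        current.saturating_add(added).min(max)
    }

    fn main() {
        let gib: u64 = 1024 * 1024 * 1024;
        // A 5 GiB requirement with a 4 GiB jump rounds up to two jumps (8 GiB added).
        assert_eq!(new_capacity(10 * gib, 5 * gib, 4 * gib, 64 * gib), 18 * gib);
        // The result never exceeds the configured maximum.
        assert_eq!(new_capacity(62 * gib, 5 * gib, 4 * gib, 64 * gib), 64 * gib);
    }
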
diff --git a/node/src/tasks/msp_upload_file.rs b/node/src/tasks/msp_upload_file.rs index ceec6bcaf..422d45794 100644 --- a/node/src/tasks/msp_upload_file.rs +++ b/node/src/tasks/msp_upload_file.rs @@ -19,7 +19,7 @@ use shc_blockchain_service::{commands::BlockchainServiceInterface, events::NewSt use shc_common::types::{ FileKey, FileKeyWithProof, FileMetadata, HashT, RejectedStorageRequestReason, StorageProofsMerkleTrieLayout, StorageProviderId, StorageRequestMspAcceptedFileKeys, - StorageRequestMspBucketResponse, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, FILE_CHUNK_SIZE, + StorageRequestMspBucketResponse, BATCH_CHUNK_FILE_TRANSFER_MAX_SIZE, }; use shc_file_manager::traits::{FileStorage, FileStorageWriteError, FileStorageWriteOutcome}; use shc_file_transfer_service::{ From 974395c15e92662b9f9af76abce856a44f59be3b Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Wed, 26 Feb 2025 15:59:41 -0500 Subject: [PATCH 5/7] amend; add docs to `total_required` field, safe substraction methods, remove unused const --- .../blockchain-service/src/capacity_manager.rs | 16 +++++++++------- client/blockchain-service/src/utils.rs | 2 +- test/util/bspNet/consts.ts | 1 - 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/client/blockchain-service/src/capacity_manager.rs b/client/blockchain-service/src/capacity_manager.rs index 1a1cc2447..b079109f7 100644 --- a/client/blockchain-service/src/capacity_manager.rs +++ b/client/blockchain-service/src/capacity_manager.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use anyhow::anyhow; -use log::{error, info}; +use log::{debug, error}; use pallet_storage_providers_runtime_api::{ QueryEarliestChangeCapacityBlockError, QueryStorageProviderCapacityError, StorageProvidersApi, }; @@ -26,7 +26,9 @@ pub struct CapacityRequestQueue { /// All requesters will be notified via the callback when the `CapacityChanged` event is processed /// in the block important notification pipeline. This list will be cleared subsequently. requests_waiting_for_inclusion: Vec, - /// Total required capacity. + /// Total accumulated capacity required by the aggregate of all `pending_requests`. + /// + /// This is reset when the `pending_requests` is moved to `requests_waiting_for_inclusion` when they have been batched in a single transaction. total_required: StorageData, /// The last submitted transaction which `requests_waiting_for_inclusion` is waiting for. last_submitted_transaction: Option, @@ -276,7 +278,7 @@ where &mut self, block_number: BlockNumber, ) -> Result<(), anyhow::Error> { - info!(target: LOG_TARGET, "[process_capacity_requests] Processing capacity requests"); + debug!(target: LOG_TARGET, "[process_capacity_requests] Processing capacity requests"); let (current_block_hash, current_capacity, inner_provider_id) = match self .check_capacity_request_conditions() .await @@ -295,12 +297,12 @@ where // Skip the process if there are no pending requests. 
if !capacity_manager_ref.has_pending_requests() { - info!(target: LOG_TARGET, "[process_capacity_requests] No pending requests, skipping"); + debug!(target: LOG_TARGET, "[process_capacity_requests] No pending requests, skipping"); return Ok(()); } // Query earliest block to change capacity - info!(target: LOG_TARGET, "[process_capacity_requests] Querying earliest block to change capacity"); + debug!(target: LOG_TARGET, "[process_capacity_requests] Querying earliest block to change capacity"); let earliest_block = self .client .runtime_api() @@ -311,8 +313,8 @@ where }) .map_err(|e| anyhow!("Failed to query earliest block to change capacity: {:?}", e))?; - if block_number < earliest_block - 1 { - info!(target: LOG_TARGET, "[process_capacity_requests] Earliest block to change capacity: {:?}", earliest_block); + if block_number < earliest_block.saturating_sub(1) { + debug!(target: LOG_TARGET, "[process_capacity_requests] Earliest block to change capacity: {:?}", earliest_block); // Must wait until the earliest block to change capacity. return Ok(()); } diff --git a/client/blockchain-service/src/utils.rs b/client/blockchain-service/src/utils.rs index b1c43ff44..2c132e960 100644 --- a/client/blockchain-service/src/utils.rs +++ b/client/blockchain-service/src/utils.rs @@ -214,7 +214,7 @@ where }; // We can send the transaction 1 block before the earliest block to change capacity since it will be included in the next block. - if *block_number >= earliest_block - 1 { + if *block_number >= earliest_block.saturating_sub(1) { if let Err(e) = self.process_capacity_requests(*block_number).await { error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to process capacity requests: {:?}", e); } diff --git a/test/util/bspNet/consts.ts b/test/util/bspNet/consts.ts index a5819ec77..2b37123ea 100644 --- a/test/util/bspNet/consts.ts +++ b/test/util/bspNet/consts.ts @@ -110,5 +110,4 @@ export const TRANSFER_WEIGHT_REF_TIME = 297_297_000; export const TRANSFER_WEIGHT_PROOF_SIZE = 308; export const JUMP_CAPACITY_BSP = 1073741824; -export const CAPACITY_BUFFER_PERCENTILE = 0.2; export const MSP_CHARGING_PERIOD = 12; From 73a40334f691d2acc8f056f81926d77ed9a2f063 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Thu, 27 Feb 2025 09:39:37 -0500 Subject: [PATCH 6/7] amend; simplify borrow mut capacity manager --- .../src/capacity_manager.rs | 64 ++++++------------- client/blockchain-service/src/commands.rs | 1 - client/blockchain-service/src/utils.rs | 13 ++-- node/src/tasks/bsp_upload_file.rs | 5 +- node/src/tasks/msp_move_bucket.rs | 3 +- node/src/tasks/msp_upload_file.rs | 2 +- 6 files changed, 33 insertions(+), 55 deletions(-) diff --git a/client/blockchain-service/src/capacity_manager.rs b/client/blockchain-service/src/capacity_manager.rs index b079109f7..98af8808a 100644 --- a/client/blockchain-service/src/capacity_manager.rs +++ b/client/blockchain-service/src/capacity_manager.rs @@ -23,8 +23,8 @@ pub struct CapacityRequestQueue { pending_requests: VecDeque, /// Capacity requests bundled in a single transaction waiting to be included in a block. /// - /// All requesters will be notified via the callback when the `CapacityChanged` event is processed - /// in the block important notification pipeline. This list will be cleared subsequently. + /// All requesters will be notified via the callback when the transaction is included in the + /// block important notification pipeline. This list will be cleared subsequently. 
requests_waiting_for_inclusion: Vec, /// Total accumulated capacity required by the aggregate of all `pending_requests`. /// @@ -223,43 +223,17 @@ impl BlockchainService where FSH: ForestStorageHandler + Clone + Send + Sync + 'static, { - /// Helper method to temporarily take ownership of the capacity manager - pub(crate) fn execute_with_capacity_manager(&mut self, f: F) -> Result - where - F: FnOnce(&mut CapacityRequestQueue) -> R, - { - let mut manager = self - .capacity_manager - .take() - .ok_or_else(|| anyhow!("Capacity manager not initialized"))?; - - let result = f(&mut manager); - self.capacity_manager = Some(manager); - - Ok(result) - } - /// Queue a capacity request. /// /// If the capacity request cannot be queued for any reason, the error will be sent back to the caller. pub(crate) async fn queue_capacity_request(&mut self, capacity_request: CapacityRequest) { match self.check_capacity_request_conditions().await { Ok((_, current_capacity, _)) => { - // Wrap in Option to control ownership - let mut request_opt = Some(capacity_request); - - let result = self.execute_with_capacity_manager(|manager| { - // Take ownership from the Option - let request = request_opt.take().unwrap(); - manager.queue_capacity_request(request, current_capacity) - }); - - if let Err(e) = result { - // If we still have the request (manager wasn't initialized), send error - if let Some(request) = request_opt { - request.send_result(Err(e)); - } - // Else: request was successfully queued before an error occurred + if let Some(capacity_manager) = self.capacity_manager.as_mut() { + capacity_manager.queue_capacity_request(capacity_request, current_capacity); + } else { + capacity_request.send_result(Err(anyhow!("Capacity manager not initialized"))); + return; } } Err(e) => { @@ -334,31 +308,31 @@ where match self.send_extrinsic(call, Default::default()).await { Ok(output) => { // Add all pending requests to the list of requests waiting for inclusion. - if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.add_pending_requests_to_waiting_for_inclusion( + if let Some(capacity_manager) = self.capacity_manager.as_mut() { + capacity_manager.add_pending_requests_to_waiting_for_inclusion( SubmittedTransaction::new(output.receiver, output.hash, output.nonce), ); - }) { - error!(target: LOG_TARGET, "Failed to add pending requests to waiting for inclusion: {:?}", e); + } else { + error!(target: LOG_TARGET, "Capacity manager not initialized"); } } Err(e) => { error!(target: LOG_TARGET, "Failed to send increase capacity extrinsic: {:?}", e); // Notify all in-flight requests of the error - if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.fail_requests(e.to_string()); - }) { - error!(target: LOG_TARGET, "Failed to notify in-flight requests of the error: {:?}", e); + if let Some(capacity_manager) = self.capacity_manager.as_mut() { + capacity_manager.fail_requests(e.to_string()); + } else { + error!(target: LOG_TARGET, "Capacity manager not initialized"); } } }; // Ensure the pending requests queue and total required capacity are reset so that // new capacity requests can be queued and tally up from 0 again. 
- if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.reset_queue(); - }) { - error!(target: LOG_TARGET, "Failed to reset capacity manager: {:?}", e); + if let Some(capacity_manager) = self.capacity_manager.as_mut() { + capacity_manager.reset_queue(); + } else { + error!(target: LOG_TARGET, "Capacity manager not initialized"); } Ok(()) diff --git a/client/blockchain-service/src/commands.rs b/client/blockchain-service/src/commands.rs index 3288065be..ade9facc8 100644 --- a/client/blockchain-service/src/commands.rs +++ b/client/blockchain-service/src/commands.rs @@ -801,7 +801,6 @@ where let message = BlockchainServiceCommand::IncreaseCapacity { request, callback }; self.send(message).await; let rx = rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed."); - // This should receive a WatchTransaction so that the blockchain service can fire and forget rx.await.expect("Failed to wait for capacity increase") } diff --git a/client/blockchain-service/src/utils.rs b/client/blockchain-service/src/utils.rs index 95d76d37e..ccb2c8a33 100644 --- a/client/blockchain-service/src/utils.rs +++ b/client/blockchain-service/src/utils.rs @@ -143,7 +143,10 @@ where } }; - let capacity_manager_ref = self.capacity_manager.as_ref().unwrap(); + let capacity_manager_ref = self + .capacity_manager + .as_ref() + .expect("Capacity manager should exist when calling this function"); // Send response to all callers waiting for their capacity request to be included in a block. if capacity_manager_ref.has_requests_waiting_for_inclusion() { @@ -181,10 +184,10 @@ where .unwrap_or(Ok(())); // Notify all callers of the result. - if let Err(e) = self.execute_with_capacity_manager(|manager| { - manager.complete_requests_waiting_for_inclusion(result); - }) { - error!(target: LOG_TARGET, "[notify_capacity_manager] Failed to complete requests waiting for inclusion: {:?}", e); + if let Some(capacity_manager) = self.capacity_manager.as_mut() { + capacity_manager.complete_requests_waiting_for_inclusion(result); + } else { + error!(target: LOG_TARGET, "[notify_capacity_manager] Capacity manager not initialized"); } } } diff --git a/node/src/tasks/bsp_upload_file.rs b/node/src/tasks/bsp_upload_file.rs index 08bfd1ac1..8396c0990 100644 --- a/node/src/tasks/bsp_upload_file.rs +++ b/node/src/tasks/bsp_upload_file.rs @@ -494,10 +494,11 @@ where .max_capacity(); if max_storage_capacity <= current_capacity { - let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + let err_msg = + "Reached maximum storage capacity limit. Unable to add more storage capacity."; error!( target: LOG_TARGET, "{}", err_msg - ); + );`` return Err(anyhow::anyhow!(err_msg)); } diff --git a/node/src/tasks/msp_move_bucket.rs b/node/src/tasks/msp_move_bucket.rs index 6dccaf78e..d60f8351b 100644 --- a/node/src/tasks/msp_move_bucket.rs +++ b/node/src/tasks/msp_move_bucket.rs @@ -572,7 +572,8 @@ where .max_capacity(); if max_storage_capacity <= current_capacity { - let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + let err_msg = + "Reached maximum storage capacity limit. 
Unable to add more storage capacity."; error!( target: LOG_TARGET, "{}", err_msg ); diff --git a/node/src/tasks/msp_upload_file.rs b/node/src/tasks/msp_upload_file.rs index 8c5e3908c..37246b737 100644 --- a/node/src/tasks/msp_upload_file.rs +++ b/node/src/tasks/msp_upload_file.rs @@ -480,7 +480,7 @@ where .max_capacity(); if max_storage_capacity <= current_capacity { - let err_msg = "Reached maximum storage capacity limit. Unable to add more more storage capacity."; + let err_msg = "Reached maximum storage capacity limit. Unable to add more storage capacity."; error!( target: LOG_TARGET, "{}", err_msg ); From e138c265eb1c87c2e79597e2cdb2a975fff555a7 Mon Sep 17 00:00:00 2001 From: Michael Assaf Date: Thu, 27 Feb 2025 09:44:46 -0500 Subject: [PATCH 7/7] fmt --- node/src/tasks/bsp_upload_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/tasks/bsp_upload_file.rs b/node/src/tasks/bsp_upload_file.rs index 40a4fede2..a20855704 100644 --- a/node/src/tasks/bsp_upload_file.rs +++ b/node/src/tasks/bsp_upload_file.rs @@ -498,7 +498,7 @@ where "Reached maximum storage capacity limit. Unable to add more storage capacity."; error!( target: LOG_TARGET, "{}", err_msg - );`` + ); return Err(anyhow::anyhow!(err_msg)); }
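
Taken together, the series replaces per-task capacity extrinsics with a single batched flow: tasks queue a capacity request, the manager submits one change_capacity extrinsic for the whole batch, and every waiting caller is resolved from the same ExtrinsicSuccess or ExtrinsicFailed outcome. The closing sketch below illustrates that fan-out under simplifying assumptions: std channels stand in for the async one-shot callbacks, and `Queue`, `CapacityRequest`, and `complete_waiting` are illustrative names, not the crate's actual types.

    use std::sync::mpsc;

    struct CapacityRequest {
        required: u64,
        callback: mpsc::Sender<Result<(), String>>,
    }

    struct Queue {
        waiting_for_inclusion: Vec<CapacityRequest>,
    }

    impl Queue {
        // One shared result — Ok(()) on ExtrinsicSuccess, Err(..) on ExtrinsicFailed — is fanned
        // out to every caller whose request was batched into the submitted transaction, and the
        // list is cleared, mirroring the unified completion function introduced in patch 2/7.
        fn complete_waiting(&mut self, result: Result<(), String>) {
            while let Some(request) = self.waiting_for_inclusion.pop() {
                let _ = request.callback.send(result.clone());
            }
        }
    }

    fn main() {
        let (tx_a, rx_a) = mpsc::channel();
        let (tx_b, rx_b) = mpsc::channel();
        let mut queue = Queue {
            waiting_for_inclusion: vec![
                CapacityRequest { required: 1024, callback: tx_a },
                CapacityRequest { required: 2048, callback: tx_b },
            ],
        };

        // The aggregate of all queued requests is what a single change_capacity extrinsic covers.
        let total: u64 = queue.waiting_for_inclusion.iter().map(|r| r.required).sum();
        assert_eq!(total, 3072);

        // Pretend the batched extrinsic was included in a block and emitted ExtrinsicSuccess.
        queue.complete_waiting(Ok(()));
        assert_eq!(rx_a.recv().unwrap(), Ok(()));
        assert_eq!(rx_b.recv().unwrap(), Ok(()));
    }
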