diff --git a/client/blockchain-service/src/handler.rs b/client/blockchain-service/src/handler.rs
index bc5a48796..8f425398a 100644
--- a/client/blockchain-service/src/handler.rs
+++ b/client/blockchain-service/src/handler.rs
@@ -126,6 +126,8 @@ where
     ///
     /// Only required if the node is running as a provider.
     pub(crate) capacity_manager: Option<CapacityRequestQueue>,
+    /// Whether the node is running in maintenance mode.
+    pub(crate) maintenance_mode: bool,
 }
 
 /// Event loop for the BlockchainService actor.
@@ -216,6 +218,12 @@ where
         message: Self::Message,
     ) -> impl std::future::Future<Output = ()> + Send {
         async {
+            // If the node is running in maintenance mode, we don't process any messages.
+            if self.maintenance_mode {
+                info!(target: LOG_TARGET, "🔒 Maintenance mode is enabled. Skipping message processing.");
+                return;
+            }
+
             match message {
                 BlockchainServiceCommand::SendExtrinsic {
                     call,
@@ -1115,6 +1123,7 @@ where
         rocksdb_root_path: impl Into<PathBuf>,
         notify_period: Option<u32>,
        capacity_request_queue: Option<CapacityRequestQueue>,
+        maintenance_mode: bool,
     ) -> Self {
         Self {
             event_bus_provider: BlockchainServiceEventBusProvider::new(),
@@ -1130,6 +1139,7 @@ where
             persistent_state: BlockchainServiceStateStore::new(rocksdb_root_path.into()),
             notify_period,
             capacity_manager: capacity_request_queue,
+            maintenance_mode,
         }
     }
 
@@ -1285,6 +1295,12 @@ where
     ) where
         Block: cumulus_primitives_core::BlockT,
     {
+        // If the node is running in maintenance mode, we don't process block imports.
+        if self.maintenance_mode {
+            info!(target: LOG_TARGET, "🔒 Maintenance mode is enabled. Skipping processing of block import #{}: {}", block_number, block_hash);
+            return;
+        }
+
         trace!(target: LOG_TARGET, "📠 Processing block import #{}: {}", block_number, block_hash);
 
         // Provider-specific code to run on every block import.
@@ -1377,6 +1393,12 @@ where
         let block_hash: H256 = notification.hash;
         let block_number: BlockNumber = (*notification.header.number()).saturated_into();
 
+        // If the node is running in maintenance mode, we don't process finality notifications.
+        if self.maintenance_mode {
+            info!(target: LOG_TARGET, "🔒 Maintenance mode is enabled. Skipping finality notification #{}: {}", block_number, block_hash);
+            return;
+        }
+
         info!(target: LOG_TARGET, "📨 Finality notification #{}: {}", block_number, block_hash);
 
         // Get events from storage.
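The three guards added to this file (message handling, block import, finality notifications) all share one shape: check the flag, log, and return before any state is touched. A minimal, self-contained sketch of that pattern, using hypothetical `Command` and `Service` types rather than the real `BlockchainService`:

```rust
use futures::executor::block_on;

enum Command {
    SendExtrinsic { payload: Vec<u8> },
    QueryBlockHeight,
}

struct Service {
    maintenance_mode: bool,
}

impl Service {
    // Same early-return shape as `BlockchainService::handle_message`.
    async fn handle_message(&mut self, message: Command) {
        if self.maintenance_mode {
            println!("🔒 Maintenance mode is enabled. Skipping message processing.");
            return;
        }
        match message {
            Command::SendExtrinsic { payload } => {
                println!("submitting extrinsic of {} bytes", payload.len());
            }
            Command::QueryBlockHeight => println!("querying block height"),
        }
    }
}

fn main() {
    let mut service = Service { maintenance_mode: true };
    // Commands are still accepted, but dropped before any work happens.
    block_on(service.handle_message(Command::SendExtrinsic { payload: vec![0; 32] }));
    block_on(service.handle_message(Command::QueryBlockHeight));
}
```

Because the guard sits at the very top of each handler, senders are unaffected (their messages are still received); the work is simply never performed.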
diff --git a/client/blockchain-service/src/lib.rs b/client/blockchain-service/src/lib.rs
index 45d5e20db..1a223a247 100644
--- a/client/blockchain-service/src/lib.rs
+++ b/client/blockchain-service/src/lib.rs
@@ -30,6 +30,7 @@ pub async fn spawn_blockchain_service(
     rocksdb_root_path: impl Into<PathBuf>,
     notify_period: Option<u32>,
     capacity_config: Option<CapacityConfig>,
+    maintenance_mode: bool,
 ) -> ActorHandle<BlockchainService<FSH>>
 where
     FSH: shc_forest_manager::traits::ForestStorageHandler + Clone + Send + Sync + 'static,
@@ -46,6 +47,7 @@ where
         rocksdb_root_path,
         notify_period,
         capacity_config.map(CapacityRequestQueue::new),
+        maintenance_mode,
     );
 
     task_spawner.spawn_actor(blockchain_service)
diff --git a/node/src/cli.rs b/node/src/cli.rs
index 7c3f108c7..7bfc6194f 100644
--- a/node/src/cli.rs
+++ b/node/src/cli.rs
@@ -106,6 +106,12 @@ pub struct ProviderConfigurations {
     #[arg(long)]
     pub provider: bool,
 
+    /// Run the node in maintenance mode.
+    /// In this mode, the node will not import blocks or participate in consensus,
+    /// but will allow specific RPC calls for file and storage management.
+    #[arg(long, default_value = "false")]
+    pub maintenance_mode: bool,
+
     /// Type of StorageHub provider.
     #[clap(
         long,
@@ -173,6 +179,7 @@ impl ProviderConfigurations {
             jump_capacity: self.jump_capacity,
             extrinsic_retry_timeout: self.extrinsic_retry_timeout,
             msp_charging_period: self.msp_charging_period,
+            maintenance_mode: self.maintenance_mode,
         }
     }
 }
diff --git a/node/src/command.rs b/node/src/command.rs
index be273bd20..af1f2790c 100644
--- a/node/src/command.rs
+++ b/node/src/command.rs
@@ -35,6 +35,8 @@ pub struct ProviderOptions {
     pub extrinsic_retry_timeout: u64,
     /// MSP charging fees frequency.
     pub msp_charging_period: Option<u32>,
+    /// Whether the node is running in maintenance mode.
+    pub maintenance_mode: bool,
 }
 
 fn load_spec(id: &str) -> std::result::Result<Box<dyn ChainSpec>, String> {
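For reference, a trimmed-down sketch of how the flag behaves at the CLI layer, assuming clap 4's derive API (the `Opts` struct below is hypothetical): a `bool` field with `#[arg(long)]` is a presence switch, so the explicit `default_value = "false"` mostly documents the default.

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Opts {
    /// Run the node in maintenance mode.
    #[arg(long, default_value = "false")]
    maintenance_mode: bool,
}

fn main() {
    // The bare flag switches the mode on; omitting it keeps the default.
    let on = Opts::parse_from(["node", "--maintenance-mode"]);
    let off = Opts::parse_from(["node"]);
    assert!(on.maintenance_mode);
    assert!(!off.maintenance_mode);
    println!("on = {:?}, off = {:?}", on, off);
}
```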
diff --git a/node/src/service.rs b/node/src/service.rs
index 087a1ffc5..7dbba53ce 100644
--- a/node/src/service.rs
+++ b/node/src/service.rs
@@ -272,6 +272,7 @@ async fn finish_sh_builder_and_run_tasks(
     rpc_handlers: RpcHandlers,
     keystore: KeystorePtr,
     rocksdb_root_path: impl Into<PathBuf>,
+    maintenance_mode: bool,
 ) -> Result<(), sc_service::Error>
 where
     R: ShRole,
@@ -287,6 +288,7 @@ where
             keystore.clone(),
             Arc::new(rpc_handlers),
             rocksdb_root_path,
+            maintenance_mode,
         )
         .await;
 
@@ -319,6 +321,23 @@ where
     use async_io::Timer;
     use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
 
+    // If maintenance mode is requested, start the dev node in maintenance mode instead.
+    let maintenance_mode = provider_options
+        .as_ref()
+        .map_or(false, |opts| opts.maintenance_mode);
+    if maintenance_mode {
+        log::info!("🛠️ Running dev node in maintenance mode");
+        log::info!("🛠️ Network participation is disabled");
+        log::info!("🛠️ Only storage management RPC methods are available");
+        return start_dev_in_maintenance_mode::<R, S, Network>(
+            config,
+            provider_options,
+            indexer_config,
+            hwbench,
+        )
+        .await;
+    }
+
     let sc_service::PartialComponents {
         client,
         backend,
         mut task_manager,
         import_queue,
         keystore_container,
         select_chain: _maybe_select_chain,
         transaction_pool,
         other: (_, mut telemetry, _),
     } = new_partial(&config, true)?;
@@ -530,6 +549,7 @@ where
             rpc_handlers,
             keystore.clone(),
             base_path,
+            maintenance_mode,
         )
         .await?;
     }
@@ -696,6 +716,195 @@ where
     Ok(task_manager)
 }
 
+async fn start_dev_in_maintenance_mode<R, S, Network>(
+    config: Configuration,
+    provider_options: Option<ProviderOptions>,
+    indexer_config: IndexerConfigurations,
+    hwbench: Option<sc_sysinfo::HwBench>,
+) -> sc_service::error::Result<TaskManager>
+where
+    R: ShRole,
+    S: ShStorageLayer,
+    (R, S): ShNodeType,
+    StorageHubBuilder<R, S>: StorageLayerBuilder + Buildable<(R, S)>,
+    StorageHubHandler<(R, S)>: RunnableTasks,
+    Network: sc_network::NetworkBackend,
+{
+    let sc_service::PartialComponents {
+        client,
+        backend,
+        mut task_manager,
+        import_queue,
+        keystore_container,
+        select_chain: _maybe_select_chain,
+        transaction_pool,
+        other: (_, mut telemetry, _),
+    } = new_partial(&config, true)?;
+
+    let maybe_database_url = indexer_config
+        .database_url
+        .clone()
+        .or(env::var("DATABASE_URL").ok());
+
+    let maybe_db_pool = if let Some(database_url) = maybe_database_url {
+        Some(
+            shc_indexer_db::setup_db_pool(database_url)
+                .await
+                .map_err(|e| sc_service::Error::Application(Box::new(e)))?,
+        )
+    } else {
+        None
+    };
+
+    if indexer_config.indexer {
+        let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "indexer-service");
+        spawn_indexer_service(
+            &task_spawner,
+            client.clone(),
+            maybe_db_pool.clone().expect(
+                "Indexer is enabled but no database URL is provided (via CLI using --database-url or setting DATABASE_URL environment variable)",
+            ),
+        )
+        .await;
+    }
+
+    let signing_dev_key = config
+        .dev_key_seed
+        .clone()
+        .expect("Dev key seed must be present in dev mode.");
+    let keystore = keystore_container.keystore();
+
+    // Initialise seed for signing transactions using blockchain service.
+    // In dev mode we use a well-known dev account.
+    keystore
+        .sr25519_generate_new(BCSV_KEY_TYPE, Some(signing_dev_key.as_ref()))
+        .expect("Invalid dev signing key provided.");
+
+    let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
+        &config.network,
+        config
+            .prometheus_config
+            .as_ref()
+            .map(|cfg| cfg.registry.clone()),
+    );
+
+    // If we are a provider, we update the network configuration with the file transfer protocol.
+    let mut file_transfer_request_protocol = None;
+    if provider_options.is_some() {
+        file_transfer_request_protocol = Some(configure_file_transfer_network(
+            client.clone(),
+            &config,
+            &mut net_config,
+        ));
+    }
+
+    let metrics = Network::register_notification_metrics(
+        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
+    );
+
+    let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
+        sc_service::build_network(sc_service::BuildNetworkParams {
+            config: &config,
+            net_config,
+            client: client.clone(),
+            transaction_pool: transaction_pool.clone(),
+            spawn_handle: task_manager.spawn_handle(),
+            import_queue,
+            block_announce_validator_builder: None,
+            warp_sync_config: None,
+            block_relay: None,
+            metrics,
+        })?;
+
+    // No offchain workers in maintenance mode - intentionally omitted.
+
+    // Create a command sink for the RPC layer. The receiver half is dropped
+    // immediately, so manual-seal commands submitted over RPC are never acted upon.
+    let (command_sink, _) = futures::channel::mpsc::channel(1000);
+
+    // If node is running as a Storage Provider, start building the StorageHubHandler using the StorageHubBuilder.
+    let (sh_builder, maybe_storage_hub_client_rpc_config) = match init_sh_builder::<R, S>(
+        &provider_options,
+        &task_manager,
+        file_transfer_request_protocol,
+        network.clone(),
+        keystore.clone(),
+        maybe_db_pool,
+    )
+    .await
+    {
+        Some((shb, rpc)) => (Some(shb), Some(rpc)),
+        None => (None, None),
+    };
+
+    let rpc_builder = {
+        let client = client.clone();
+        let transaction_pool = transaction_pool.clone();
+
+        Box::new(move |_| {
+            let deps = crate::rpc::FullDeps {
+                client: client.clone(),
+                pool: transaction_pool.clone(),
+                maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
+                command_sink: Some(command_sink.clone()),
+            };
+
+            crate::rpc::create_full(deps).map_err(Into::into)
+        })
+    };
+
+    let base_path = config.base_path.path().to_path_buf().clone();
+
+    let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
+        rpc_builder,
+        client: client.clone(),
+        transaction_pool: transaction_pool.clone(),
+        task_manager: &mut task_manager,
+        config,
+        keystore: keystore.clone(),
+        backend: backend.clone(),
+        network: network.clone(),
+        sync_service: sync_service.clone(),
+        system_rpc_tx,
+        tx_handler_controller,
+        telemetry: telemetry.as_mut(),
+    })?;
+
+    // Finish building the StorageHubBuilder if node is running as a Storage Provider.
+    if provider_options.is_some() {
+        finish_sh_builder_and_run_tasks(
+            sh_builder.expect("StorageHubBuilder should already be initialised."),
+            client.clone(),
+            rpc_handlers,
+            keystore.clone(),
+            base_path,
+            true,
+        )
+        .await?;
+    }
+
+    if let Some(hwbench) = hwbench {
+        sc_sysinfo::print_hwbench(&hwbench);
+
+        if let Some(ref mut telemetry) = telemetry {
+            let telemetry_handle = telemetry.handle();
+            task_manager.spawn_handle().spawn(
+                "telemetry_hwbench",
+                None,
+                sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
+            );
+        }
+    }
+
+    // In maintenance mode, we intentionally don't start the manual sealing process.
+    // This means no block production will occur.
+    log::info!("🛠️ Dev node started in maintenance mode - block production is disabled");
+    log::info!("🛠️ Manual sealing is disabled");
+    log::info!("🛠️ Only RPC functionality is available");
+
+    network_starter.start_network();
+    Ok(task_manager)
+}
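The `let (command_sink, _) = futures::channel::mpsc::channel(1000);` line above is what actually neutralises manual sealing: the receiver half of the channel is dropped on the spot, so any sealing command pushed through the RPC layer has nowhere to go. A small sketch of that behaviour, with a stand-in `EngineCommand` rather than the real one from `sc_consensus_manual_seal`:

```rust
use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt};

enum EngineCommand {
    SealNewBlock,
}

fn main() {
    // Mirrors `let (command_sink, _) = mpsc::channel(1000)`: the receiver
    // is dropped immediately, disconnecting the channel.
    let (mut command_sink, _) = mpsc::channel::<EngineCommand>(1000);

    // Sending into a disconnected channel fails instead of queueing the
    // command, so no sealing task could ever observe it.
    let result = block_on(command_sink.send(EngineCommand::SealNewBlock));
    assert!(result.is_err());
    println!("manual-seal command rejected: {:?}", result.err());
}
```

In the parachain variant further down, `command_sink: None` is passed to the RPC deps instead, which achieves the same end by a different route: the RPC layer never gets a sink at all.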
+
 /// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
 ///
 /// This is the actual implementation that is abstract over the executor and the runtime api.
@@ -716,6 +925,26 @@ where
     StorageHubHandler<(R, S)>: RunnableTasks,
     Network: NetworkBackend,
 {
+    // If maintenance mode is requested, start the node in maintenance mode instead.
+    let maintenance_mode = provider_options
+        .as_ref()
+        .map_or(false, |opts| opts.maintenance_mode);
+    if maintenance_mode {
+        log::info!("🛠️ Running node in maintenance mode");
+        log::info!("🛠️ Network participation is disabled");
+        log::info!("🛠️ Only storage management RPC methods are available");
+        return start_node_in_maintenance_mode::<R, S, Network>(
+            parachain_config,
+            polkadot_config,
+            collator_options,
+            provider_options,
+            indexer_config,
+            para_id,
+            hwbench,
+        )
+        .await;
+    }
+
     let parachain_config = prepare_node_config(parachain_config);
 
     let params = new_partial(&parachain_config, false)?;
@@ -879,6 +1108,7 @@ where
             rpc_handlers,
             keystore.clone(),
             base_path,
+            maintenance_mode,
         )
         .await?;
     }
@@ -960,6 +1190,195 @@ where
     Ok((task_manager, client))
 }
 
+async fn start_node_in_maintenance_mode<R, S, Network>(
+    parachain_config: Configuration,
+    polkadot_config: Configuration,
+    collator_options: CollatorOptions,
+    provider_options: Option<ProviderOptions>,
+    indexer_config: IndexerConfigurations,
+    para_id: ParaId,
+    hwbench: Option<sc_sysinfo::HwBench>,
+) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
+where
+    R: ShRole,
+    S: ShStorageLayer,
+    (R, S): ShNodeType,
+    StorageHubBuilder<R, S>: StorageLayerBuilder + Buildable<(R, S)>,
+    StorageHubHandler<(R, S)>: RunnableTasks,
+    Network: NetworkBackend,
+{
+    let parachain_config = prepare_node_config(parachain_config);
+
+    let params = new_partial(&parachain_config, false)?;
+    let (_block_import, mut telemetry, telemetry_worker_handle) = params.other;
+
+    // Create network configuration
+    let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
+        &parachain_config.network,
+        parachain_config
+            .prometheus_config
+            .as_ref()
+            .map(|cfg| cfg.registry.clone()),
+    );
+
+    let client = params.client.clone();
+    let backend = params.backend.clone();
+    let mut task_manager = params.task_manager;
+    let keystore = params.keystore_container.keystore();
+
+    let maybe_database_url = indexer_config
+        .database_url
+        .clone()
+        .or(env::var("DATABASE_URL").ok());
+
+    let maybe_db_pool = if let Some(database_url) = maybe_database_url {
+        Some(
+            shc_indexer_db::setup_db_pool(database_url)
+                .await
+                .map_err(|e| sc_service::Error::Application(Box::new(e)))?,
+        )
+    } else {
+        None
+    };
+
+    if indexer_config.indexer {
+        let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "indexer-service");
+        spawn_indexer_service(
+            &task_spawner,
+            client.clone(),
+            maybe_db_pool.clone().expect(
+                "Indexer is enabled but no database URL is provided (via CLI using --database-url or setting DATABASE_URL environment variable)",
+            ),
+        )
+        .await;
+    }
+
+    // If we are a provider, we update the network configuration with the file transfer protocol.
+    let mut file_transfer_request_protocol = None;
+    if provider_options.is_some() {
+        file_transfer_request_protocol = Some(configure_file_transfer_network(
+            client.clone(),
+            &parachain_config,
+            &mut net_config,
+        ));
+    }
+
+    // Create relay chain interface
+    let (relay_chain_interface, _collator_key) = build_relay_chain_interface(
+        polkadot_config,
+        &parachain_config,
+        telemetry_worker_handle,
+        &mut task_manager,
+        collator_options.clone(),
+        hwbench.clone(),
+    )
+    .await
+    .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
+
+    let transaction_pool = params.transaction_pool.clone();
+
+    let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
+        build_network(BuildNetworkParams {
+            parachain_config: &parachain_config,
+            net_config,
+            client: client.clone(),
+            transaction_pool: transaction_pool.clone(),
+            para_id,
+            spawn_handle: task_manager.spawn_handle(),
+            relay_chain_interface: relay_chain_interface.clone(),
+            import_queue: params.import_queue,
+            sybil_resistance_level: CollatorSybilResistance::Resistant, // because of Aura
+        })
+        .await?;
+
+    // No need for offchain workers in maintenance mode.
+
+    // If node is running as a Storage Provider, start building the StorageHubHandler using the StorageHubBuilder.
+    let (sh_builder, maybe_storage_hub_client_rpc_config) = match init_sh_builder::<R, S>(
+        &provider_options,
+        &task_manager,
+        file_transfer_request_protocol,
+        network.clone(),
+        keystore.clone(),
+        maybe_db_pool,
+    )
+    .await
+    {
+        Some((shb, rpc)) => (Some(shb), Some(rpc)),
+        None => (None, None),
+    };
+
+    let rpc_builder = {
+        let client = client.clone();
+        let transaction_pool = transaction_pool.clone();
+
+        Box::new(move |_| {
+            let deps = crate::rpc::FullDeps {
+                client: client.clone(),
+                pool: transaction_pool.clone(),
+                maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
+                command_sink: None,
+            };
+
+            crate::rpc::create_full(deps).map_err(Into::into)
+        })
+    };
+
+    let base_path = parachain_config.base_path.path().to_path_buf().clone();
+
+    let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
+        rpc_builder,
+        client: client.clone(),
+        transaction_pool: transaction_pool.clone(),
+        task_manager: &mut task_manager,
+        config: parachain_config,
+        keystore: keystore.clone(),
+        backend: backend.clone(),
+        network: network.clone(),
+        sync_service: sync_service.clone(),
+        system_rpc_tx,
+        tx_handler_controller,
+        telemetry: telemetry.as_mut(),
+    })?;
+
+    // Finish building the StorageHubBuilder if node is running as a Storage Provider.
+    if provider_options.is_some() {
+        finish_sh_builder_and_run_tasks(
+            sh_builder.expect("StorageHubBuilder should already be initialised."),
+            client.clone(),
+            rpc_handlers,
+            keystore.clone(),
+            base_path,
+            true,
+        )
+        .await?;
+    }
+
+    if let Some(hwbench) = hwbench {
+        sc_sysinfo::print_hwbench(&hwbench);
+
+        if let Some(ref mut telemetry) = telemetry {
+            let telemetry_handle = telemetry.handle();
+            task_manager.spawn_handle().spawn(
+                "telemetry_hwbench",
+                None,
+                sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
+            );
+        }
+    }
+
+    // In maintenance mode, we don't need the relay chain tasks.
+    log::info!("🛠️ Skipping relay chain tasks initialization in maintenance mode");
+    log::info!("🛠️ Block import and relay chain sync are disabled");
+
+    // We still need to start the network to allow RPC connections.
+    network_starter.start_network();
+
+    log::info!("🛠️ Node started in maintenance mode - only RPC functionality is available");
+
+    Ok((task_manager, client))
+}
+
 /// Build the import queue for the parachain runtime.
 fn build_import_queue(
     client: Arc<ParachainClient>,
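Both maintenance entry points resolve the indexer's database URL the same way: an explicit `--database-url` value wins, otherwise the `DATABASE_URL` environment variable is used. A minimal sketch of that precedence:

```rust
use std::env;

// Mirrors `indexer_config.database_url.clone().or(env::var("DATABASE_URL").ok())`.
fn resolve_database_url(cli_value: Option<String>) -> Option<String> {
    cli_value.or_else(|| env::var("DATABASE_URL").ok())
}

fn main() {
    env::set_var("DATABASE_URL", "postgres://localhost/storage_hub");

    // A CLI-provided URL takes precedence over the environment variable.
    assert_eq!(
        resolve_database_url(Some("postgres://cli-host/sh".into())).as_deref(),
        Some("postgres://cli-host/sh")
    );
    // Without a CLI value, the environment variable is the fallback.
    assert_eq!(
        resolve_database_url(None).as_deref(),
        Some("postgres://localhost/storage_hub")
    );
}
```

If neither source is set, `maybe_db_pool` stays `None` and enabling the indexer panics with the `expect` message shown above, so one of the two is effectively required together with `--indexer`.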
diff --git a/node/src/services/builder.rs b/node/src/services/builder.rs
index 2aba52702..6b81a5639 100644
--- a/node/src/services/builder.rs
+++ b/node/src/services/builder.rs
@@ -128,6 +128,7 @@ where
         keystore: KeystorePtr,
         rpc_handlers: Arc<RpcHandlers>,
         rocksdb_root_path: impl Into<PathBuf>,
+        maintenance_mode: bool,
     ) -> &mut Self {
         if self.forest_storage_handler.is_none() {
             panic!(
@@ -153,6 +154,7 @@ where
             rocksdb_root_path,
             self.notify_period,
             capacity_config,
+            maintenance_mode,
         )
         .await;
 
diff --git a/test/suites/integration/bsp/maintenance-mode.test.ts b/test/suites/integration/bsp/maintenance-mode.test.ts
new file mode 100644
index 000000000..b76d7babd
--- /dev/null
+++ b/test/suites/integration/bsp/maintenance-mode.test.ts
@@ -0,0 +1,92 @@
+import { strictEqual } from "node:assert";
+import {
+  bspTwoKey,
+  bspTwoSeed,
+  describeBspNet,
+  type EnrichedBspApi,
+  ShConsts
+} from "../../../util";
+
+describeBspNet("BSPNet: Maintenance Mode Test", ({ before, it, createUserApi, createApi }) => {
+  let userApi: EnrichedBspApi;
+  let maintenanceBspApi: EnrichedBspApi;
+
+  before(async () => {
+    userApi = await createUserApi();
+
+    // 1 block to max threshold (i.e. instant acceptance)
+    const tickToMaximumThresholdRuntimeParameter = {
+      RuntimeConfig: {
+        TickRangeToMaximumThreshold: [null, 1]
+      }
+    };
+    await userApi.block.seal({
+      calls: [
+        userApi.tx.sudo.sudo(
+          userApi.tx.parameters.setParameter(tickToMaximumThresholdRuntimeParameter)
+        )
+      ]
+    });
+  });
+
+  it("BSP in maintenance mode does not execute actions after block imports", async () => {
+    // Onboard a BSP in maintenance mode
+    const { rpcPort: maintenanceBspRpcPort } = await userApi.docker.onboardBsp({
+      bspSigner: bspTwoKey,
+      name: "sh-bsp-maintenance",
+      bspKeySeed: bspTwoSeed,
+      bspId: ShConsts.BSP_TWO_ID,
+      additionalArgs: ["--keystore-path=/keystore/bsp-two", "--maintenance-mode"],
+      waitForIdle: true
+    });
+
+    console.log("BSP in maintenance mode started");
+
+    // Connect to the BSP in maintenance mode
+    maintenanceBspApi = await createApi(`ws://127.0.0.1:${maintenanceBspRpcPort}`);
+
+    // Issue a storage request. The maintenance-mode BSP should not volunteer for it,
+    // since it won't process the request.
+    await userApi.file.createBucketAndSendNewStorageRequest(
+      "res/adolphus.jpg",
+      "cat/adolphus.jpg",
+      "maintenance-mode-test"
+    );
+
+    // Wait for the two BSPs to volunteer.
+    // This will throw, since the maintenance-mode BSP won't process the request.
+    await userApi.assert
+      .extrinsicPresent({
+        module: "fileSystem",
+        method: "bspVolunteer",
+        checkTxPool: true,
+        assertLength: 2,
+        timeout: 15000
+      })
+      .then(
+        (extrinsicArray) => {
+          console.log("Extrinsic array:", extrinsicArray);
+          throw new Error(
+            "Expected assertion to fail because maintenance mode BSP should not volunteer"
+          );
+        },
+        (error) => {
+          console.log(
+            "Expected error (maintenance BSP not volunteering):",
+            error instanceof Error ? error.message : String(error)
+          );
+        }
+      );
+
+    // Verify that RPC calls still work on the maintenance BSP.
+    // This should work even in maintenance mode.
+    const result = await maintenanceBspApi.rpc.storagehubclient.isFileInFileStorage(
+      "0x0000000000000000000000000000000000000000000000000000000000000000"
+    );
+
+    // The specific result doesn't matter - what matters is that the call succeeded and didn't throw.
+    strictEqual(result !== undefined, true, "RPC calls should still work in maintenance mode");
+
+    // Disconnect the maintenance BSP
+    await userApi.docker.stopContainer("sh-bsp-maintenance");
+    await maintenanceBspApi.disconnect();
+  });
+});