Fix MSP move bucket and BSP download whitelist update
links234 committed Mar 10, 2025
1 parent 2db6e97 commit 7cfa2b9
Showing 3 changed files with 63 additions and 34 deletions.
46 changes: 13 additions & 33 deletions client/blockchain-service/src/handler_bsp.rs
@@ -22,7 +22,7 @@ use storage_hub_runtime::RuntimeEvent;

use crate::{
events::{
BspConfirmStoppedStoring, FinalisedBspConfirmStoppedStoring, FinalisedBucketMovedAway,
BspConfirmStoppedStoring, FinalisedBspConfirmStoppedStoring,
FinalisedTrieRemoveMutationsApplied, ForestWriteLockTaskData, MoveBucketAccepted,
MoveBucketExpired, MoveBucketRejected, MoveBucketRequested, MultipleNewChallengeSeeds,
ProcessConfirmStoringRequest, ProcessConfirmStoringRequestData,
@@ -139,6 +139,18 @@ where
});
}
}
RuntimeEvent::FileSystem(pallet_file_system::Event::MoveBucketRequested {
who: _,
bucket_id,
new_msp_id,
new_value_prop_id: _,
}) => {
// As a BSP, this node is interested in the event to allow the new MSP to request files from it.
self.emit(MoveBucketRequested {
bucket_id,
new_msp_id,
});
}
// Ignore all other events.
_ => {}
}
@@ -185,38 +197,6 @@ where
});
}
}
RuntimeEvent::FileSystem(pallet_file_system::Event::MoveBucketRequested {
who: _,
bucket_id,
new_msp_id,
new_value_prop_id: _,
}) => {
// As a BSP, this node is interested in the event to allow the new MSP to request files from it.
self.emit(MoveBucketRequested {
bucket_id,
new_msp_id,
});
}
RuntimeEvent::FileSystem(pallet_file_system::Event::MoveBucketAccepted {
bucket_id,
old_msp_id,
new_msp_id,
value_prop_id: _,
}) => {
// This event is relevant in case the managed Provider is the old MSP,
// in which case we should clean up the bucket.
// Note: we do this in finality to ensure we don't lose data in case
// of a reorg.
if let Some(old_msp_id) = old_msp_id {
if managed_bsp_id == &old_msp_id {
self.emit(FinalisedBucketMovedAway {
bucket_id,
old_msp_id,
new_msp_id,
});
}
}
}
// Ignore all other events.
_ => {}
}
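Taken together, this file's change moves the `MoveBucketRequested` handling out of the finalised-block path and into the best-block path, and drops the BSP-side `MoveBucketAccepted` arm along with the `FinalisedBucketMovedAway` emission. Below is a minimal, self-contained sketch of that structural shift; the handler and event types are simplified stand-ins, not the crate's actual definitions.

```rust
// Simplified model of the change: the BSP now reacts to MoveBucketRequested
// while processing best blocks instead of waiting for finality.
#[derive(Debug, Clone)]
struct MoveBucketRequested {
    bucket_id: [u8; 32],
    new_msp_id: [u8; 32],
}

enum RuntimeEvent {
    MoveBucketRequested { bucket_id: [u8; 32], new_msp_id: [u8; 32] },
    Other,
}

#[derive(Default)]
struct BspHandler {
    emitted: Vec<MoveBucketRequested>,
}

impl BspHandler {
    // Runs for every new best block: emit immediately so the new MSP can start
    // requesting files from this BSP without waiting for finality.
    fn process_best_block_event(&mut self, event: RuntimeEvent) {
        if let RuntimeEvent::MoveBucketRequested { bucket_id, new_msp_id } = event {
            self.emitted.push(MoveBucketRequested { bucket_id, new_msp_id });
        }
    }

    // Runs only for finalised blocks; after this commit it no longer handles
    // MoveBucketRequested (and no longer emits FinalisedBucketMovedAway).
    fn process_finalised_block_event(&mut self, _event: RuntimeEvent) {}
}

fn main() {
    let mut handler = BspHandler::default();
    handler.process_best_block_event(RuntimeEvent::MoveBucketRequested {
        bucket_id: [0u8; 32],
        new_msp_id: [1u8; 32],
    });
    assert_eq!(handler.emitted.len(), 1);
}
```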
1 change: 1 addition & 0 deletions client/file-transfer-service/src/handler.rs
@@ -409,6 +409,7 @@ impl Actor for FileTransferService
bucket_id,
callback,
} => {
info!(target: LOG_TARGET, "Registering new bucket peer {:?} for bucket {:?}", peer_id, bucket_id);
let result = match self.peer_bucket_allow_list.insert((peer_id, bucket_id)) {
true => Ok(()),
false => Err(RequestError::BucketAlreadyRegisteredForPeer),
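The new log line sits in the message handler that registers a peer for a bucket: the allow list is a set of `(peer_id, bucket_id)` pairs, and registering the same pair twice is reported as an error. A self-contained sketch of that semantics, with simplified types standing in for the real `PeerId` and `BucketId`:

```rust
use std::collections::HashSet;

// Simplified stand-ins for the node's libp2p / runtime types.
type PeerId = u64;
type BucketId = [u8; 32];

#[derive(Debug, PartialEq)]
enum RequestError {
    BucketAlreadyRegisteredForPeer,
}

struct AllowList {
    peer_bucket_allow_list: HashSet<(PeerId, BucketId)>,
}

impl AllowList {
    fn register_new_bucket_peer(
        &mut self,
        peer_id: PeerId,
        bucket_id: BucketId,
    ) -> Result<(), RequestError> {
        // HashSet::insert returns false when the pair is already present,
        // which the handler maps to an explicit error.
        match self.peer_bucket_allow_list.insert((peer_id, bucket_id)) {
            true => Ok(()),
            false => Err(RequestError::BucketAlreadyRegisteredForPeer),
        }
    }
}

fn main() {
    let mut list = AllowList { peer_bucket_allow_list: HashSet::new() };
    assert!(list.register_new_bucket_peer(1, [0u8; 32]).is_ok());
    assert_eq!(
        list.register_new_bucket_peer(1, [0u8; 32]),
        Err(RequestError::BucketAlreadyRegisteredForPeer)
    );
}
```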
50 changes: 49 additions & 1 deletion node/src/tasks/msp_move_bucket.rs
@@ -17,7 +17,7 @@ use shc_common::types::{
BucketId, HashT, ProviderId, StorageProofsMerkleTrieLayout, StorageProviderId,
};
use shc_file_manager::traits::FileStorage;
use shc_forest_manager::traits::ForestStorageHandler;
use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};

use crate::services::{
handler::StorageHubHandler,
@@ -109,6 +109,14 @@ where
event.bucket_id
);

// Important: Add a delay after receiving the on-chain confirmation
// This gives the BSPs time to process the chain event and prepare to serve files
info!(
target: LOG_TARGET,
"Waiting for BSPs to be ready to serve files for bucket {:?}", event.bucket_id
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

// Get all files for this bucket from the indexer
let indexer_db_pool =
if let Some(indexer_db_pool) = self.storage_hub_handler.indexer_db_pool.clone() {
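The five-second pause is a coordination heuristic: nothing on chain signals that every BSP has processed the `MoveBucketRequested` event, so the MSP simply waits before it begins requesting files. A minimal sketch of the same pattern with the duration lifted into a named constant; the constant and function names are illustrative, not part of the codebase:

```rust
use std::time::Duration;

// Illustrative constant; the commit hard-codes the 5-second value inline.
const BSP_READINESS_DELAY: Duration = Duration::from_secs(5);

// Give BSPs time to see the on-chain MoveBucketRequested event and whitelist
// this MSP before it starts requesting files (requires the tokio crate).
async fn wait_for_bsps_to_be_ready() {
    tokio::time::sleep(BSP_READINESS_DELAY).await;
}
```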
@@ -157,6 +165,11 @@ where
let file_download_manager = &self.storage_hub_handler.file_download_manager;
let file_transfer_service = self.storage_hub_handler.file_transfer.clone();

info!(
target: LOG_TARGET,
"Starting new download of bucket {:?}", event.bucket_id
);

file_download_manager
.download_bucket(
event.bucket_id,
@@ -222,6 +235,14 @@ where
return Ok(());
}

let bucket = event.bucket_id.as_ref().to_vec();

let forest_storage = self
.storage_hub_handler
.forest_storage_handler
.get_or_create(&bucket)
.await;

// Calculate total size to check capacity
let total_size: u64 = files
.iter()
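The total-size computation is truncated in this view; what it implies is a pre-flight check that the sum of the bucket's file sizes fits into the MSP's remaining capacity before any download starts. A self-contained sketch under that assumption, with field and function names that are illustrative rather than the crate's actual types:

```rust
// Illustrative capacity pre-check; the record and function names are assumptions.
struct FileRecord {
    size: u64,
}

fn check_capacity(files: &[FileRecord], available_capacity: u64) -> Result<(), String> {
    // Sum the sizes of every file in the bucket being moved.
    let total_size: u64 = files.iter().map(|f| f.size).sum();

    if total_size > available_capacity {
        return Err(format!(
            "Bucket needs {} bytes but only {} bytes of capacity are available",
            total_size, available_capacity
        ));
    }
    Ok(())
}
```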
@@ -261,6 +282,33 @@ where

let file_key = file_metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>();

self.storage_hub_handler
.file_storage
.write()
.await
.insert_file(file_key, file_metadata.clone())
.map_err(|error| {
anyhow!(
"CRITICAL ❗️❗️❗️: Failed to insert file {:?} into file storage: {:?}",
file_key,
error
)
})?;

self.file_storage_inserted_file_keys.push(file_key);

forest_storage
.write()
.await
.insert_files_metadata(&[file_metadata.clone()])
.map_err(|error| {
anyhow!(
"CRITICAL ❗️❗️❗️: Failed to insert file {:?} into forest storage: {:?}",
file_key,
error
)
})?;

// Register the BSP peers with the peer manager for this file
let bsp_peer_ids = file.get_bsp_peer_ids(&mut indexer_connection).await?;
if bsp_peer_ids.is_empty() {
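Each key that makes it into the file storage is also pushed onto `file_storage_inserted_file_keys`, which lets the task undo partial progress if the bucket move later fails. A hedged sketch of what such a rollback could look like; the `delete_file` interface here is an assumption, not the crate's confirmed trait method:

```rust
// Hypothetical rollback helper illustrating why inserted file keys are tracked:
// if the move fails part-way, everything written so far can be removed again.
trait FileStorageLike<K> {
    fn delete_file(&mut self, key: &K) -> Result<(), String>;
}

fn rollback_inserted_files<K: std::fmt::Debug>(
    inserted_keys: &[K],
    storage: &mut impl FileStorageLike<K>,
) {
    for key in inserted_keys {
        if let Err(error) = storage.delete_file(key) {
            // Log and continue; a failed cleanup should not mask the original error.
            eprintln!("Failed to remove file {:?} during rollback: {:?}", key, error);
        }
    }
}
```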
