feat: msp check previous block #337

Open

Wants to merge 30 commits into main from feat/msp-check-previous-block (changes shown from 19 of the 30 commits).

Commits
5c0e1f2
wip
undercover-cactus Jan 22, 2025
0bccfb6
needed file key value and some minors fixes
undercover-cactus Jan 23, 2025
d7c056a
added an integration test but it is failing
undercover-cactus Jan 29, 2025
3fdb907
failing integration test
undercover-cactus Feb 3, 2025
ee2c27b
fmt
undercover-cactus Feb 4, 2025
5b4461a
change name of the rpc function; incomplete integration test
undercover-cactus Feb 5, 2025
327be01
linter
undercover-cactus Feb 5, 2025
95adc49
typecheck
undercover-cactus Feb 5, 2025
0bd3cb2
remove sleep
undercover-cactus Feb 6, 2025
680bca7
update comment; change unresponded_ to pending_ function name;
undercover-cactus Feb 7, 2025
aceacad
convert Vec of storage requests to HashMap; minor changes;
undercover-cactus Feb 12, 2025
43a883d
use BTreeMap from the sp-std lib; remove sleep;
undercover-cactus Feb 13, 2025
5b6ecc4
typegen
undercover-cactus Feb 13, 2025
302a0ff
remove unused imports
undercover-cactus Feb 13, 2025
9dc8846
wip
undercover-cactus Feb 19, 2025
dff7e02
wip
undercover-cactus Feb 20, 2025
a09b3b2
add retry logic and complete integration test;
undercover-cactus Feb 21, 2025
d046d26
fmt
undercover-cactus Feb 21, 2025
6eb9df6
update integration tests
undercover-cactus Feb 21, 2025
8ca3749
created new utility function wait_for_num_blocks that wait for an amo…
undercover-cactus Feb 21, 2025
db47403
linter
undercover-cactus Feb 21, 2025
51039c4
feat: :construction: Trying to fix this, passing it to lola
ffarall Feb 26, 2025
b17af7f
fix test
undercover-cactus Feb 26, 2025
643be98
remove unused import
undercover-cactus Feb 26, 2025
b9e8149
make the tail value an option and default back to undefined
undercover-cactus Feb 27, 2025
0ed2221
remove only:true
undercover-cactus Feb 27, 2025
aefdf1a
still need sleep
undercover-cactus Feb 28, 2025
c202fcf
fix linter
undercover-cactus Feb 28, 2025
158bd85
Merge branch 'main' into feat/msp-check-previous-block
undercover-cactus Mar 3, 2025
8f714a7
qome fixes
undercover-cactus Mar 11, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

38 changes: 36 additions & 2 deletions client/blockchain-service/src/handler.rs
@@ -41,7 +41,7 @@ use shc_common::{
blockchain_utils::{convert_raw_multiaddresses_to_multiaddr, get_events_at_block},
types::{
BlockNumber, EitherBucketOrBspId, Fingerprint, ParachainClient, StorageProviderId,
TickNumber, BCSV_KEY_TYPE,
StorageRequestMetadata, TickNumber, BCSV_KEY_TYPE,
},
};
use shp_file_metadata::FileKey;
@@ -1195,9 +1195,43 @@ where
// TODO: Send events to check that this node has a Forest Storage for the BSP that it manages.
// TODO: Catch up to Forest root writes in the BSP Forest.
}
Some(StorageProviderId::MainStorageProvider(_msp_id)) => {
Some(StorageProviderId::MainStorageProvider(msp_id)) => {
// TODO: Send events to check that this node has a Forest Storage for each Bucket this MSP manages.
// TODO: Catch up to Forest root writes in the Bucket's Forests.

info!(target: LOG_TARGET, "Checking for storage requests for this MSP");

let storage_requests: BTreeMap<H256, StorageRequestMetadata> = match self
.client
.runtime_api()
.pending_storage_requests_by_msp(block_hash, msp_id)
{
Ok(sr) => sr,
Err(_) => {
// If querying for pending storage requests fails, do not try to answer them
warn!(target: LOG_TARGET, "Failed to get pending storage requests");
return;
}
};

info!(
target: LOG_TARGET,
"We have {} pending storage requests",
storage_requests.len()
);

// Loop over each pending storage request and start a new storage request task for the MSP
for (file_key, sr) in storage_requests {
Review comment (Contributor): Maybe ignore this, since it's personal preference, but can you rename sr to storage_request?

Reply (Contributor, author): It is hard to differentiate storage_requests from storage_request when browsing the code. I realize that sr is not a good name, but we can quickly understand what it is by looking at the loop. It is a trade-off, and I guess it's linked to programming habits. Maybe I could add a comment to improve readability?

Reply (Contributor): Adding a comment is fine by me, btw.

self.emit(NewStorageRequest {
who: sr.owner,
file_key: file_key.into(),
bucket_id: sr.bucket_id,
location: sr.location,
fingerprint: Fingerprint::from(sr.fingerprint.as_bytes()),
size: sr.size,
user_peer_ids: sr.user_peer_ids,
expires_at: sr.expires_at,
})
}
}
None => {
warn!(target: LOG_TARGET, "No Provider ID found. This node is not managing a Provider.");
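A minimal sketch of the replay loop above with the reviewer's suggested rename and a clarifying comment applied (a hypothetical refactor; the PR as submitted keeps sr):

// Each entry is a storage request this MSP has not yet responded to;
// re-emit it so the MSP task pipeline handles it as a fresh request.
for (file_key, storage_request) in storage_requests {
    self.emit(NewStorageRequest {
        who: storage_request.owner,
        file_key: file_key.into(),
        bucket_id: storage_request.bucket_id,
        location: storage_request.location,
        fingerprint: Fingerprint::from(storage_request.fingerprint.as_bytes()),
        size: storage_request.size,
        user_peer_ids: storage_request.user_peer_ids,
        expires_at: storage_request.expires_at,
    });
}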
1 change: 1 addition & 0 deletions client/common/src/types.rs
@@ -76,6 +76,7 @@ pub type Balance = pallet_storage_providers::types::BalanceOf<Runtime>;
pub type OpaqueBlock = storage_hub_runtime::opaque::Block;
pub type BlockHash = <OpaqueBlock as BlockT>::Hash;
pub type PeerId = pallet_file_system::types::PeerId<Runtime>;
pub type StorageRequestMetadata = pallet_file_system::types::StorageRequestMetadata<Runtime>;

/// Type alias for the events vector.
///
11 changes: 2 additions & 9 deletions client/rpc/src/lib.rs
@@ -14,15 +14,7 @@ use tokio::{fs, fs::create_dir_all, sync::RwLock};

use pallet_file_system_runtime_api::FileSystemApi as FileSystemRuntimeApi;
use pallet_proofs_dealer_runtime_api::ProofsDealerApi as ProofsDealerRuntimeApi;
use shc_common::{
consts::CURRENT_FOREST_KEY,
types::{
BackupStorageProviderId, BlockNumber, BucketId, ChunkId, CustomChallenge, FileMetadata,
ForestLeaf, HashT, KeyProof, KeyProofs, MainStorageProviderId, ProofsDealerProviderId,
Proven, RandomnessOutput, StorageProof, StorageProofsMerkleTrieLayout, BCSV_KEY_TYPE,
FILE_CHUNK_SIZE,
},
};
use shc_common::{consts::CURRENT_FOREST_KEY, types::*};
use shc_file_manager::traits::{ExcludeType, FileDataTrie, FileStorage, FileStorageError};
use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};
use sp_core::{sr25519::Pair as Sr25519Pair, Encode, Pair, H256};
@@ -253,6 +245,7 @@ where
BlockNumber,
ChunkId,
BucketId,
StorageRequestMetadata,
>,
FL: FileStorage<StorageProofsMerkleTrieLayout> + Send + Sync,
FSH: ForestStorageHandler + Send + Sync + 'static,
6 changes: 2 additions & 4 deletions node/src/rpc.rs
@@ -15,10 +15,7 @@ use sc_consensus_manual_seal::{
};
use sc_rpc::DenyUnsafe;
use sc_transaction_pool_api::TransactionPool;
use shc_common::types::{
BackupStorageProviderId, BlockNumber, BucketId, ChunkId, CustomChallenge, ForestLeaf,
MainStorageProviderId, ProofsDealerProviderId, RandomnessOutput,
};
use shc_common::types::*;
use shc_forest_manager::traits::ForestStorageHandler;
use shc_rpc::{StorageHubClientApiServer, StorageHubClientRpc, StorageHubClientRpcConfig};
use sp_api::ProvideRuntimeApi;
@@ -73,6 +70,7 @@ where
BlockNumber,
ChunkId,
BucketId,
StorageRequestMetadata,
>,
P: TransactionPool + Send + Sync + 'static,
FL: FileStorageT,
3 changes: 3 additions & 0 deletions node/src/tasks/bsp_charge_fees.rs
@@ -272,6 +272,9 @@ where
.map_err(|e| anyhow!("Failed to get metadata from Forest: {:?}", e))?;

if !user_files.is_empty() {
// We only take the first file of the list in order to generate a proof, submit it with an extrinsic, and then release the lock so the next file can be processed and the next proof generated.
// This is not ideal because it means one extrinsic per file, but batch deletion is not yet implemented.
// TODO: Improve it once batch deletion is implemented.
let (file_key, metadata) = user_files.first().expect("User files is not empty");
let bucket_id = H256::from_slice(metadata.bucket_id.as_ref());
let location = sp_runtime::BoundedVec::truncate_from(metadata.location.clone());
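The flow described in the comment above (one proof and one extrinsic per file), as a rough sketch — next_insolvent_user_file, generate_deletion_proof, and submit_deletion_extrinsic are hypothetical stand-ins, not the task's real helpers:

// One proof and one extrinsic per file; the lock is released between
// iterations, so files are deleted one at a time until batching exists.
while let Some((file_key, metadata)) = next_insolvent_user_file()? {
    let proof = generate_deletion_proof(&file_key, &metadata)?; // single-file proof
    submit_deletion_extrinsic(proof).await?;                    // one extrinsic per file
    // Lock released here; reacquired on the next iteration.
}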
44 changes: 40 additions & 4 deletions node/src/tasks/user_sends_file.rs
@@ -309,7 +309,7 @@ where
{
warn!(
target: LOG_TARGET,
"Batch upload rejected by peer {:?}, retrying... (attempt {})",
"Final batch upload rejected by peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
@@ -318,7 +318,25 @@
// Wait for a short time before retrying
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Err(RequestError::RequestFailure(RequestFailure::Refused)) => {
Err(RequestError::RequestFailure(RequestFailure::Network(_)))
if retry_attempts < 10 =>
{
warn!(
target: LOG_TARGET,
"Unable to upload final batch to peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
retry_attempts += 1;

// Wait a bit for the MSP to be online
self.storage_hub_handler
.blockchain
.wait_for_block(10)
.await?;
}
Err(RequestError::RequestFailure(RequestFailure::Refused))
| Err(RequestError::RequestFailure(RequestFailure::Network(_))) => {
// Return an error if the provider refused to answer.
return Err(anyhow::anyhow!("Failed to send file {:?}", file_key));
}
@@ -378,7 +396,7 @@ where
.upload_request(peer_id, file_key.as_ref().into(), proof.clone(), None)
.await;

match upload_response {
match upload_response.as_ref() {
Ok(r) => {
debug!(
target: LOG_TARGET,
@@ -411,7 +429,25 @@
// Wait for a short time before retrying
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Review comment (Contributor, author): To standardize the retry logic, should we also wait for one block here instead of one second? Refused happens when the request hasn't been processed yet, and because of latency and block propagation time, one second could be too short in a real network.

Reply (Contributor): Isn't this flow independent from the blockchain? The user here is trying to send data to a peer (provider).

}
Err(RequestError::RequestFailure(RequestFailure::Refused)) => {
Err(RequestError::RequestFailure(RequestFailure::Network(_)))
if retry_attempts < 10 =>
{
warn!(
target: LOG_TARGET,
"Unable to upload final batch to peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
retry_attempts += 1;

// Wait a bit for the MSP to be online
self.storage_hub_handler
.blockchain
.wait_for_block(10)
.await?;
}
Err(RequestError::RequestFailure(RequestFailure::Refused))
| Err(RequestError::RequestFailure(RequestFailure::Network(_))) => {
// Return an error if the provider refused to answer.
return Err(anyhow::anyhow!("Failed to send file {:?}", file_key));
}
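For reference, the two retry waits discussed in the thread above, side by side (a sketch; wait_for_block is the blockchain-handle call this PR already uses in the Network(_) arms):

// Fixed wall-clock backoff, independent of the chain (current Refused arm):
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// Chain-relative backoff (current Network(_) arm): wait on the blockchain
// service for 10 blocks before retrying, giving the MSP time to come online.
self.storage_hub_handler
    .blockchain
    .wait_for_block(10)
    .await?;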
4 changes: 3 additions & 1 deletion pallets/file-system/runtime-api/Cargo.toml
@@ -18,8 +18,10 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { workspace = true, features = ["derive"] }
scale-info = { workspace = true }
sp-api = { workspace = true }
sp-core = { workspace = true }
sp-runtime = { workspace = true }
sp-std = { workspace = true }

[features]
default = ["std"]
std = ["codec/std", "sp-api/std", "sp-runtime/std"]
std = ["codec/std", "sp-api/std", "sp-runtime/std", "sp-std/std"]
12 changes: 11 additions & 1 deletion pallets/file-system/runtime-api/src/lib.rs
@@ -3,7 +3,9 @@
use codec::{Codec, Decode, Encode};
use scale_info::prelude::vec::Vec;
use scale_info::TypeInfo;
use sp_core::H256;
use sp_runtime::RuntimeDebug;
use sp_std::collections::btree_map::BTreeMap;

/// Error type for the `is_storage_request_open_to_volunteers` runtime API call.
#[derive(Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
@@ -51,21 +53,29 @@ pub enum GenericApplyDeltaEventInfoError {
DecodeError,
}

/// Error type for the `pending_storage_requests_by_msp` runtime API call.
#[derive(Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum StorageRequestsByMspError {
FailedToRetrieveStorageRequests,
}

sp_api::decl_runtime_apis! {
#[api_version(1)]
pub trait FileSystemApi<BackupStorageProviderId, MainStorageProviderId, FileKey, TickNumber, ChunkId, GenericApplyDeltaEventInfo>
pub trait FileSystemApi<BackupStorageProviderId, MainStorageProviderId, FileKey, TickNumber, ChunkId, GenericApplyDeltaEventInfo, StorageRequestMetadata>
where
BackupStorageProviderId: Codec,
MainStorageProviderId: Codec,
FileKey: Codec,
TickNumber: Codec,
ChunkId: Codec,
GenericApplyDeltaEventInfo: Codec,
StorageRequestMetadata: Codec,
{
fn is_storage_request_open_to_volunteers(file_key: FileKey) -> Result<bool, IsStorageRequestOpenToVolunteersError>;
fn query_earliest_file_volunteer_tick(bsp_id: BackupStorageProviderId, file_key: FileKey) -> Result<TickNumber, QueryFileEarliestVolunteerTickError>;
fn query_bsp_confirm_chunks_to_prove_for_file(bsp_id: BackupStorageProviderId, file_key: FileKey) -> Result<Vec<ChunkId>, QueryBspConfirmChunksToProveForFileError>;
fn query_msp_confirm_chunks_to_prove_for_file(msp_id: MainStorageProviderId, file_key: FileKey) -> Result<Vec<ChunkId>, QueryMspConfirmChunksToProveForFileError>;
fn decode_generic_apply_delta_event_info(encoded_event_info: Vec<u8>) -> Result<GenericApplyDeltaEventInfo, GenericApplyDeltaEventInfoError>;
fn pending_storage_requests_by_msp(msp_id: MainStorageProviderId) -> BTreeMap<H256, StorageRequestMetadata>;
}
}
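A minimal client-side sketch of calling this new runtime API, mirroring the handler change earlier in this PR (block_hash, msp_id, and client are assumed to be in scope, with the client's runtime implementing this FileSystemApi):

// Storage requests assigned to this MSP that it has not yet responded to.
let pending: BTreeMap<H256, StorageRequestMetadata> = client
    .runtime_api()
    .pending_storage_requests_by_msp(block_hash, msp_id)
    .unwrap_or_default(); // the runtime-api call itself can fail at the client boundary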
17 changes: 17 additions & 0 deletions pallets/file-system/src/utils.rs
@@ -34,6 +34,7 @@
ReadBucketsInterface, ReadProvidersInterface, ReadStorageProvidersInterface,
ReadUserSolvencyInterface, TrieAddMutation, TrieRemoveMutation,
};
use sp_std::collections::btree_map::BTreeMap;

use crate::{
pallet,
@@ -71,7 +72,7 @@
}};
// Handle boolean type
($condition:expr, $error_msg:expr, $error_type:path, bool) => {{
if !$condition {

#[cfg(test)]
unreachable!($error_msg);

@@ -2817,6 +2818,22 @@
) -> Result<BucketIdFor<T>, codec::Error> {
BucketIdFor::<T>::decode(&mut encoded_event_info.as_ref())
}

pub fn pending_storage_requests_by_msp(
msp_id: ProviderIdFor<T>,
) -> BTreeMap<MerkleHash<T>, StorageRequestMetadata<T>> {
// Get the storage requests assigned to this specific MSP that are still pending,
// i.e. where the MSP has not yet responded (the second element of the `msp`
// tuple flags whether the MSP has already responded, so `!msp.1` keeps pending ones)
StorageRequests::<T>::iter()
.filter(|(_, metadata)| {
if let Some(msp) = metadata.msp {
msp.0 == msp_id && !msp.1
} else {
false
}
})
.collect()
}
}

mod hooks {
11 changes: 8 additions & 3 deletions runtime/src/apis.rs
@@ -4,6 +4,7 @@ use frame_support::{
weights::Weight,
};
use pallet_aura::Authorities;
use pallet_file_system::types::StorageRequestMetadata;
use pallet_file_system_runtime_api::*;
use pallet_payment_streams_runtime_api::*;
use pallet_proofs_dealer::types::{
@@ -24,7 +25,7 @@
transaction_validity::{TransactionSource, TransactionValidity},
ApplyExtrinsicResult, ExtrinsicInclusionMode,
};
use sp_std::prelude::Vec;
use sp_std::{collections::btree_map::BTreeMap, prelude::Vec};
use sp_version::RuntimeVersion;
use xcm::{
latest::prelude::AssetId, VersionedAssetId, VersionedAssets, VersionedLocation, VersionedXcm,
@@ -328,7 +329,7 @@ impl_runtime_apis! {
}
}

impl pallet_file_system_runtime_api::FileSystemApi<Block, BackupStorageProviderId<Runtime>, MainStorageProviderId<Runtime>, H256, BlockNumber, ChunkId, BucketId<Runtime>> for Runtime {
impl pallet_file_system_runtime_api::FileSystemApi<Block, BackupStorageProviderId<Runtime>, MainStorageProviderId<Runtime>, H256, BlockNumber, ChunkId, BucketId<Runtime>, StorageRequestMetadata<Runtime>> for Runtime {
fn is_storage_request_open_to_volunteers(file_key: H256) -> Result<bool, IsStorageRequestOpenToVolunteersError> {
FileSystem::is_storage_request_open_to_volunteers(file_key)
}
@@ -345,9 +346,13 @@
FileSystem::query_msp_confirm_chunks_to_prove_for_file(msp_id, file_key)
}

fn decode_generic_apply_delta_event_info(encoded_event_info: Vec<u8>) -> Result<BucketId<Runtime>, GenericApplyDeltaEventInfoError> {
FileSystem::decode_generic_apply_delta_event_info(encoded_event_info)
}

fn pending_storage_requests_by_msp(msp_id: MainStorageProviderId<Runtime>) -> BTreeMap<H256, StorageRequestMetadata<Runtime>> {
FileSystem::pending_storage_requests_by_msp(msp_id)
}
}

impl pallet_payment_streams_runtime_api::PaymentStreamsApi<Block, ProviderIdFor<Runtime>, Balance, AccountId> for Runtime {