Skip to content

Commit

Permalink
feat: ✨ MSP stop storing buckets for insolvent user task (#366)
Browse files Browse the repository at this point in the history
* refactor: ⚡ update move bucket requests for clarity and storage efficiency

* chore: 🏷️ run typegen

* fix: 🐛 remove `previous_msp_id` from move bucket request metadata

This change is made since a bucket status (such as its MSP) could change between the request was made and the new MSP responds, so it's not reliable

* feat: 🚧 initial impl of stop storing bucket for insolvent user

* chore: 🎨 run cargo fmt

* fix: 🚨 update file system benchmark

* fix: 🩹 update `get_provider_id` call

* chore: 🏷️ run typegen

* test: ✅ add tests for `msp_stop_storing_bucket_for_insolvent_user`

* test: ✅ fix testing

* chore: 🏷️ run typegen

* fix: 🐛 add solvency check when issuing storage requests

Added a check to make sure the payment stream between the MSP and the user exists when the user issues a new storage request. This is so we don't allow users that just stopped being insolvent to issue storage requests for buckets that the MSP hasn't deleted yet

* chore: 🏷️ run typegen

* feat: ✨ add task for MSP to stop storing buckets of insolvent user

* fix: 🐛 avoid the task processing multiple times the same bucket

* fix: 🐛 get only buckets stored by the current MSP (instead of all)

* perf: ⚡ remove unneeded fetch from indexer db

* feat: 📈 benchmark file system

* perf: ⚡ drop file write lock asap

* test: ✅ add integration test for MSP stop storing bucket for insolvent user

* chore: 🏷️ Update api-augment after merge

* fix: 🚨 Fix compiler errors in test after merge

* refactor: ♻️ Rename `MoveBucketRequestedForNewMsp` -> `MoveBucketRequestedForMsp`

* fix: 🩹 fix after merge from main

* chore: 🏷️ Update `api-augment` after merge

* fix: 🩹 Fix typo

* fix: 🩹 minor fixes after merge from main

* docs: 💡 Minor improvement of doc comment of task

* impl query_buckets_for_insolvent_user runtime api in favor of querying index db for buckets stored by insolvent users

* remove `get_by_msp_id_and_owner` indexer db function. remove `ProcessStopStoringForInsolventUserRequest` event processing from `MspStopStoringInsolventUserTask`

* refactor: 🔥 remove now unneeded handling of `MspStopStoringBucketInsolventUser` event

* refactor: ⚡ make stop storing buckets for insolvent user task parallelizable

* test: ✅ fix integration test after changes

* test: 🔥 remove `only` from MSP stop storing bucket for insolvent user integration test

* Update node/src/tasks/msp_stop_storing_insolvent_user.rs

Co-authored-by: Facundo Farall <37149322+ffarall@users.noreply.github.com>

* chore: 🏷️ run typegen

---------

Co-authored-by: Facundo Farall <37149322+ffarall@users.noreply.github.com>
Co-authored-by: Michael Assaf <michael.assaf.edge@gmail.com>
Co-authored-by: Michael Assaf <94772640+snowmead@users.noreply.github.com>
  • Loading branch information
4 people authored Feb 27, 2025
1 parent bfc03d2 commit 832b228
Show file tree
Hide file tree
Showing 25 changed files with 1,050 additions and 22 deletions.
11 changes: 11 additions & 0 deletions api-augment/dist/types/interfaces/augment-api-runtime.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import type {
QueryAvailableStorageCapacityError,
QueryBspConfirmChunksToProveForFileError,
QueryBucketsForMspError,
QueryBucketsOfUserStoredByMspError,
QueryEarliestChangeCapacityBlockError,
QueryFileEarliestVolunteerBlockError,
QueryMspConfirmChunksToProveForFileError,
Expand Down Expand Up @@ -689,6 +690,16 @@ declare module "@polkadot/api-base/types/calls" {
mspId: MainStorageProviderId | string | Uint8Array
) => Observable<Result<Vec<BucketId>, QueryBucketsForMspError>>
>;
/**
* Query the buckets stored by an MSP that belong to a specific user.
**/
queryBucketsOfUserStoredByMsp: AugmentedCall<
ApiType,
(
mspId: ProviderId | string | Uint8Array,
user: AccountId | string | Uint8Array
) => Observable<Result<Vec<H256>, QueryBucketsOfUserStoredByMspError>>
>;
/**
* Query the earliest block number that a BSP can change its capacity.
**/
Expand Down
2 changes: 2 additions & 0 deletions api-augment/dist/types/interfaces/augment-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,7 @@ import type {
QueryAvailableStorageCapacityError,
QueryBspConfirmChunksToProveForFileError,
QueryBucketsForMspError,
QueryBucketsOfUserStoredByMspError,
QueryConfirmChunksToProveForFileError,
QueryEarliestChangeCapacityBlockError,
QueryFileEarliestVolunteerBlockError,
Expand Down Expand Up @@ -2219,6 +2220,7 @@ declare module "@polkadot/types/types/registry" {
QueryAvailableStorageCapacityError: QueryAvailableStorageCapacityError;
QueryBspConfirmChunksToProveForFileError: QueryBspConfirmChunksToProveForFileError;
QueryBucketsForMspError: QueryBucketsForMspError;
QueryBucketsOfUserStoredByMspError: QueryBucketsOfUserStoredByMspError;
QueryConfirmChunksToProveForFileError: QueryConfirmChunksToProveForFileError;
QueryEarliestChangeCapacityBlockError: QueryEarliestChangeCapacityBlockError;
QueryFileEarliestVolunteerBlockError: QueryFileEarliestVolunteerBlockError;
Expand Down
6 changes: 6 additions & 0 deletions api-augment/dist/types/interfaces/storagehubclient/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ export interface QueryBucketsForMspError extends Enum {
readonly isInternalError: boolean;
readonly type: "ProviderNotRegistered" | "InternalError";
}
/** @name QueryBucketsOfUserStoredByMspError */
export interface QueryBucketsOfUserStoredByMspError extends Enum {
readonly isNotAnMsp: boolean;
readonly isInternalError: boolean;
readonly type: "NotAnMsp" | "InternalError";
}
/** @name QueryConfirmChunksToProveForFileError */
export interface QueryConfirmChunksToProveForFileError extends Enum {
readonly isChallengedChunkToChunkIdError: boolean;
Expand Down
11 changes: 11 additions & 0 deletions api-augment/src/interfaces/augment-api-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import type {
QueryAvailableStorageCapacityError,
QueryBspConfirmChunksToProveForFileError,
QueryBucketsForMspError,
QueryBucketsOfUserStoredByMspError,
QueryEarliestChangeCapacityBlockError,
QueryFileEarliestVolunteerBlockError,
QueryMspConfirmChunksToProveForFileError,
Expand Down Expand Up @@ -639,6 +640,16 @@ declare module "@polkadot/api-base/types/calls" {
mspId: MainStorageProviderId | string | Uint8Array
) => Observable<Result<Vec<BucketId>, QueryBucketsForMspError>>
>;
/**
* Query the buckets stored by an MSP that belong to a specific user.
**/
queryBucketsOfUserStoredByMsp: AugmentedCall<
ApiType,
(
mspId: ProviderId | string | Uint8Array,
user: AccountId | string | Uint8Array
) => Observable<Result<Vec<H256>, QueryBucketsOfUserStoredByMspError>>
>;
/**
* Query the earliest block number that a BSP can change its capacity.
**/
Expand Down
2 changes: 2 additions & 0 deletions api-augment/src/interfaces/augment-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,7 @@ import type {
QueryAvailableStorageCapacityError,
QueryBspConfirmChunksToProveForFileError,
QueryBucketsForMspError,
QueryBucketsOfUserStoredByMspError,
QueryConfirmChunksToProveForFileError,
QueryEarliestChangeCapacityBlockError,
QueryFileEarliestVolunteerBlockError,
Expand Down Expand Up @@ -2226,6 +2227,7 @@ declare module "@polkadot/types/types/registry" {
QueryAvailableStorageCapacityError: QueryAvailableStorageCapacityError;
QueryBspConfirmChunksToProveForFileError: QueryBspConfirmChunksToProveForFileError;
QueryBucketsForMspError: QueryBucketsForMspError;
QueryBucketsOfUserStoredByMspError: QueryBucketsOfUserStoredByMspError;
QueryConfirmChunksToProveForFileError: QueryConfirmChunksToProveForFileError;
QueryEarliestChangeCapacityBlockError: QueryEarliestChangeCapacityBlockError;
QueryFileEarliestVolunteerBlockError: QueryFileEarliestVolunteerBlockError;
Expand Down
7 changes: 7 additions & 0 deletions api-augment/src/interfaces/storagehubclient/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ export interface QueryBucketsForMspError extends Enum {
readonly type: "ProviderNotRegistered" | "InternalError";
}

/** @name QueryBucketsOfUserStoredByMspError */
export interface QueryBucketsOfUserStoredByMspError extends Enum {
readonly isNotAnMsp: boolean;
readonly isInternalError: boolean;
readonly type: "NotAnMsp" | "InternalError";
}

/** @name QueryConfirmChunksToProveForFileError */
export interface QueryConfirmChunksToProveForFileError extends Enum {
readonly isChallengedChunkToChunkIdError: boolean;
Expand Down
34 changes: 32 additions & 2 deletions client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use pallet_proofs_dealer_runtime_api::{
GetChallengePeriodError, GetCheckpointChallengesError, GetProofSubmissionRecordError,
};
use pallet_storage_providers_runtime_api::{
GetBspInfoError, QueryAvailableStorageCapacityError, QueryEarliestChangeCapacityBlockError,
QueryMspIdOfBucketIdError, QueryProviderMultiaddressesError, QueryStorageProviderCapacityError,
GetBspInfoError, QueryAvailableStorageCapacityError, QueryBucketsOfUserStoredByMspError,
QueryEarliestChangeCapacityBlockError, QueryMspIdOfBucketIdError,
QueryProviderMultiaddressesError, QueryStorageProviderCapacityError,
};
use shc_actors_framework::actor::ActorHandle;
use shc_common::types::{
Expand Down Expand Up @@ -201,6 +202,12 @@ pub enum BlockchainServiceCommand {
request: FileDeletionRequest,
callback: tokio::sync::oneshot::Sender<Result<()>>,
},
QueryBucketsOfUserStoredByMsp {
msp_id: ProviderId,
user: AccountId,
callback:
tokio::sync::oneshot::Sender<Result<Vec<BucketId>, QueryBucketsOfUserStoredByMspError>>,
},
}

/// Interface for interacting with the BlockchainService actor.
Expand Down Expand Up @@ -395,6 +402,13 @@ pub trait BlockchainServiceInterface {
&self,
forest_root_write_tx: tokio::sync::oneshot::Sender<()>,
) -> Result<()>;

/// Helper function to query all the buckets stored by an MSP that belong to a specific user.
async fn query_buckets_of_user_stored_by_msp(
&self,
msp_id: ProviderId,
user: AccountId,
) -> Result<Vec<BucketId>, QueryBucketsOfUserStoredByMspError>;
}

/// Implement the BlockchainServiceInterface for the ActorHandle<BlockchainService>.
Expand Down Expand Up @@ -903,4 +917,20 @@ where
self.send(message).await;
rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed.")
}

/// Helper function to query all the buckets stored by an MSP that belong to a specific user.
async fn query_buckets_of_user_stored_by_msp(
&self,
msp_id: ProviderId,
user: AccountId,
) -> Result<Vec<BucketId>, QueryBucketsOfUserStoredByMspError> {
let (callback, rx) = tokio::sync::oneshot::channel();
let message = BlockchainServiceCommand::QueryBucketsOfUserStoredByMsp {
msp_id,
user,
callback,
};
self.send(message).await;
rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed.")
}
}
23 changes: 23 additions & 0 deletions client/blockchain-service/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ pub struct SpStopStoringInsolventUser {
}
impl EventBusMessage for SpStopStoringInsolventUser {}

/// A MSP stopped storing a bucket for an insolvent user event was finalised.
///
/// This event is emitted when the relay chain block to which a block in which a MSP stopped storing a bucket
/// for an insolvent user event is anchored has been finalised.
#[derive(Debug, Clone)]
pub struct FinalisedMspStopStoringBucketInsolventUser {
pub msp_id: ProofsDealerProviderId,
pub bucket_id: BucketId,
}

impl EventBusMessage for FinalisedMspStopStoringBucketInsolventUser {}

/// A user has requested to move one of its bucket to a new MSP.
///
/// This event is emitted so the BSP can allow the new MSP to download the files from the bucket.
Expand Down Expand Up @@ -462,6 +474,8 @@ pub struct BlockchainServiceEventBusProvider {
last_chargeable_info_updated_event_bus: EventBus<LastChargeableInfoUpdated>,
user_without_funds_event_bus: EventBus<UserWithoutFunds>,
sp_stop_storing_insolvent_user_event_bus: EventBus<SpStopStoringInsolventUser>,
finalised_msp_stop_storing_bucket_insolvent_user_event_bus:
EventBus<FinalisedMspStopStoringBucketInsolventUser>,
finalised_msp_stopped_storing_bucket_event_bus: EventBus<FinalisedMspStoppedStoringBucket>,
move_bucket_requested_event_bus: EventBus<MoveBucketRequested>,
move_bucket_rejected_event_bus: EventBus<MoveBucketRejected>,
Expand Down Expand Up @@ -496,6 +510,7 @@ impl BlockchainServiceEventBusProvider {
last_chargeable_info_updated_event_bus: EventBus::new(),
user_without_funds_event_bus: EventBus::new(),
sp_stop_storing_insolvent_user_event_bus: EventBus::new(),
finalised_msp_stop_storing_bucket_insolvent_user_event_bus: EventBus::new(),
finalised_msp_stopped_storing_bucket_event_bus: EventBus::new(),
move_bucket_requested_event_bus: EventBus::new(),
move_bucket_rejected_event_bus: EventBus::new(),
Expand Down Expand Up @@ -599,6 +614,14 @@ impl ProvidesEventBus<SpStopStoringInsolventUser> for BlockchainServiceEventBusP
}
}

impl ProvidesEventBus<FinalisedMspStopStoringBucketInsolventUser>
for BlockchainServiceEventBusProvider
{
fn event_bus(&self) -> &EventBus<FinalisedMspStopStoringBucketInsolventUser> {
&self.finalised_msp_stop_storing_bucket_insolvent_user_event_bus
}
}

impl ProvidesEventBus<FinalisedMspStoppedStoringBucket> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<FinalisedMspStoppedStoringBucket> {
&self.finalised_msp_stopped_storing_bucket_event_bus
Expand Down
56 changes: 47 additions & 9 deletions client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use pallet_proofs_dealer_runtime_api::{
ProofsDealerApi,
};
use pallet_storage_providers_runtime_api::{
GetBspInfoError, QueryAvailableStorageCapacityError, QueryEarliestChangeCapacityBlockError,
QueryMspIdOfBucketIdError, QueryProviderMultiaddressesError, QueryStorageProviderCapacityError,
StorageProvidersApi,
GetBspInfoError, QueryAvailableStorageCapacityError, QueryBucketsOfUserStoredByMspError,
QueryEarliestChangeCapacityBlockError, QueryMspIdOfBucketIdError,
QueryProviderMultiaddressesError, QueryStorageProviderCapacityError, StorageProvidersApi,
};
use shc_actors_framework::actor::{Actor, ActorEventLoop};
use shc_common::{
Expand All @@ -52,11 +52,11 @@ use crate::{
events::{
AcceptedBspVolunteer, BlockchainServiceEventBusProvider, BspConfirmStoppedStoring,
FileDeletionRequest, FinalisedBspConfirmStoppedStoring, FinalisedBucketMovedAway,
FinalisedMspStoppedStoringBucket, FinalisedProofSubmittedForPendingFileDeletionRequest,
FinalisedTrieRemoveMutationsApplied, LastChargeableInfoUpdated, MoveBucketAccepted,
MoveBucketExpired, MoveBucketRejected, MoveBucketRequested, MoveBucketRequestedForMsp,
NewStorageRequest, SlashableProvider, SpStopStoringInsolventUser, StartMovedBucketDownload,
UserWithoutFunds,
FinalisedMspStopStoringBucketInsolventUser, FinalisedMspStoppedStoringBucket,
FinalisedProofSubmittedForPendingFileDeletionRequest, FinalisedTrieRemoveMutationsApplied,
LastChargeableInfoUpdated, MoveBucketAccepted, MoveBucketExpired, MoveBucketRejected,
MoveBucketRequested, MoveBucketRequestedForMsp, NewStorageRequest, SlashableProvider,
SpStopStoringInsolventUser, StartMovedBucketDownload, UserWithoutFunds,
},
state::{
BlockchainServiceStateStore, LastProcessedBlockNumberCf,
Expand Down Expand Up @@ -986,6 +986,29 @@ where
}
}
}
BlockchainServiceCommand::QueryBucketsOfUserStoredByMsp {
msp_id,
user,
callback,
} => {
let current_block_hash = self.client.info().best_hash;

let buckets = self
.client
.runtime_api()
.query_buckets_of_user_stored_by_msp(current_block_hash, &msp_id, &user)
.unwrap_or_else(|e| {
error!(target: LOG_TARGET, "{}", e);
Err(QueryBucketsOfUserStoredByMspError::InternalError)
});

match callback.send(buckets) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send back buckets: {:?}", e);
}
}
}
BlockchainServiceCommand::ReleaseForestRootWriteLock {
forest_root_write_tx,
callback,
Expand All @@ -1007,7 +1030,7 @@ where
match callback.send(forest_root_write_result) {
Ok(_) => {}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send forest write lock release result: {:?}", e);
error!(target: LOG_TARGET, "Failed to send back forest root write result: {:?}", e);
}
}
}
Expand Down Expand Up @@ -1627,6 +1650,21 @@ where
}
}
}
RuntimeEvent::FileSystem(pallet_file_system::Event::MspStopStoringBucketInsolventUser {
msp_id,
owner: _,
bucket_id
}) => {
// This event is relevant in case the Provider managed is the MSP of the event.
if let Some(StorageProviderId::MainStorageProvider(managed_msp_id)) = &self.provider_id {
if msp_id == *managed_msp_id {
self.emit(FinalisedMspStopStoringBucketInsolventUser {
msp_id,
bucket_id
})
}
}
}
RuntimeEvent::FileSystem(
pallet_file_system::Event::BspConfirmStoppedStoring {
bsp_id,
Expand Down
40 changes: 33 additions & 7 deletions node/src/services/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use shc_actors_framework::{
use shc_blockchain_service::{
events::{
AcceptedBspVolunteer, FileDeletionRequest, FinalisedBspConfirmStoppedStoring,
FinalisedBucketMovedAway, FinalisedMspStoppedStoringBucket,
FinalisedProofSubmittedForPendingFileDeletionRequest, LastChargeableInfoUpdated,
MoveBucketAccepted, MoveBucketExpired, MoveBucketRejected, MoveBucketRequested,
MoveBucketRequestedForMsp, MultipleNewChallengeSeeds, NewStorageRequest, NotifyPeriod,
ProcessConfirmStoringRequest, ProcessFileDeletionRequest, ProcessMspRespondStoringRequest,
ProcessStopStoringForInsolventUserRequest, ProcessSubmitProofRequest, SlashableProvider,
SpStopStoringInsolventUser, StartMovedBucketDownload, UserWithoutFunds,
FinalisedBucketMovedAway, FinalisedMspStopStoringBucketInsolventUser,
FinalisedMspStoppedStoringBucket, FinalisedProofSubmittedForPendingFileDeletionRequest,
LastChargeableInfoUpdated, MoveBucketAccepted, MoveBucketExpired, MoveBucketRejected,
MoveBucketRequested, MoveBucketRequestedForMsp, MultipleNewChallengeSeeds,
NewStorageRequest, NotifyPeriod, ProcessConfirmStoringRequest, ProcessFileDeletionRequest,
ProcessMspRespondStoringRequest, ProcessStopStoringForInsolventUserRequest,
ProcessSubmitProofRequest, SlashableProvider, SpStopStoringInsolventUser,
StartMovedBucketDownload, UserWithoutFunds,
},
BlockchainService,
};
Expand All @@ -38,6 +39,7 @@ use crate::{
bsp_submit_proof::BspSubmitProofTask, bsp_upload_file::BspUploadFileTask,
msp_charge_fees::MspChargeFeesTask, msp_delete_bucket::MspDeleteBucketTask,
msp_delete_file::MspDeleteFileTask, msp_move_bucket::MspRespondMoveBucketTask,
msp_stop_storing_insolvent_user::MspStopStoringInsolventUserTask,
msp_upload_file::MspUploadFileTask, sp_slash_provider::SlashProviderTask,
user_sends_file::UserSendsFileTask,
},
Expand Down Expand Up @@ -290,6 +292,30 @@ where

let msp_charge_fees_task = MspChargeFeesTask::new(self.clone());

// MspStopStoringInsolventUserTask handles events for deleting buckets owned by users that have become insolvent.
let msp_stop_storing_insolvent_user = MspStopStoringInsolventUserTask::new(self.clone());

// Subscribing to UserInsolvent event from the BlockchainService to delete all stored buckets owned by a
// user that has been declared as without funds.
let user_without_funds_event_bus_listener: EventBusListener<UserWithoutFunds, _> =
msp_stop_storing_insolvent_user.clone().subscribe_to(
&self.task_spawner,
&self.blockchain,
true,
);
user_without_funds_event_bus_listener.start();

// Subscribing to FinalisedMspStopStoringBucketInsolventUser event from the BlockchainService.
let finalised_msp_stop_storing_bucket_insolvent_user_event_bus_listener: EventBusListener<
FinalisedMspStopStoringBucketInsolventUser,
_,
> = msp_stop_storing_insolvent_user.clone().subscribe_to(
&self.task_spawner,
&self.blockchain,
true,
);
finalised_msp_stop_storing_bucket_insolvent_user_event_bus_listener.start();

// Subscribing to NotifyPeriod event from the BlockchainService.
let notify_period_event_bus_listener: EventBusListener<NotifyPeriod, _> =
msp_charge_fees_task
Expand Down
1 change: 1 addition & 0 deletions node/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod msp_charge_fees;
pub mod msp_delete_bucket;
pub mod msp_delete_file;
pub mod msp_move_bucket;
pub mod msp_stop_storing_insolvent_user;
pub mod msp_upload_file;
pub mod sp_slash_provider;
pub mod user_sends_file;
Loading

0 comments on commit 832b228

Please sign in to comment.