-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: ✨ MSP stop storing buckets for insolvent user task #366
Conversation
This change is made since a bucket status (such as its MSP) could change between the request was made and the new MSP responds, so it's not reliable
…ucket-insolvent-user
…ucket-insolvent-user
Added a check to make sure the payment stream between the MSP and the user exists when the user issues a new storage request. This is so we don't allow users that just stopped being insolvent to issue storage requests for buckets that the MSP hasn't deleted yet
…ucket-insolvent-user
…task-stop-storing-insolvent-user
…task-stop-storing-insolvent-user
…ucket-insolvent-user
…task-stop-storing-insolvent-user
// Get the ID of this MSP in the MSP indexer table. | ||
let msp = shc_indexer_db::models::Msp::get_by_onchain_msp_id( | ||
&mut indexer_connection, | ||
msp_on_chain_id.to_string(), | ||
) | ||
.await?; | ||
let msp_id = msp.id; | ||
|
||
// Get all the buckets this MSP is currently storing for the user. | ||
let stored_buckets = shc_indexer_db::models::Bucket::get_by_msp_id_and_owner( | ||
&mut indexer_connection, | ||
msp_id, | ||
insolvent_user.to_string(), | ||
) | ||
.await?; | ||
|
||
// If we are, queue up a bucket deletion request for that user. | ||
if !stored_buckets.is_empty() { | ||
info!(target: LOG_TARGET, "Buckets found for user {:?}, queueing up bucket stop storing", insolvent_user); | ||
// Queue a request to stop storing a bucket from the insolvent user. | ||
self.storage_hub_handler | ||
.blockchain | ||
.queue_stop_storing_for_insolvent_user_request( | ||
StopStoringForInsolventUserRequest::new(insolvent_user), | ||
) | ||
.await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl<NT> EventHandler<MspStopStoringBucketInsolventUser> for MspStopStoringInsolventUserTask<NT> | ||
where | ||
NT: ShNodeType + 'static, | ||
NT::FSH: MspForestStorageHandlerT, | ||
{ | ||
async fn handle_event( | ||
&mut self, | ||
event: MspStopStoringBucketInsolventUser, | ||
) -> anyhow::Result<()> { | ||
info!( | ||
target: LOG_TARGET, | ||
"Processing MspStopStoringBucketInsolventUser for user {:?}", | ||
event.owner | ||
); | ||
|
||
// Get the insolvent user from the event. | ||
let insolvent_user = event.owner; | ||
|
||
// Get the indexer database pool. If this MSP is not keeping an indexer, it won't be able to check if it has any buckets for | ||
// this user and as such wont be able to stop storing them. | ||
let indexer_db_pool = if let Some(indexer_db_pool) = | ||
self.storage_hub_handler.indexer_db_pool.clone() | ||
{ | ||
indexer_db_pool | ||
} else { | ||
error!( | ||
target: LOG_TARGET, | ||
"Indexer is disabled but a insolvent user event was received. Please provide a database URL (and enable indexer) for it to use this feature." | ||
); | ||
|
||
return Err(anyhow!("Indexer is disabled but a insolvent user event was received. Please provide a database URL (and enable indexer) for it to use this feature.")); | ||
}; | ||
|
||
// Try to connect to the indexer. It's needed to check if there are any buckets for | ||
// this user that this MSP is storing. | ||
let mut indexer_connection = indexer_db_pool.get().await?; | ||
|
||
// Get the ID of this MSP in the MSP indexer table. | ||
let msp = shc_indexer_db::models::Msp::get_by_onchain_msp_id( | ||
&mut indexer_connection, | ||
event.msp_id.to_string(), | ||
) | ||
.await?; | ||
let msp_id = msp.id; | ||
|
||
// Get all the buckets this MSP is currently storing for the user. | ||
let stored_buckets = shc_indexer_db::models::Bucket::get_by_msp_id_and_owner( | ||
&mut indexer_connection, | ||
msp_id, | ||
insolvent_user.to_string(), | ||
) | ||
.await?; | ||
|
||
// If we are, queue up a bucket deletion request for that user. | ||
if !stored_buckets.is_empty() { | ||
info!(target: LOG_TARGET, "Buckets found for user {:?}, queueing up bucket stop storing", insolvent_user); | ||
// Queue a request to stop storing a bucket from the insolvent user. | ||
self.storage_hub_handler | ||
.blockchain | ||
.queue_stop_storing_for_insolvent_user_request( | ||
StopStoringForInsolventUserRequest::new(insolvent_user), | ||
) | ||
.await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is repeated code. We could unify it. But most importantly, why not use a runtime API instead of the indexer for this?
By using the indexer:
- You require an MSP to be running an indexer, whereas before, it was only necessary if the MSP wanted to be able to accept moved buckets.
- You have the problem that the indexer follows the finalised state.
By using a runtime API:
- You can access the on-chain information of the latest block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added runtime api in favor of indexer here f9ff4c7
/// Handles the `ProcessStopStoringForInsolventUserRequest` event. | ||
/// | ||
/// This event is triggered whenever a Forest write-lock can be acquired to process a `StopStoringForInsolventUserRequest` | ||
/// after receiving either a `UserWithoutFunds` or `MspStopStoringBucketInsolventUser` event from the runtime.\ | ||
/// This task will: | ||
/// - Stop storing the bucket for the insolvent user. | ||
/// - Delete the bucket from the forest storage. | ||
impl<NT> EventHandler<ProcessStopStoringForInsolventUserRequest> | ||
for MspStopStoringInsolventUserTask<NT> | ||
where | ||
NT: ShNodeType + 'static, | ||
NT::FSH: MspForestStorageHandlerT, | ||
{ | ||
async fn handle_event( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would argue that this task doesn't need to use the Forest write lock. It's not presenting any proof, just removing the whole bucket.
Sure, if you send this transaction together with confirmation of a storage request for this bucket, either the storage request will fail if it comes in second, or it will succeed if it comes in first. Either way the result is the same: the MSP will no longer be storing the bucket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would do this in another PR since the ProcessStopStoringForInsolventUserRequestData
event is being processed by both BSP and MSP nodes.
We would have to decouple this event so we can emit this under separate circumstances (i.e. emit the event without the need for a forest root write lock).
…g index db for buckets stored by insolvent users
…StopStoringForInsolventUserRequest` event processing from `MspStopStoringInsolventUserTask`
… integration test
Co-authored-by: Facundo Farall <37149322+ffarall@users.noreply.github.com>
This PR adds the
MspStopStoringInsolventUserTask
, which is the task in charge of deleting the currently stored buckets that belong to a user that has become insolvent. This task:UserWithoutFunds
event from the runtime. When it receives it, checks if any of the MSP's stored buckets belongs to the now-insolvent user, and then spawns concurrent tasks for each one that submit the requiredmsp_stop_storing_bucket_for_insolvent_user
extrinsic and waits for them to finish.FinalisedMspStopStoringBucketInsolventUser
(which is emitted when a block that contained aMspStopStoringBucketInsolventUser
event gets finalised). When it receives it, it deletes the bucket from the forest storage and all that bucket's files from the file storage.Pending TODOs:
Develop an integration test that comprehensively tests this flow.DONE