From e854079b7a4dab3ca89e4d3e44522739ad8a0c6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Brand=C3=A3o?= <37072140+guilherme-brandao@users.noreply.github.com> Date: Thu, 16 May 2024 19:24:46 +0100 Subject: [PATCH] ORA1421 - Changed churn ready topic usage (#256) - Removing stateful changes from topic handler Ticket: https://linear.app/upshot/issue/ORA-1421/move-churnreadytopics-removal-to-outside-topic-handler --- app/topics_handler.go | 13 ++------- x/emissions/keeper/keeper.go | 46 +++++++++++++++++++++---------- x/emissions/keeper/keeper_test.go | 15 +++++----- x/emissions/module/abci.go | 7 +++++ 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/app/topics_handler.go b/app/topics_handler.go index 81d2aac58..f03f1f6fc 100644 --- a/app/topics_handler.go +++ b/app/topics_handler.go @@ -128,7 +128,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { fmt.Printf("\n ---------------- TopicsHandler ------------------- \n") currentBlockHeight := ctx.BlockHeight() - maxNumberTopics, err := th.emissionsKeeper.GetParamsMaxTopicsPerBlock(ctx) + churnReadyTopics, err := th.emissionsKeeper.GetChurnReadyTopics(ctx) if err != nil { fmt.Println("Error getting max number of topics per block: ", err) return nil, err @@ -137,16 +137,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { var wg sync.WaitGroup // Loop over and run epochs on topics whose inferences are demanded enough to be served // Within each loop, execute the inference and weight cadence checks and trigger the inference and weight generation - for i := uint64(0); i < maxNumberTopics; i++ { - // Pop churn ready topic - churnReadyTopicId, err := th.emissionsKeeper.PopChurnReadyTopic(ctx) - if err != nil { - fmt.Println("Error popping churn ready topic: ", err) - continue - } - if churnReadyTopicId == 0 { - break - } + for _, churnReadyTopicId := range churnReadyTopics { wg.Add(1) go func(topicId TopicId) { defer wg.Done() diff --git a/x/emissions/keeper/keeper.go b/x/emissions/keeper/keeper.go index 59012d564..fb0c5be1a 100644 --- a/x/emissions/keeper/keeper.go +++ b/x/emissions/keeper/keeper.go @@ -1574,35 +1574,51 @@ func (k *Keeper) ResetTopicFeeRevenue(ctx context.Context, topicId TopicId, bloc /// TOPIC CHURN +// Get the churn ready topics +func (k *Keeper) GetChurnReadyTopics(ctx context.Context) ([]TopicId, error) { + iter, err := k.churnReadyTopics.Iterate(ctx, nil) + if err != nil { + return nil, err + } + defer iter.Close() + + topics := make([]TopicId, 0) + for ; iter.Valid(); iter.Next() { + topicId, err := iter.Key() + if err != nil { + return nil, err + } + topics = append(topics, topicId) + } + + return topics, nil +} + // Add a topic as churn ready func (k *Keeper) AddChurnReadyTopic(ctx context.Context, topicId TopicId) error { return k.churnReadyTopics.Set(ctx, topicId) } -// returns a single churn ready topic for processing. Order out is not guaranteed. -// if there are no churn ready topics, returns the reserved topic id 0, -// which cannot be used as a topic id - callers are responsible for checking -// that the returned topic id is not 0. -func (k *Keeper) PopChurnReadyTopic(ctx context.Context) (TopicId, error) { +// ResetChurnReadyTopics clears all topics from the churn-ready set and resets related states. +func (k *Keeper) ResetChurnReadyTopics(ctx context.Context) error { iter, err := k.churnReadyTopics.Iterate(ctx, nil) if err != nil { - return uint64(0), err + return err } + defer iter.Close() - if iter.Valid() { - poppedTopic, err := iter.Key() + for ; iter.Valid(); iter.Next() { + topicId, err := iter.Key() if err != nil { - return uint64(0), err + return err } - if err := k.churnReadyTopics.Remove(ctx, poppedTopic); err != nil { - return uint64(0), err + + if err := k.churnReadyTopics.Remove(ctx, topicId); err != nil { + return err } - return poppedTopic, nil } - iter.Close() - // if no topics exist to be churned, return the reserved topic id 0 - return uint64(0), nil + return nil } /// SCORES diff --git a/x/emissions/keeper/keeper_test.go b/x/emissions/keeper/keeper_test.go index 97a67627a..e31931fe0 100644 --- a/x/emissions/keeper/keeper_test.go +++ b/x/emissions/keeper/keeper_test.go @@ -1981,7 +1981,7 @@ func (s *KeeperTestSuite) TestAddTopicFeeRevenueAndIncrementEpoch() { /// TOPIC CHURN -func (s *KeeperTestSuite) TestPopChurnReadyTopic() { +func (s *KeeperTestSuite) TestChurnReadyTopics() { ctx := s.ctx keeper := s.emissionsKeeper topicId := uint64(123) @@ -1993,18 +1993,19 @@ func (s *KeeperTestSuite) TestPopChurnReadyTopic() { err = keeper.AddChurnReadyTopic(ctx, topicId2) s.Require().NoError(err) - poppedId2, err := keeper.PopChurnReadyTopic(ctx) + // Ensure the first topic is retrieved + retrievedIds, err := keeper.GetChurnReadyTopics(ctx) s.Require().NoError(err) - s.Require().Equal(topicId, poppedId2) + s.Require().Len(retrievedIds, 2, "Should retrieve all churn ready topics") - poppedId, err := keeper.PopChurnReadyTopic(ctx) + // Reset the churn ready topics + err = keeper.ResetChurnReadyTopics(ctx) s.Require().NoError(err) - s.Require().Equal(topicId2, poppedId) // Ensure no topics remain - remainingId, err := keeper.PopChurnReadyTopic(ctx) + remainingIds, err := keeper.GetChurnReadyTopics(ctx) s.Require().NoError(err) - s.Require().Equal(uint64(0), remainingId) + s.Require().Len(remainingIds, 0, "Should have no churn ready topics after reset") } /// SCORES diff --git a/x/emissions/module/abci.go b/x/emissions/module/abci.go index 82933fe7a..6587ad254 100644 --- a/x/emissions/module/abci.go +++ b/x/emissions/module/abci.go @@ -29,6 +29,13 @@ func EndBlocker(ctx context.Context, am AppModule) error { return errors.Wrapf(err, "Rewards error") } + // Reset the churn ready topics + err = am.keeper.ResetChurnReadyTopics(ctx) + if err != nil { + fmt.Println("Error resetting churn ready topics: ", err) + return errors.Wrapf(err, "Resetting churn ready topics error") + } + // NONCE MGMT with churnReady weights var wg sync.WaitGroup // Loop over and run epochs on topics whose inferences are demanded enough to be served