Skip to content

Commit

Permalink
ORA1421 - Changed churn ready topic usage (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilherme-brandao authored May 16, 2024
1 parent 2289994 commit e854079
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 33 deletions.
13 changes: 2 additions & 11 deletions app/topics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
fmt.Printf("\n ---------------- TopicsHandler ------------------- \n")
currentBlockHeight := ctx.BlockHeight()

maxNumberTopics, err := th.emissionsKeeper.GetParamsMaxTopicsPerBlock(ctx)
churnReadyTopics, err := th.emissionsKeeper.GetChurnReadyTopics(ctx)
if err != nil {
fmt.Println("Error getting max number of topics per block: ", err)
return nil, err
Expand All @@ -137,16 +137,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
var wg sync.WaitGroup
// Loop over and run epochs on topics whose inferences are demanded enough to be served
// Within each loop, execute the inference and weight cadence checks and trigger the inference and weight generation
for i := uint64(0); i < maxNumberTopics; i++ {
// Pop churn ready topic
churnReadyTopicId, err := th.emissionsKeeper.PopChurnReadyTopic(ctx)
if err != nil {
fmt.Println("Error popping churn ready topic: ", err)
continue
}
if churnReadyTopicId == 0 {
break
}
for _, churnReadyTopicId := range churnReadyTopics {
wg.Add(1)
go func(topicId TopicId) {
defer wg.Done()
Expand Down
46 changes: 31 additions & 15 deletions x/emissions/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,35 +1574,51 @@ func (k *Keeper) ResetTopicFeeRevenue(ctx context.Context, topicId TopicId, bloc

/// TOPIC CHURN

// Get the churn ready topics
func (k *Keeper) GetChurnReadyTopics(ctx context.Context) ([]TopicId, error) {
iter, err := k.churnReadyTopics.Iterate(ctx, nil)
if err != nil {
return nil, err
}
defer iter.Close()

topics := make([]TopicId, 0)
for ; iter.Valid(); iter.Next() {
topicId, err := iter.Key()
if err != nil {
return nil, err
}
topics = append(topics, topicId)
}

return topics, nil
}

// Add a topic as churn ready
func (k *Keeper) AddChurnReadyTopic(ctx context.Context, topicId TopicId) error {
return k.churnReadyTopics.Set(ctx, topicId)
}

// returns a single churn ready topic for processing. Order out is not guaranteed.
// if there are no churn ready topics, returns the reserved topic id 0,
// which cannot be used as a topic id - callers are responsible for checking
// that the returned topic id is not 0.
func (k *Keeper) PopChurnReadyTopic(ctx context.Context) (TopicId, error) {
// ResetChurnReadyTopics clears all topics from the churn-ready set and resets related states.
func (k *Keeper) ResetChurnReadyTopics(ctx context.Context) error {
iter, err := k.churnReadyTopics.Iterate(ctx, nil)
if err != nil {
return uint64(0), err
return err
}
defer iter.Close()

if iter.Valid() {
poppedTopic, err := iter.Key()
for ; iter.Valid(); iter.Next() {
topicId, err := iter.Key()
if err != nil {
return uint64(0), err
return err
}
if err := k.churnReadyTopics.Remove(ctx, poppedTopic); err != nil {
return uint64(0), err

if err := k.churnReadyTopics.Remove(ctx, topicId); err != nil {
return err
}
return poppedTopic, nil
}
iter.Close()

// if no topics exist to be churned, return the reserved topic id 0
return uint64(0), nil
return nil
}

/// SCORES
Expand Down
15 changes: 8 additions & 7 deletions x/emissions/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,7 @@ func (s *KeeperTestSuite) TestAddTopicFeeRevenueAndIncrementEpoch() {

/// TOPIC CHURN

func (s *KeeperTestSuite) TestPopChurnReadyTopic() {
func (s *KeeperTestSuite) TestChurnReadyTopics() {
ctx := s.ctx
keeper := s.emissionsKeeper
topicId := uint64(123)
Expand All @@ -1993,18 +1993,19 @@ func (s *KeeperTestSuite) TestPopChurnReadyTopic() {
err = keeper.AddChurnReadyTopic(ctx, topicId2)
s.Require().NoError(err)

poppedId2, err := keeper.PopChurnReadyTopic(ctx)
// Ensure the first topic is retrieved
retrievedIds, err := keeper.GetChurnReadyTopics(ctx)
s.Require().NoError(err)
s.Require().Equal(topicId, poppedId2)
s.Require().Len(retrievedIds, 2, "Should retrieve all churn ready topics")

poppedId, err := keeper.PopChurnReadyTopic(ctx)
// Reset the churn ready topics
err = keeper.ResetChurnReadyTopics(ctx)
s.Require().NoError(err)
s.Require().Equal(topicId2, poppedId)

// Ensure no topics remain
remainingId, err := keeper.PopChurnReadyTopic(ctx)
remainingIds, err := keeper.GetChurnReadyTopics(ctx)
s.Require().NoError(err)
s.Require().Equal(uint64(0), remainingId)
s.Require().Len(remainingIds, 0, "Should have no churn ready topics after reset")
}

/// SCORES
Expand Down
7 changes: 7 additions & 0 deletions x/emissions/module/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func EndBlocker(ctx context.Context, am AppModule) error {
return errors.Wrapf(err, "Rewards error")
}

// Reset the churn ready topics
err = am.keeper.ResetChurnReadyTopics(ctx)
if err != nil {
fmt.Println("Error resetting churn ready topics: ", err)
return errors.Wrapf(err, "Resetting churn ready topics error")
}

// NONCE MGMT with churnReady weights
var wg sync.WaitGroup
// Loop over and run epochs on topics whose inferences are demanded enough to be served
Expand Down

0 comments on commit e854079

Please sign in to comment.