Skip to content

Commit

Permalink
simplify util_sort.go FindTopNByScoreDesc (#539)
Browse files Browse the repository at this point in the history
## Purpose of Changes and their Description

As part of #421 I need to add features to util_sort for getting the list
of top actors by score. While working on that, I noticed some ways to
simplify the `FindTopNByScoreDesc` function. Because this code is so
important, I wanted to refactor the function in isolation, to make sure
that the rewrite introduces no changes that affect tests or state. Hence
this PR.

## Link(s) to Ticket(s) or Issue(s) resolved by this PR

Related to PROTO-2204

## Are these changes tested and documented?

This change is a refactor, so it is covered by our existing unit tests,
which ensure that no logical changes have been made.
  • Loading branch information
relyt29 authored Aug 23, 2024
1 parent 46e6759 commit 9b8a9d8
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 117 deletions.
128 changes: 39 additions & 89 deletions x/emissions/keeper/actor_utils/util_sort.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,56 @@
package actorutils

import (
"container/heap"
"math/rand"
"sort"
"slices"

"github.com/allora-network/allora-chain/x/emissions/types"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// Source: https://pkg.go.dev/container/heap#Push

// SortableItem holds the original value together with a random tiebreaker.
// It is the element type stored in a PriorityQueue.
type SortableItem struct {
Value Actor // the actor this item represents
Weight Score // the score that determines the item's priority (compared in Less)
Tiebreaker uint32 // orders items whose scores are equal; the larger tiebreaker ranks first
index int // position of the item in the queue, maintained by Swap and Push; Pop sets it to -1
}

// Actor is an alias for string.
// NOTE(review): presumably the actor's address — confirm against callers.
type Actor = string
// BlockHeight is an alias for int64.
type BlockHeight = int64
// Score is an alias for types.Score from the emissions module.
type Score = types.Score
// TopicId is an alias for uint64.
type TopicId = uint64

// PriorityQueue is a slice of *SortableItem. Its Len, Less, Swap, Push and Pop
// methods let it be used with container/heap; Less ranks higher scores first,
// so heap.Pop yields the highest-scored item.
type PriorityQueue []*SortableItem

// Len reports how many items are currently held in the queue.
func (pq PriorityQueue) Len() int {
	size := len(pq)
	return size
}

// Less reports whether the item at index i ranks ahead of the item at index j.
// Ranking is descending: the higher score comes first, so that Pop hands back
// the highest-priority item rather than the lowest. Items with equal scores are
// ordered by their tiebreaker, larger first.
func (pq PriorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	if !left.Weight.Score.Equal(right.Weight.Score) {
		return left.Weight.Score.Gt(right.Weight.Score)
	}
	// Scores are equal: fall back to the tiebreaker.
	return left.Tiebreaker > right.Tiebreaker
}

// Swap exchanges the items at indices i and j and updates each item's stored
// index to match its new position.
func (pq PriorityQueue) Swap(i, j int) {
	first, second := pq[i], pq[j]
	pq[i], pq[j] = second, first
	second.index = i
	first.index = j
}

// Push appends x to the end of the queue and records its position in the
// item's index field. Values that are not a *SortableItem are silently ignored.
func (pq *PriorityQueue) Push(x any) {
	entry, isItem := x.(*SortableItem)
	if isItem {
		entry.index = len(*pq)
		*pq = append(*pq, entry)
	}
}

// Pop removes the last item of the underlying slice and returns it.
// The vacated slot is cleared and the returned item's index is set to -1.
func (pq *PriorityQueue) Pop() any {
	last := len(*pq) - 1
	popped := (*pq)[last]
	(*pq)[last] = nil // drop the reference so the slot does not keep the item alive
	popped.index = -1 // mark the item as no longer in the queue
	*pq = (*pq)[:last]
	return popped
}

// Sorts the given actors by score, desc, breaking ties randomly
// Returns the top N actors as a map with the actor as the key and a boolean (True) as the value
func FindTopNByScoreDesc(ctx sdk.Context, n uint64, scoresByActor map[Actor]Score, randSeed BlockHeight) ([]Actor, map[string]bool) {
func FindTopNByScoreDesc(
ctx sdk.Context,
n uint64,
scores []emissionstypes.Score,
randSeed int64,
) (topNActorsSorted []emissionstypes.Score, allActorsSorted []emissionstypes.Score, actorIsTop map[string]struct{}) {
r := rand.New(rand.NewSource(randSeed)) //nolint:gosec // G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand)
queue := &PriorityQueue{}
i := 0
// Extract and sort the keys
keys := make([]Actor, 0, len(scoresByActor))
for actor := range scoresByActor {
keys = append(keys, actor)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
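// Ties are broken with the seeded random source: the comparator never
// reports two elements as equal, so the relative order of equal-scored
// elements is decided by the tiebreaker rather than by sort stability.
// The comparison is inverted relative to the usual smallest-to-largest
// order because we sort from largest to smallest.
// NOTE(review): a random tiebreaker is not a consistent ordering, while
// slices.SortFunc documents that cmp must be a strict weak ordering —
// confirm the resulting order is acceptable and reproducible for a given seed.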
slices.SortFunc(scores, func(x, y emissionstypes.Score) int {
if x.Score.Lt(y.Score) {
return 1
} else if x.Score.Gt(y.Score) {
return -1
} else {
tiebreaker := r.Intn(2)
if tiebreaker == 0 {
return -1
} else {
return 1
}
}
})

// Iterate over the sorted keys
for _, actor := range keys {
score := scoresByActor[actor]
queue.Push(&SortableItem{actor, score, r.Uint32(), i})
i++
// which is bigger, n or the length of the scores?
N := n
if N > uint64(len(scores)) {
N = uint64(len(scores))
}

heap.Init(queue)

topN := make([]Actor, 0)
topNBool := make(map[string]bool)
for i := uint64(0); i < n; i++ {
if queue.Len() == 0 {
break
}
item, ok := heap.Pop(queue).(*SortableItem)
if !ok {
ctx.Logger().Warn("Error: Could not cast to SortableItem")
continue
}
topN = append(topN, item.Value)
topNBool[item.Value] = true
topNActorsSorted = make([]emissionstypes.Score, N)
actorIsTop = make(map[string]struct{}, N)
// populate top n actors sorted with only the top n
// populate all with all
// actor is top is a map of the top n actors
for i := uint64(0); i < N; i++ {
topNActorsSorted[i] = scores[i]
actorIsTop[scores[i].Address] = struct{}{}
}

return topN, topNBool
return topNActorsSorted, scores, actorIsTop
}
40 changes: 23 additions & 17 deletions x/emissions/keeper/actor_utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,27 @@ func TestFindTopNByScoreDesc(t *testing.T) {
worker4Addr := sdk.AccAddress(worker4PrivateKey.PubKey().Address())
worker5Addr := sdk.AccAddress(worker5PrivateKey.PubKey().Address())

latestReputerScores := make(map[string]types.Score)
latestReputerScores[worker1Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker1Addr.String(), Score: alloraMath.NewDecFromInt64(90)}
latestReputerScores[worker2Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker2Addr.String(), Score: alloraMath.NewDecFromInt64(40)}
latestReputerScores[worker3Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker3Addr.String(), Score: alloraMath.NewDecFromInt64(80)}
latestReputerScores[worker4Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker4Addr.String(), Score: alloraMath.NewDecFromInt64(20)}
latestReputerScores[worker5Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker5Addr.String(), Score: alloraMath.NewDecFromInt64(100)}

topActors, topActorsBool := FindTopNByScoreDesc(testCtx, 3, latestReputerScores, 1)
require.Equal(t, worker5Addr.String(), topActors[0])
require.Equal(t, worker1Addr.String(), topActors[1])
require.Equal(t, worker3Addr.String(), topActors[2])

require.Equal(t, topActorsBool[worker1Addr.String()], true)
require.Equal(t, topActorsBool[worker2Addr.String()], false)
require.Equal(t, topActorsBool[worker3Addr.String()], true)
require.Equal(t, topActorsBool[worker4Addr.String()], false)
require.Equal(t, topActorsBool[worker5Addr.String()], true)
latestReputerScores := []types.Score{
{TopicId: topicId, BlockHeight: 1, Address: worker1Addr.String(), Score: alloraMath.NewDecFromInt64(90)},
{TopicId: topicId, BlockHeight: 1, Address: worker2Addr.String(), Score: alloraMath.NewDecFromInt64(40)},
{TopicId: topicId, BlockHeight: 1, Address: worker3Addr.String(), Score: alloraMath.NewDecFromInt64(80)},
{TopicId: topicId, BlockHeight: 1, Address: worker4Addr.String(), Score: alloraMath.NewDecFromInt64(20)},
{TopicId: topicId, BlockHeight: 1, Address: worker5Addr.String(), Score: alloraMath.NewDecFromInt64(100)},
}

topActors, _, topActorsBool := FindTopNByScoreDesc(testCtx, 3, latestReputerScores, 1)
require.Equal(t, worker5Addr.String(), topActors[0].Address)
require.Equal(t, worker1Addr.String(), topActors[1].Address)
require.Equal(t, worker3Addr.String(), topActors[2].Address)

_, isTop := topActorsBool[worker1Addr.String()]
require.Equal(t, isTop, true)
_, isTop = topActorsBool[worker2Addr.String()]
require.Equal(t, isTop, false)
_, isTop = topActorsBool[worker3Addr.String()]
require.Equal(t, isTop, true)
_, isTop = topActorsBool[worker4Addr.String()]
require.Equal(t, isTop, false)
_, isTop = topActorsBool[worker5Addr.String()]
require.Equal(t, isTop, true)
}
7 changes: 6 additions & 1 deletion x/emissions/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,12 @@ func (k *Keeper) GetLatestInfererScore(ctx context.Context, topicId TopicId, wor
score, err := k.latestInfererScoresByWorker.Get(ctx, key)
if err != nil {
if errors.Is(err, collections.ErrNotFound) {
return types.Score{}, nil
return types.Score{
BlockHeight: 0,
Address: worker,
TopicId: topicId,
Score: alloraMath.ZeroDec(),
}, nil
}
return types.Score{}, err
}
Expand Down
7 changes: 6 additions & 1 deletion x/emissions/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2476,7 +2476,12 @@ func (s *KeeperTestSuite) TestGetLatestScores() {
// Test getting latest scores when none are set
infererScore, err := keeper.GetLatestInfererScore(ctx, topicId, worker)
s.Require().NoError(err, "Fetching latest inferer score should not fail")
s.Require().Equal(types.Score{}, infererScore, "Inferer score should be empty if not set")
s.Require().Equal(types.Score{
TopicId: topicId,
BlockHeight: 0,
Address: worker,
Score: alloraMath.ZeroDec(),
}, infererScore, "Inferer score should be zero if not set")

forecasterScore, err := keeper.GetLatestForecasterScore(ctx, topicId, forecaster)
s.Require().NoError(err, "Fetching latest forecaster score should not fail")
Expand Down
22 changes: 14 additions & 8 deletions x/emissions/keeper/msgserver/msg_server_worker_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,33 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser
"Error forecaster address is not registered in this topic")
}

latestScoresForForecastedInferers := make(map[string]types.Score)
// Remove duplicate forecast element
acceptedForecastElements := make([]*types.ForecastElement, 0)
seenInferers := make(map[string]bool)

latestScoresForForecastedInferers := make([]types.Score, 0)
for _, el := range forecast.ForecastElements {
score, err := ms.k.GetLatestInfererScore(ctx, forecast.TopicId, el.Inferer)
if err != nil {
continue
}
latestScoresForForecastedInferers[el.Inferer] = score
latestScoresForForecastedInferers = append(latestScoresForForecastedInferers, score)
}

moduleParams, err := ms.k.GetParams(ctx)
if err != nil {
return nil, err
}
_, topNInferer := actorutils.FindTopNByScoreDesc(sdkCtx, moduleParams.MaxElementsPerForecast, latestScoresForForecastedInferers, forecast.BlockHeight)
_, _, topNInferer := actorutils.FindTopNByScoreDesc(
sdkCtx,
moduleParams.MaxElementsPerForecast,
latestScoresForForecastedInferers,
forecast.BlockHeight,
)

// Remove duplicate forecast element
acceptedForecastElements := make([]*types.ForecastElement, 0)
seenInferers := make(map[string]bool)
for _, el := range forecast.ForecastElements {
if !seenInferers[el.Inferer] && topNInferer[el.Inferer] {
notAlreadySeen := !seenInferers[el.Inferer]
_, isTopInferer := topNInferer[el.Inferer]
if notAlreadySeen && isTopInferer {
acceptedForecastElements = append(acceptedForecastElements, el)
seenInferers[el.Inferer] = true
}
Expand Down
7 changes: 6 additions & 1 deletion x/emissions/module/rewards/rewards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2674,7 +2674,12 @@ func (s *RewardsTestSuite) TestTotalInferersRewardFractionGrowsWithMoreInferers(
}
thirdForecasterFraction, err := totalForecastersReward.Quo(totalReward)
s.Require().NoError(err)
s.Require().True(firstForecasterFraction.Lt(thirdForecasterFraction), "Third forecaster fraction must be bigger than first fraction")
s.Require().True(
firstForecasterFraction.Lt(thirdForecasterFraction),
"Third forecaster fraction must be bigger than first fraction %s > %s",
firstForecasterFraction.String(),
thirdForecasterFraction.String(),
)
}

func (s *RewardsTestSuite) TestRewardForTopicGoesUpWhenRelativeStakeGoesUp() {
Expand Down

0 comments on commit 9b8a9d8

Please sign in to comment.