Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify util_sort.go FindTopNByScoreDesc #539

Merged
merged 10 commits into from
Aug 23, 2024
128 changes: 39 additions & 89 deletions x/emissions/keeper/actor_utils/util_sort.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,56 @@
package actorutils

import (
"container/heap"
"math/rand"
"sort"
"slices"

"github.com/allora-network/allora-chain/x/emissions/types"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// Source: https://pkg.go.dev/container/heap#Push

// A structure to hold the original value and a random tiebreaker
type SortableItem struct {
Value Actor
Weight Score
Tiebreaker uint32
index int
}

type Actor = string
type BlockHeight = int64
type Score = types.Score
type TopicId = uint64

type PriorityQueue []*SortableItem

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
if pq[i].Weight.Score.Equal(pq[j].Weight.Score) {
return pq[i].Tiebreaker > pq[j].Tiebreaker
}

return pq[i].Weight.Score.Gt(pq[j].Weight.Score)
}

func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *PriorityQueue) Push(x any) {
n := len(*pq)
item, ok := x.(*SortableItem)
if !ok {
return
}
item.index = n
*pq = append(*pq, item)
}

func (pq *PriorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

// Sorts the given actors by score, desc, breaking ties randomly
// Returns the top N actors as a map with the actor as the key and a boolean (True) as the value
func FindTopNByScoreDesc(ctx sdk.Context, n uint64, scoresByActor map[Actor]Score, randSeed BlockHeight) ([]Actor, map[string]bool) {
func FindTopNByScoreDesc(
ctx sdk.Context,
n uint64,
scores []emissionstypes.Score,
randSeed int64,
) (topNActorsSorted []emissionstypes.Score, allActorsSorted []emissionstypes.Score, actorIsTop map[string]struct{}) {
r := rand.New(rand.NewSource(randSeed)) //nolint:gosec // G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand)
queue := &PriorityQueue{}
i := 0
// Extract and sort the keys
keys := make([]Actor, 0, len(scoresByActor))
for actor := range scoresByActor {
keys = append(keys, actor)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
// in our tiebreaker, we never return that two elements are equal
// so the sort function will never be called with two equal elements
// so we don't care about sort stability because our tiebreaker determines the order
// we also sort from largest to smallest so the sort function is inverted
// from the usual smallest to largest sort order
slices.SortFunc(scores, func(x, y emissionstypes.Score) int {
if x.Score.Lt(y.Score) {
return 1
} else if x.Score.Gt(y.Score) {
return -1
} else {
tiebreaker := r.Intn(2)
if tiebreaker == 0 {
return -1
} else {
return 1
}
}
})

// Iterate over the sorted keys
for _, actor := range keys {
score := scoresByActor[actor]
queue.Push(&SortableItem{actor, score, r.Uint32(), i})
i++
// which is bigger, n or the length of the scores?
N := n
if N > uint64(len(scores)) {
N = uint64(len(scores))
}

heap.Init(queue)

topN := make([]Actor, 0)
topNBool := make(map[string]bool)
for i := uint64(0); i < n; i++ {
if queue.Len() == 0 {
break
}
item, ok := heap.Pop(queue).(*SortableItem)
if !ok {
ctx.Logger().Warn("Error: Could not cast to SortableItem")
continue
}
topN = append(topN, item.Value)
topNBool[item.Value] = true
topNActorsSorted = make([]emissionstypes.Score, N)
actorIsTop = make(map[string]struct{}, N)
// populate top n actors sorted with only the top n
// populate all with all
// actor is top is a map of the top n actors
for i := uint64(0); i < N; i++ {
topNActorsSorted[i] = scores[i]
actorIsTop[scores[i].Address] = struct{}{}
}

return topN, topNBool
return topNActorsSorted, scores, actorIsTop
}
40 changes: 23 additions & 17 deletions x/emissions/keeper/actor_utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,27 @@ func TestFindTopNByScoreDesc(t *testing.T) {
worker4Addr := sdk.AccAddress(worker4PrivateKey.PubKey().Address())
worker5Addr := sdk.AccAddress(worker5PrivateKey.PubKey().Address())

latestReputerScores := make(map[string]types.Score)
latestReputerScores[worker1Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker1Addr.String(), Score: alloraMath.NewDecFromInt64(90)}
latestReputerScores[worker2Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker2Addr.String(), Score: alloraMath.NewDecFromInt64(40)}
latestReputerScores[worker3Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker3Addr.String(), Score: alloraMath.NewDecFromInt64(80)}
latestReputerScores[worker4Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker4Addr.String(), Score: alloraMath.NewDecFromInt64(20)}
latestReputerScores[worker5Addr.String()] = types.Score{TopicId: topicId, BlockHeight: 1, Address: worker5Addr.String(), Score: alloraMath.NewDecFromInt64(100)}

topActors, topActorsBool := FindTopNByScoreDesc(testCtx, 3, latestReputerScores, 1)
require.Equal(t, worker5Addr.String(), topActors[0])
require.Equal(t, worker1Addr.String(), topActors[1])
require.Equal(t, worker3Addr.String(), topActors[2])

require.Equal(t, topActorsBool[worker1Addr.String()], true)
require.Equal(t, topActorsBool[worker2Addr.String()], false)
require.Equal(t, topActorsBool[worker3Addr.String()], true)
require.Equal(t, topActorsBool[worker4Addr.String()], false)
require.Equal(t, topActorsBool[worker5Addr.String()], true)
latestReputerScores := []types.Score{
{TopicId: topicId, BlockHeight: 1, Address: worker1Addr.String(), Score: alloraMath.NewDecFromInt64(90)},
{TopicId: topicId, BlockHeight: 1, Address: worker2Addr.String(), Score: alloraMath.NewDecFromInt64(40)},
{TopicId: topicId, BlockHeight: 1, Address: worker3Addr.String(), Score: alloraMath.NewDecFromInt64(80)},
{TopicId: topicId, BlockHeight: 1, Address: worker4Addr.String(), Score: alloraMath.NewDecFromInt64(20)},
{TopicId: topicId, BlockHeight: 1, Address: worker5Addr.String(), Score: alloraMath.NewDecFromInt64(100)},
}

topActors, _, topActorsBool := FindTopNByScoreDesc(testCtx, 3, latestReputerScores, 1)
require.Equal(t, worker5Addr.String(), topActors[0].Address)
require.Equal(t, worker1Addr.String(), topActors[1].Address)
require.Equal(t, worker3Addr.String(), topActors[2].Address)

_, isTop := topActorsBool[worker1Addr.String()]
require.Equal(t, isTop, true)
_, isTop = topActorsBool[worker2Addr.String()]
require.Equal(t, isTop, false)
_, isTop = topActorsBool[worker3Addr.String()]
require.Equal(t, isTop, true)
_, isTop = topActorsBool[worker4Addr.String()]
require.Equal(t, isTop, false)
_, isTop = topActorsBool[worker5Addr.String()]
require.Equal(t, isTop, true)
}
7 changes: 6 additions & 1 deletion x/emissions/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,12 @@ func (k *Keeper) GetLatestInfererScore(ctx context.Context, topicId TopicId, wor
score, err := k.latestInfererScoresByWorker.Get(ctx, key)
if err != nil {
if errors.Is(err, collections.ErrNotFound) {
return types.Score{}, nil
return types.Score{
BlockHeight: 0,
Address: worker,
TopicId: topicId,
Score: alloraMath.ZeroDec(),
}, nil
}
return types.Score{}, err
}
Expand Down
7 changes: 6 additions & 1 deletion x/emissions/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2476,7 +2476,12 @@ func (s *KeeperTestSuite) TestGetLatestScores() {
// Test getting latest scores when none are set
infererScore, err := keeper.GetLatestInfererScore(ctx, topicId, worker)
s.Require().NoError(err, "Fetching latest inferer score should not fail")
s.Require().Equal(types.Score{}, infererScore, "Inferer score should be empty if not set")
s.Require().Equal(types.Score{
TopicId: topicId,
BlockHeight: 0,
Address: worker,
Score: alloraMath.ZeroDec(),
}, infererScore, "Inferer score should be zero if not set")

forecasterScore, err := keeper.GetLatestForecasterScore(ctx, topicId, forecaster)
s.Require().NoError(err, "Fetching latest forecaster score should not fail")
Expand Down
22 changes: 14 additions & 8 deletions x/emissions/keeper/msgserver/msg_server_worker_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,33 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser
"Error forecaster address is not registered in this topic")
}

latestScoresForForecastedInferers := make(map[string]types.Score)
// Remove duplicate forecast element
acceptedForecastElements := make([]*types.ForecastElement, 0)
seenInferers := make(map[string]bool)

latestScoresForForecastedInferers := make([]types.Score, 0)
for _, el := range forecast.ForecastElements {
score, err := ms.k.GetLatestInfererScore(ctx, forecast.TopicId, el.Inferer)
if err != nil {
continue
}
latestScoresForForecastedInferers[el.Inferer] = score
latestScoresForForecastedInferers = append(latestScoresForForecastedInferers, score)
}

moduleParams, err := ms.k.GetParams(ctx)
if err != nil {
return nil, err
}
_, topNInferer := actorutils.FindTopNByScoreDesc(sdkCtx, moduleParams.MaxElementsPerForecast, latestScoresForForecastedInferers, forecast.BlockHeight)
_, _, topNInferer := actorutils.FindTopNByScoreDesc(
sdkCtx,
moduleParams.MaxElementsPerForecast,
latestScoresForForecastedInferers,
forecast.BlockHeight,
)

// Remove duplicate forecast element
acceptedForecastElements := make([]*types.ForecastElement, 0)
seenInferers := make(map[string]bool)
for _, el := range forecast.ForecastElements {
if !seenInferers[el.Inferer] && topNInferer[el.Inferer] {
notAlreadySeen := !seenInferers[el.Inferer]
_, isTopInferer := topNInferer[el.Inferer]
if notAlreadySeen && isTopInferer {
acceptedForecastElements = append(acceptedForecastElements, el)
seenInferers[el.Inferer] = true
}
Expand Down
7 changes: 6 additions & 1 deletion x/emissions/module/rewards/rewards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2674,7 +2674,12 @@ func (s *RewardsTestSuite) TestTotalInferersRewardFractionGrowsWithMoreInferers(
}
thirdForecasterFraction, err := totalForecastersReward.Quo(totalReward)
s.Require().NoError(err)
s.Require().True(firstForecasterFraction.Lt(thirdForecasterFraction), "Third forecaster fraction must be bigger than first fraction")
s.Require().True(
firstForecasterFraction.Lt(thirdForecasterFraction),
"Third forecaster fraction must be bigger than first fraction %s > %s",
firstForecasterFraction.String(),
thirdForecasterFraction.String(),
)
}

func (s *RewardsTestSuite) TestRewardForTopicGoesUpWhenRelativeStakeGoesUp() {
Expand Down
Loading