Skip to content

Commit

Permalink
simplify rewards by adding rewardable topic set (#318)
Browse files Browse the repository at this point in the history
Add reward-ready queue, just like we have for churnable topics.

Add topic when reputer payload committed, pop during EmitRewards, reset
when rewards paid out.

Topics still need to be churnable (active, epochLength % blockHeight ==
0, top N by weight) in order to be rewardable, rewardable is just
tracked more cleanly.

No more "x-ready" just "xable":
   * "churn-ready" -> "churnable"
   * "reward-ready" -> "rewardable"

Added query to return weight and "effective revenue" from `GetTopic`
query.

Added tie breakers to sorting functions that were missing it.

Added validation on submitted losses to check if losses were submitted
for duplicate workers. We now just take 1 per unique worker per type of
loss.

[Associated
ticket](https://linear.app/upshot/issue/ORA-1509/add-reward-ready-queue).

---------

Co-authored-by: Diego Campo <xmariachi@gmail.com>
Co-authored-by: Tyler <tyler@cyb3r.space>
  • Loading branch information
3 people authored May 30, 2024
1 parent 57aa41d commit a22b76b
Show file tree
Hide file tree
Showing 20 changed files with 1,105 additions and 911 deletions.
6 changes: 3 additions & 3 deletions app/topics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (th *TopicsHandler) requestTopicReputers(ctx sdk.Context, topic emissionsty
func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
ctx.Logger().Debug("\n ---------------- TopicsHandler ------------------- \n")
churnReadyTopics, err := th.emissionsKeeper.GetChurnReadyTopics(ctx)
churnableTopics, err := th.emissionsKeeper.GetChurnableTopics(ctx)
if err != nil {
ctx.Logger().Error("Error getting max number of topics per block: " + err.Error())
return nil, err
Expand All @@ -139,7 +139,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
var wg sync.WaitGroup
// Loop over and run epochs on topics whose inferences are demanded enough to be served
// Within each loop, execute the inference and weight cadence checks and trigger the inference and weight generation
for _, churnReadyTopicId := range churnReadyTopics {
for _, churnableTopicId := range churnableTopics {
wg.Add(1)
go func(topicId TopicId) {
defer wg.Done()
Expand All @@ -150,7 +150,7 @@ func (th *TopicsHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
}
th.requestTopicWorkers(ctx, topic)
th.requestTopicReputers(ctx, topic)
}(churnReadyTopicId)
}(churnableTopicId)
}
wg.Wait()
// Return the transactions as they came
Expand Down
27 changes: 26 additions & 1 deletion math/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func CalcExpDecay(
return newRev, nil
}

// generic function that sorts the keys of a map
// Generic function that sorts the keys of a map
// Used for deterministic ranging of maps
func GetSortedKeys[K cmp.Ordered, V any](m map[K]V) []K {
keys := make([]K, len(m))
Expand All @@ -69,6 +69,31 @@ func GetSortedKeys[K cmp.Ordered, V any](m map[K]V) []K {
return keys
}

// Generic function that sorts the keys of a map.
// Used for deterministic ranging of arrays with weights in a map
// whose keys may not include some values in the array.
// When an array element is not in the map, it is not included in the output array.
func GetSortedElementsByDecWeightDesc[K cmp.Ordered](l []K, m map[K]*Dec) []K {
// Create a new array that only contains unique elements that are in the map
newL := make([]K, 0)
hasKeyBeenSeen := make(map[K]bool)
for _, el := range l {
if _, ok := m[el]; ok {
if _, ok := hasKeyBeenSeen[el]; !ok {
newL = append(newL, el)
hasKeyBeenSeen[el] = true
}
}
}
sort.Slice(newL, func(i, j int) bool {
if (*m[newL[i]]).Equal(*m[newL[j]]) {
return newL[i] < newL[j]
}
return (*m[newL[i]]).Gt(*m[newL[j]])
})
return newL
}

// StdDev calculates the standard deviation of a slice of `Dec`
// stdDev = sqrt((Σ(x - μ))^2/ N)
// where μ is mean and N is number of elements
Expand Down
26 changes: 26 additions & 0 deletions math/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,32 @@ func TestCalcEmaWithNoPrior(t *testing.T) {
require.True(t, alloraMath.InDelta(expected, result, alloraMath.MustNewDecFromString("0.0001")))
}

func TestCalcExpDecaySimple(t *testing.T) {
decayFactor := alloraMath.MustNewDecFromString("0.1")
currentRev := alloraMath.MustNewDecFromString("300")

// (1 - 0.1) * 300
// 0.9 * 300 = 270
expected := alloraMath.MustNewDecFromString("270")

result, err := alloraMath.CalcExpDecay(currentRev, decayFactor)
require.NoError(t, err)
require.True(t, alloraMath.InDelta(expected, result, alloraMath.MustNewDecFromString("0.0001")))
}

func TestCalcExpDecayZeroDecayFactor(t *testing.T) {
decayFactor := alloraMath.MustNewDecFromString("0")
currentRev := alloraMath.MustNewDecFromString("300")

// (1 - 0) * 300
// 1 * 300 = 300
expected := alloraMath.MustNewDecFromString("300")

result, err := alloraMath.CalcExpDecay(currentRev, decayFactor)
require.NoError(t, err)
require.True(t, alloraMath.InDelta(expected, result, alloraMath.MustNewDecFromString("0.0001")))
}

func TestStdDev(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion test/integration/fund_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ func TopicFundingChecks(m testCommon.TestConfig) {
m.T.Log("--- Check funding Topic 1 ---")
FundTopic1(m)
m.T.Log("--- Check reactivating Topic 1 ---")
CheckTopic1Activated(m)
CheckTopic1Activated(m) // Should have stake (from earlier test) AND funds by now
}
28 changes: 10 additions & 18 deletions test/integration/staking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,18 @@ func StakeAliceAsReputerTopic1(m testCommon.TestConfig) {
require.Equal(m.T, fmt.Sprint(stakeToAdd), aliceStakedAfter.Amount.Sub(aliceStakedBefore.Amount).String())
}

// func CheckTopic1Activated(m testCommon.TestConfig) {
// // Fetch only active topics
// pagi := &emissionstypes.QueryActiveTopicsRequest{
// Pagination: &emissionstypes.SimpleCursorPaginationRequest{
// Limit: 10,
// },
// }
// activeTopics, err := m.Client.QueryEmissions().GetActiveTopics(
// m.Ctx,
// pagi)
// require.NoError(m.t, err, "Fetching active topics should not produce an error")

// // Verify the correct number of active topics is retrieved
// require.Equal(m.t, len(activeTopics.Topics), 1, "Should retrieve exactly one active topic")
// }

// Register two actors and check their registrations went through
func StakingChecks(m testCommon.TestConfig) {
m.T.Log("--- Staking Alice as Reputer ---")
StakeAliceAsReputerTopic1(m)
m.T.Log("--- Check reactivating Topic 1 ---")
CheckTopic1Activated(m)

res, _ := m.Client.QueryEmissions().GetTopic(m.Ctx, &emissionstypes.QueryTopicRequest{
TopicId: uint64(1),
})
// Topic is not expected to be funded yet => expect 0 weight => topic not active!
// But we still have this conditional just in case there are > 0 funds
if res.EffectiveRevenue != "0" {
m.T.Log("--- Check reactivating Topic 1 ---")
CheckTopic1Activated(m)
}
}
Loading

0 comments on commit a22b76b

Please sign in to comment.