Skip to content

Commit

Permalink
Merge branch 'dev' into PROTO-2241
Browse files Browse the repository at this point in the history
  • Loading branch information
kpeluso authored Aug 27, 2024
2 parents cfd3dc0 + d560252 commit 1224a96
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 56 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ Implements merit based sortition for top N inferers, forecasters, and reputers.
### Fixed

* [#544](https://github.com/allora-network/allora-chain/pull/544) Added check against zero-rewards after conversion to cosmosInt
* [#547](https://github.com/allora-network/allora-chain/pull/547) Improve error handling on InsertPayload, fixed/added tests err handling
* [#550](https://github.com/allora-network/allora-chain/pull/550) Fix reputer window upper limit


### Security

Expand Down
21 changes: 21 additions & 0 deletions x/emissions/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,12 @@ func (k *Keeper) GetForecastsAtBlock(ctx context.Context, topicId TopicId, block

// Append individual inference for a topic/block
func (k *Keeper) AppendInference(ctx context.Context, topicId TopicId, nonce types.Nonce, inference *types.Inference) error {
if inference == nil {
return errors.New("invalid inference: inferer is empty or nil")
}
if inference.Inferer == "" {
return errors.New("invalid inference: inferer is empty")
}
block := nonce.BlockHeight
moduleParams, err := k.GetParams(ctx)
if err != nil {
Expand Down Expand Up @@ -831,6 +837,12 @@ func (k *Keeper) InsertInferences(ctx context.Context, topicId TopicId, nonce ty

// Append individual forecast for a topic/block
func (k *Keeper) AppendForecast(ctx context.Context, topicId TopicId, nonce types.Nonce, forecast *types.Forecast) error {
if forecast == nil || forecast.Forecaster == "" {
return errors.New("invalid forecast: forecaster is empty or nil")
}
if len(forecast.ForecastElements) == 0 {
return errors.New("invalid forecast: forecast elements are empty")
}
block := nonce.BlockHeight
moduleParams, err := k.GetParams(ctx)
if err != nil {
Expand Down Expand Up @@ -921,6 +933,15 @@ func (k *Keeper) DeleteTopicRewardNonce(ctx context.Context, topicId TopicId) er

// Append loss bundle for a topoic and blockheight
func (k *Keeper) AppendReputerLoss(ctx context.Context, topicId TopicId, block BlockHeight, reputerLoss *types.ReputerValueBundle) error {
if reputerLoss == nil {
return errors.New("invalid reputerLoss bundle: inferer is empty or nil")
}
if reputerLoss.ValueBundle == nil {
return errors.New("reputerLoss bundle is nil")
}
if reputerLoss.ValueBundle.Reputer == "" {
return errors.New("invalid reputerLoss bundle: reputer is empty")
}
moduleParams, err := k.GetParams(ctx)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion x/emissions/keeper/msgserver/msg_server_reputer_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (ms msgServer) InsertReputerPayload(ctx context.Context, msg *types.MsgInse
}

// Check if the ground truth lag has passed: if blockheight > nonce.BlockHeight + topic.GroundTruthLag
if blockHeight < nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag {
if blockHeight < nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag ||
blockHeight > nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag*2 {
return nil, types.ErrReputerNonceWindowNotAvailable
}

Expand Down
44 changes: 29 additions & 15 deletions x/emissions/keeper/msgserver/msg_server_reputer_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

const block = types.BlockHeight(1)

func (s *MsgServerTestSuite) setUpMsgReputerPayload(
reputerAddr sdk.AccAddress,
workerAddr sdk.AccAddress,
block types.BlockHeight,
) (
reputerValueBundle types.ValueBundle,
expectedInferences types.Inferences,
expectedForecasts types.Forecasts,
topicId uint64,
reputerNonce types.Nonce,
workerNonce types.Nonce,
) {
ctx, msgServer := s.ctx, s.msgServer
require := s.Require()
Expand All @@ -43,18 +42,15 @@ func (s *MsgServerTestSuite) setUpMsgReputerPayload(
_, err = msgServer.AddStake(ctx, addStakeMsg)
s.Require().NoError(err)

reputerNonce = types.Nonce{
BlockHeight: block,
}
workerNonce = types.Nonce{
workerNonce := types.Nonce{
BlockHeight: block,
}

err = keeper.AddWorkerNonce(ctx, topicId, &workerNonce)
require.NoError(err)
_, err = keeper.FulfillWorkerNonce(ctx, topicId, &workerNonce)
require.NoError(err)
err = keeper.AddReputerNonce(ctx, topicId, &reputerNonce)
err = keeper.AddReputerNonce(ctx, topicId, &workerNonce)
require.NoError(err)

// add in inference and forecast data
Expand Down Expand Up @@ -107,11 +103,11 @@ func (s *MsgServerTestSuite) setUpMsgReputerPayload(
},
},
ReputerRequestNonce: &types.ReputerRequestNonce{
ReputerNonce: &reputerNonce,
ReputerNonce: &workerNonce,
},
}

return reputerValueBundle, expectedInferences, expectedForecasts, topicId, reputerNonce, workerNonce
return reputerValueBundle, expectedInferences, expectedForecasts, topicId
}

func (s *MsgServerTestSuite) signValueBundle(reputerValueBundle *types.ValueBundle, privateKey secp256k1.PrivKey) []byte {
Expand Down Expand Up @@ -149,21 +145,19 @@ func (s *MsgServerTestSuite) constructAndInsertReputerPayload(
return err
}

func (s *MsgServerTestSuite) TestMsgInsertReputerPayload() {
func (s *MsgServerTestSuite) TestMsgInsertReputerPayloadFailsEarlyWindow() {
ctx := s.ctx
require := s.Require()
keeper := s.emissionsKeeper

block := types.BlockHeight(1)

reputerPrivateKey := secp256k1.GenPrivKey()
reputerPublicKeyBytes := reputerPrivateKey.PubKey().Bytes()
reputerAddr := sdk.AccAddress(reputerPrivateKey.PubKey().Address())

workerPrivateKey := secp256k1.GenPrivKey()
workerAddr := sdk.AccAddress(workerPrivateKey.PubKey().Address())

reputerValueBundle, expectedInferences, expectedForecasts, topicId, _, _ := s.setUpMsgReputerPayload(reputerAddr, workerAddr, block)
reputerValueBundle, expectedInferences, expectedForecasts, topicId := s.setUpMsgReputerPayload(reputerAddr, workerAddr)

err := keeper.InsertForecasts(ctx, topicId, types.Nonce{BlockHeight: block}, expectedForecasts)
require.NoError(err)
Expand All @@ -174,9 +168,29 @@ func (s *MsgServerTestSuite) TestMsgInsertReputerPayload() {
topic, err := s.emissionsKeeper.GetTopic(s.ctx, topicId)
s.Require().NoError(err)

newBlockheight := block + topic.GroundTruthLag
// Prior to the ground truth lag, should not allow reputer payload
newBlockheight := block + topic.GroundTruthLag - 1
s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight)

err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle)
require.ErrorIs(err, types.ErrReputerNonceWindowNotAvailable)

// Valid reputer nonce window, start
newBlockheight = block + topic.GroundTruthLag
s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight)

err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle)
require.NoError(err)

// Valid reputer nonce window, end
newBlockheight = block + topic.GroundTruthLag*2
s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight)
err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle)
require.NoError(err)

// Valid reputer nonce window, end
newBlockheight = block + topic.GroundTruthLag*2 + 1
s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight)
err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle)
require.ErrorIs(err, types.ErrReputerNonceWindowNotAvailable)
}
43 changes: 25 additions & 18 deletions x/emissions/keeper/msgserver/msg_server_worker_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,51 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser
return nil, err
}

// Inferences
if msg.WorkerDataBundle.InferenceForecastsBundle.Inference != nil {
inference := msg.WorkerDataBundle.InferenceForecastsBundle.Inference
if inference == nil {
return nil, errorsmod.Wrapf(err, "Inference not found")
return nil, errorsmod.Wrapf(types.ErrNoValidInferences, "Inference not found")
}
if inference.TopicId != msg.WorkerDataBundle.TopicId {
return nil, errorsmod.Wrapf(err,
"Error inferer not use same topic")
return nil, errorsmod.Wrapf(types.ErrInvalidTopicId,
"inferer not using the same topic as bundle")
}
isInfererRegistered, err := ms.k.IsWorkerRegisteredInTopic(ctx, topicId, inference.Inferer)
if err != nil {
return nil, errorsmod.Wrapf(err,
"Error inferer address is not registered in this topic")
"error checking if inferer address is registered in this topic")
}
if !isInfererRegistered {
return nil, errorsmod.Wrapf(err,
"Error inferer address is not registered in this topic")
return nil, errorsmod.Wrapf(types.ErrAddressNotRegistered,
"inferer address is not registered in this topic")
}
err = ms.k.AppendInference(ctx, topicId, *nonce, inference)
if err != nil {
return nil, errorsmod.Wrapf(err, "Error appending inference")
}
}

// Append this individual inference to all inferences
// Forecasts
if msg.WorkerDataBundle.InferenceForecastsBundle.Forecast != nil {
forecast := msg.WorkerDataBundle.InferenceForecastsBundle.Forecast
if len(forecast.ForecastElements) == 0 {
return nil, errorsmod.Wrapf(types.ErrNoValidForecastElements, "No valid forecast elements found in Forecast")
}
if forecast.TopicId != msg.WorkerDataBundle.TopicId {
return nil, errorsmod.Wrapf(err,
"Error forecaster not use same topic")
return nil, errorsmod.Wrapf(types.ErrInvalidTopicId, "forecaster not using the same topic as bundle")
}
isForecasterRegistered, err := ms.k.IsWorkerRegisteredInTopic(ctx, topicId, forecast.Forecaster)
if err != nil {
return nil, errorsmod.Wrapf(err,
"Error forecaster address is not registered in this topic")
"error checking if forecaster address is registered in this topic")
}
if !isForecasterRegistered {
return nil, errorsmod.Wrapf(err,
"Error forecaster address is not registered in this topic")
return nil, errorsmod.Wrapf(types.ErrAddressNotRegistered,
"forecaster address is not registered in this topic")
}

// LImit forecast elements for top inferers
latestScoresForForecastedInferers := make([]types.Score, 0)
for _, el := range forecast.ForecastElements {
score, err := ms.k.GetLatestInfererScore(ctx, forecast.TopicId, el.Inferer)
Expand All @@ -132,7 +136,7 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser
forecast.BlockHeight,
)

// Remove duplicate forecast element
// Remove duplicate forecast elements
acceptedForecastElements := make([]*types.ForecastElement, 0)
seenInferers := make(map[string]bool)
for _, el := range forecast.ForecastElements {
Expand All @@ -143,11 +147,14 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser
seenInferers[el.Inferer] = true
}
}
forecast.ForecastElements = acceptedForecastElements
err = ms.k.AppendForecast(ctx, topicId, *nonce, forecast)
if err != nil {
return nil, errorsmod.Wrapf(err,
"Error appending forecast")

if len(acceptedForecastElements) > 0 {
forecast.ForecastElements = acceptedForecastElements
err = ms.k.AppendForecast(ctx, topicId, *nonce, forecast)
if err != nil {
return nil, errorsmod.Wrapf(err,
"Error appending forecast")
}
}
}
return &types.MsgInsertWorkerPayloadResponse{}, nil
Expand Down
Loading

0 comments on commit 1224a96

Please sign in to comment.