diff --git a/CHANGELOG.md b/CHANGELOG.md index f735e6013..9549c17f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,9 @@ Implements merit based sortition for top N inferers, forecasters, and reputers. ### Fixed * [#544](https://github.com/allora-network/allora-chain/pull/544) Added check against zero-rewards after conversion to cosmosInt +* [#547](https://github.com/allora-network/allora-chain/pull/547) Improve error handling on InsertPayload, fixed/added tests err handling +* [#550](https://github.com/allora-network/allora-chain/pull/550) Fix reputer window upper limit + ### Security diff --git a/x/emissions/keeper/keeper.go b/x/emissions/keeper/keeper.go index e09e174df..d8a8209d7 100644 --- a/x/emissions/keeper/keeper.go +++ b/x/emissions/keeper/keeper.go @@ -774,6 +774,12 @@ func (k *Keeper) GetForecastsAtBlock(ctx context.Context, topicId TopicId, block // Append individual inference for a topic/block func (k *Keeper) AppendInference(ctx context.Context, topicId TopicId, nonce types.Nonce, inference *types.Inference) error { + if inference == nil { + return errors.New("invalid inference: inferer is empty or nil") + } + if inference.Inferer == "" { + return errors.New("invalid inference: inferer is empty") + } block := nonce.BlockHeight moduleParams, err := k.GetParams(ctx) if err != nil { @@ -831,6 +837,12 @@ func (k *Keeper) InsertInferences(ctx context.Context, topicId TopicId, nonce ty // Append individual forecast for a topic/block func (k *Keeper) AppendForecast(ctx context.Context, topicId TopicId, nonce types.Nonce, forecast *types.Forecast) error { + if forecast == nil || forecast.Forecaster == "" { + return errors.New("invalid forecast: forecaster is empty or nil") + } + if len(forecast.ForecastElements) == 0 { + return errors.New("invalid forecast: forecast elements are empty") + } block := nonce.BlockHeight moduleParams, err := k.GetParams(ctx) if err != nil { @@ -921,6 +933,15 @@ func (k *Keeper) DeleteTopicRewardNonce(ctx context.Context, topicId TopicId) er // Append loss bundle for a topoic and blockheight func (k *Keeper) AppendReputerLoss(ctx context.Context, topicId TopicId, block BlockHeight, reputerLoss *types.ReputerValueBundle) error { + if reputerLoss == nil { + return errors.New("invalid reputerLoss bundle: inferer is empty or nil") + } + if reputerLoss.ValueBundle == nil { + return errors.New("reputerLoss bundle is nil") + } + if reputerLoss.ValueBundle.Reputer == "" { + return errors.New("invalid reputerLoss bundle: reputer is empty") + } moduleParams, err := k.GetParams(ctx) if err != nil { return err diff --git a/x/emissions/keeper/msgserver/msg_server_reputer_payload.go b/x/emissions/keeper/msgserver/msg_server_reputer_payload.go index 2b51471ad..70166d229 100644 --- a/x/emissions/keeper/msgserver/msg_server_reputer_payload.go +++ b/x/emissions/keeper/msgserver/msg_server_reputer_payload.go @@ -62,7 +62,8 @@ func (ms msgServer) InsertReputerPayload(ctx context.Context, msg *types.MsgInse } // Check if the ground truth lag has passed: if blockheight > nonce.BlockHeight + topic.GroundTruthLag - if blockHeight < nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag { + if blockHeight < nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag || + blockHeight > nonce.ReputerNonce.BlockHeight+topic.GroundTruthLag*2 { return nil, types.ErrReputerNonceWindowNotAvailable } diff --git a/x/emissions/keeper/msgserver/msg_server_reputer_payload_test.go b/x/emissions/keeper/msgserver/msg_server_reputer_payload_test.go index 2b6b95238..1699c73c0 100644 --- a/x/emissions/keeper/msgserver/msg_server_reputer_payload_test.go +++ b/x/emissions/keeper/msgserver/msg_server_reputer_payload_test.go @@ -10,17 +10,16 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) +const block = types.BlockHeight(1) + func (s *MsgServerTestSuite) setUpMsgReputerPayload( reputerAddr sdk.AccAddress, workerAddr sdk.AccAddress, - block types.BlockHeight, ) ( reputerValueBundle types.ValueBundle, expectedInferences types.Inferences, expectedForecasts types.Forecasts, topicId uint64, - reputerNonce types.Nonce, - workerNonce types.Nonce, ) { ctx, msgServer := s.ctx, s.msgServer require := s.Require() @@ -43,10 +42,7 @@ func (s *MsgServerTestSuite) setUpMsgReputerPayload( _, err = msgServer.AddStake(ctx, addStakeMsg) s.Require().NoError(err) - reputerNonce = types.Nonce{ - BlockHeight: block, - } - workerNonce = types.Nonce{ + workerNonce := types.Nonce{ BlockHeight: block, } @@ -54,7 +50,7 @@ func (s *MsgServerTestSuite) setUpMsgReputerPayload( require.NoError(err) _, err = keeper.FulfillWorkerNonce(ctx, topicId, &workerNonce) require.NoError(err) - err = keeper.AddReputerNonce(ctx, topicId, &reputerNonce) + err = keeper.AddReputerNonce(ctx, topicId, &workerNonce) require.NoError(err) // add in inference and forecast data @@ -107,11 +103,11 @@ func (s *MsgServerTestSuite) setUpMsgReputerPayload( }, }, ReputerRequestNonce: &types.ReputerRequestNonce{ - ReputerNonce: &reputerNonce, + ReputerNonce: &workerNonce, }, } - return reputerValueBundle, expectedInferences, expectedForecasts, topicId, reputerNonce, workerNonce + return reputerValueBundle, expectedInferences, expectedForecasts, topicId } func (s *MsgServerTestSuite) signValueBundle(reputerValueBundle *types.ValueBundle, privateKey secp256k1.PrivKey) []byte { @@ -149,13 +145,11 @@ func (s *MsgServerTestSuite) constructAndInsertReputerPayload( return err } -func (s *MsgServerTestSuite) TestMsgInsertReputerPayload() { +func (s *MsgServerTestSuite) TestMsgInsertReputerPayloadFailsEarlyWindow() { ctx := s.ctx require := s.Require() keeper := s.emissionsKeeper - block := types.BlockHeight(1) - reputerPrivateKey := secp256k1.GenPrivKey() reputerPublicKeyBytes := reputerPrivateKey.PubKey().Bytes() reputerAddr := sdk.AccAddress(reputerPrivateKey.PubKey().Address()) @@ -163,7 +157,7 @@ func (s *MsgServerTestSuite) TestMsgInsertReputerPayload() { workerPrivateKey := secp256k1.GenPrivKey() workerAddr := sdk.AccAddress(workerPrivateKey.PubKey().Address()) - reputerValueBundle, expectedInferences, expectedForecasts, topicId, _, _ := s.setUpMsgReputerPayload(reputerAddr, workerAddr, block) + reputerValueBundle, expectedInferences, expectedForecasts, topicId := s.setUpMsgReputerPayload(reputerAddr, workerAddr) err := keeper.InsertForecasts(ctx, topicId, types.Nonce{BlockHeight: block}, expectedForecasts) require.NoError(err) @@ -174,9 +168,29 @@ func (s *MsgServerTestSuite) TestMsgInsertReputerPayload() { topic, err := s.emissionsKeeper.GetTopic(s.ctx, topicId) s.Require().NoError(err) - newBlockheight := block + topic.GroundTruthLag + // Prior to the ground truth lag, should not allow reputer payload + newBlockheight := block + topic.GroundTruthLag - 1 + s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight) + + err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle) + require.ErrorIs(err, types.ErrReputerNonceWindowNotAvailable) + + // Valid reputer nonce window, start + newBlockheight = block + topic.GroundTruthLag s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight) err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle) require.NoError(err) + + // Valid reputer nonce window, end + newBlockheight = block + topic.GroundTruthLag*2 + s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight) + err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle) + require.NoError(err) + + // Valid reputer nonce window, end + newBlockheight = block + topic.GroundTruthLag*2 + 1 + s.ctx = sdk.UnwrapSDKContext(s.ctx).WithBlockHeight(newBlockheight) + err = s.constructAndInsertReputerPayload(reputerAddr, reputerPrivateKey, reputerPublicKeyBytes, &reputerValueBundle) + require.ErrorIs(err, types.ErrReputerNonceWindowNotAvailable) } diff --git a/x/emissions/keeper/msgserver/msg_server_worker_payload.go b/x/emissions/keeper/msgserver/msg_server_worker_payload.go index 31143867d..4247d408f 100644 --- a/x/emissions/keeper/msgserver/msg_server_worker_payload.go +++ b/x/emissions/keeper/msgserver/msg_server_worker_payload.go @@ -71,23 +71,24 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser return nil, err } + // Inferences if msg.WorkerDataBundle.InferenceForecastsBundle.Inference != nil { inference := msg.WorkerDataBundle.InferenceForecastsBundle.Inference if inference == nil { - return nil, errorsmod.Wrapf(err, "Inference not found") + return nil, errorsmod.Wrapf(types.ErrNoValidInferences, "Inference not found") } if inference.TopicId != msg.WorkerDataBundle.TopicId { - return nil, errorsmod.Wrapf(err, - "Error inferer not use same topic") + return nil, errorsmod.Wrapf(types.ErrInvalidTopicId, + "inferer not using the same topic as bundle") } isInfererRegistered, err := ms.k.IsWorkerRegisteredInTopic(ctx, topicId, inference.Inferer) if err != nil { return nil, errorsmod.Wrapf(err, - "Error inferer address is not registered in this topic") + "error checking if inferer address is registered in this topic") } if !isInfererRegistered { - return nil, errorsmod.Wrapf(err, - "Error inferer address is not registered in this topic") + return nil, errorsmod.Wrapf(types.ErrAddressNotRegistered, + "inferer address is not registered in this topic") } err = ms.k.AppendInference(ctx, topicId, *nonce, inference) if err != nil { @@ -95,23 +96,26 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser } } - // Append this individual inference to all inferences + // Forecasts if msg.WorkerDataBundle.InferenceForecastsBundle.Forecast != nil { forecast := msg.WorkerDataBundle.InferenceForecastsBundle.Forecast + if len(forecast.ForecastElements) == 0 { + return nil, errorsmod.Wrapf(types.ErrNoValidForecastElements, "No valid forecast elements found in Forecast") + } if forecast.TopicId != msg.WorkerDataBundle.TopicId { - return nil, errorsmod.Wrapf(err, - "Error forecaster not use same topic") + return nil, errorsmod.Wrapf(types.ErrInvalidTopicId, "forecaster not using the same topic as bundle") } isForecasterRegistered, err := ms.k.IsWorkerRegisteredInTopic(ctx, topicId, forecast.Forecaster) if err != nil { return nil, errorsmod.Wrapf(err, - "Error forecaster address is not registered in this topic") + "error checking if forecaster address is registered in this topic") } if !isForecasterRegistered { - return nil, errorsmod.Wrapf(err, - "Error forecaster address is not registered in this topic") + return nil, errorsmod.Wrapf(types.ErrAddressNotRegistered, + "forecaster address is not registered in this topic") } + // LImit forecast elements for top inferers latestScoresForForecastedInferers := make([]types.Score, 0) for _, el := range forecast.ForecastElements { score, err := ms.k.GetLatestInfererScore(ctx, forecast.TopicId, el.Inferer) @@ -132,7 +136,7 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser forecast.BlockHeight, ) - // Remove duplicate forecast element + // Remove duplicate forecast elements acceptedForecastElements := make([]*types.ForecastElement, 0) seenInferers := make(map[string]bool) for _, el := range forecast.ForecastElements { @@ -143,11 +147,14 @@ func (ms msgServer) InsertWorkerPayload(ctx context.Context, msg *types.MsgInser seenInferers[el.Inferer] = true } } - forecast.ForecastElements = acceptedForecastElements - err = ms.k.AppendForecast(ctx, topicId, *nonce, forecast) - if err != nil { - return nil, errorsmod.Wrapf(err, - "Error appending forecast") + + if len(acceptedForecastElements) > 0 { + forecast.ForecastElements = acceptedForecastElements + err = ms.k.AppendForecast(ctx, topicId, *nonce, forecast) + if err != nil { + return nil, errorsmod.Wrapf(err, + "Error appending forecast") + } } } return &types.MsgInsertWorkerPayloadResponse{}, nil diff --git a/x/emissions/keeper/msgserver/msg_server_worker_payload_test.go b/x/emissions/keeper/msgserver/msg_server_worker_payload_test.go index b6986c8a9..11f1450a3 100644 --- a/x/emissions/keeper/msgserver/msg_server_worker_payload_test.go +++ b/x/emissions/keeper/msgserver/msg_server_worker_payload_test.go @@ -141,23 +141,70 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayload() { require.Equal(forecastsCount1, 1) } -func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithNilInference() { +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadNotFailsWithNilInference() { ctx, msgServer := s.ctx, s.msgServer require := s.Require() workerPrivateKey := secp256k1.GenPrivKey() - workerMsg, _ := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + workerMsg, topicId := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference = nil + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + + _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err) + + forecasts, err := s.emissionsKeeper.GetForecastsAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(forecasts.Forecasts[0].ForecastElements), 4) +} + +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadNotFailsWithNilForecast() { + ctx, msgServer := s.ctx, s.msgServer + require := s.Require() + + workerPrivateKey := secp256k1.GenPrivKey() + workerMsg, topicId := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast = nil workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + + _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err) + + inferences, err := s.emissionsKeeper.GetInferencesAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(inferences.Inferences), 1) +} + +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithNilInferenceAndForecast() { + ctx, msgServer := s.ctx, s.msgServer + require := s.Require() + + workerPrivateKey := secp256k1.GenPrivKey() + + workerMsg, _ := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + // BEGIN MODIFICATION workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference = nil + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast = nil + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + // END MODIFICATION _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) - require.Error(err) + require.ErrorIs(err, types.ErrInvalidWorkerData) } -func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithoutWorkerDataBundle() { +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithoutSignature() { ctx, msgServer := s.ctx, s.msgServer require := s.Require() @@ -170,7 +217,7 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithoutWorkerDataBun // END MODIFICATION _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) - require.Error(err) + require.ErrorIs(err, types.ErrInvalidWorkerData) } func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithMismatchedTopicId() { @@ -182,13 +229,15 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithMismatchedTopicI workerMsg, _ := s.setUpMsgInsertWorkerPayload(workerPrivateKey) // BEGIN MODIFICATION - workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.TopicId = 1 + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.TopicId = 123 // END MODIFICATION + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) - require.Error(err, types.ErrNoValidBundles) + require.ErrorIs(err, types.ErrInvalidTopicId) } func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithUnregisteredInferer() { @@ -212,11 +261,12 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithUnregisteredInfe require.NoError(err) // END MODIFICATION - workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) _, err = msgServer.InsertWorkerPayload(ctx, &workerMsg) - require.Error(err, types.ErrNoValidBundles) + require.ErrorIs(err, types.ErrAddressNotRegistered) } func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadWithFewTopElementsPerForecast() { @@ -306,15 +356,18 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithMismatchedForeca blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight forecastsCount0 := s.getCountForecastsAtBlock(originalTopicId, blockHeight) + require.Equal(forecastsCount0, 0) ctx = ctx.WithBlockHeight(blockHeight) _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) - require.NoError(err) + require.ErrorIs(err, types.ErrInvalidTopicId) forecastsCount1 := s.getCountForecastsAtBlock(originalTopicId, blockHeight) - - require.Equal(forecastsCount0, 0) require.Equal(forecastsCount1, 0) + + // Also not added on the changed topicId + forecastsCountNew := s.getCountForecastsAtBlock(123, blockHeight) + require.Equal(forecastsCountNew, 0) } func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithUnregisteredForecaster() { @@ -342,12 +395,15 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadFailsWithUnregisteredFore blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight forecastsCount0 := s.getCountForecastsAtBlock(topicId, blockHeight) + require.Equal(forecastsCount0, 0) workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) - forecastsCount1 := s.getCountForecastsAtBlock(topicId, blockHeight) + ctx = ctx.WithBlockHeight(blockHeight) + _, err = msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.ErrorIs(err, types.ErrAddressNotRegistered) - require.Equal(forecastsCount0, 0) + forecastsCount1 := s.getCountForecastsAtBlock(topicId, blockHeight) require.Equal(forecastsCount1, 0) } @@ -403,7 +459,6 @@ func (s *MsgServerTestSuite) TestInsertingHugeBundleWorkerPayloadFails() { // Define sample OffchainNode information for a worker workerInfo := types.OffchainNode{ - Owner: "worker-owner-sample", NodeAddress: "worker-node-address-sample", } @@ -475,7 +530,7 @@ func (s *MsgServerTestSuite) TestInsertingHugeBundleWorkerPayloadFails() { workerMsg.WorkerDataBundle.InferencesForecastsBundleSignature = sig workerMsg.WorkerDataBundle.Pubkey = hex.EncodeToString(workerPublicKeyBytes) _, err = msgServer.InsertWorkerPayload(ctx, workerMsg) - require.Error(err, types.ErrQueryTooLarge) + require.ErrorIs(err, types.ErrQueryTooLarge) } func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadVerifyFailed() { @@ -487,7 +542,6 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadVerifyFailed() { // Define sample OffchainNode information for a worker workerInfo := types.OffchainNode{ - Owner: "worker-owner-sample", NodeAddress: "worker-node-address-sample", } @@ -556,5 +610,140 @@ func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadVerifyFailed() { } _, err = msgServer.InsertWorkerPayload(ctx, workerMsg) - require.Error(err, types.ErrNoValidBundles) + require.ErrorIs(err, types.ErrInvalidWorkerData) +} + +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadWithLowScoreForecastsAreRejected() { + ctx, msgServer := s.ctx, s.msgServer + require := s.Require() + keeper := s.emissionsKeeper + + workerPrivateKey := secp256k1.GenPrivKey() + adminPrivateKey := secp256k1.GenPrivKey() + adminAddr := sdk.AccAddress(adminPrivateKey.PubKey().Address()) + _ = keeper.AddWhitelistAdmin(s.ctx, adminAddr.String()) + + newParams := &types.OptionalParams{ + MaxElementsPerForecast: []uint64{3}, + } + + updateMsg := &types.MsgUpdateParams{ + Sender: adminAddr.String(), + Params: newParams, + } + + _, err := s.msgServer.UpdateParams(s.ctx, updateMsg) + require.NoError(err, "UpdateParams should not return an error") + + workerMsg, topicId := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + + inferer1 := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[0].Inferer + inferer2 := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[1].Inferer + inferer3 := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[2].Inferer + inferer4 := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[3].Inferer + + score1 := types.Score{TopicId: topicId, BlockHeight: blockHeight, Address: inferer1, Score: alloraMath.NewDecFromInt64(95)} + score2 := types.Score{TopicId: topicId, BlockHeight: blockHeight, Address: inferer2, Score: alloraMath.NewDecFromInt64(90)} + score3 := types.Score{TopicId: topicId, BlockHeight: blockHeight, Address: inferer3, Score: alloraMath.NewDecFromInt64(80)} + score4 := types.Score{TopicId: topicId, BlockHeight: blockHeight, Address: inferer4, Score: alloraMath.NewDecFromInt64(50)} + + _ = keeper.SetLatestInfererScore(ctx, topicId, inferer1, score1) + _ = keeper.SetLatestInfererScore(ctx, topicId, inferer2, score2) + _ = keeper.SetLatestInfererScore(ctx, topicId, inferer3, score3) + _ = keeper.SetLatestInfererScore(ctx, topicId, inferer4, score4) + + _, err = msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err, "InsertWorkerPayload should not return an error even if the forecast elements are below the threshold") + + forecastsCount0 := s.getCountForecastsAtBlock(topicId, blockHeight) + require.Equal(forecastsCount0, 1) + forecastsAtBlock, err := keeper.GetForecastsAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(forecastsAtBlock.Forecasts[0].ForecastElements), 3) + require.Equal(forecastsAtBlock.Forecasts[0].ForecastElements[0].Inferer, inferer1) + require.Equal(forecastsAtBlock.Forecasts[0].ForecastElements[1].Inferer, inferer2) + require.Equal(forecastsAtBlock.Forecasts[0].ForecastElements[2].Inferer, inferer3) +} + +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadWithInferencesRepeatedlyOverwritesPreviousValue() { + ctx, msgServer := s.ctx, s.msgServer + require := s.Require() + keeper := s.emissionsKeeper + + workerPrivateKey := secp256k1.GenPrivKey() + + workerMsg, topicId := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + // BEGIN MODIFICATION + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.Value = alloraMath.NewDecFromInt64(100) + // END MODIFICATION + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + + _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err, "InsertWorkerPayload should not return an error") + + inferences, err := keeper.GetInferencesAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(inferences.Inferences), 1) + require.Equal(inferences.Inferences[0].Value, alloraMath.NewDecFromInt64(100)) + + // Repeat the same inference with a different inference value and check if it overwrites the previous value + // BEGIN MODIFICATION + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Inference.Value = alloraMath.NewDecFromInt64(200) + // END MODIFICATION + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + _, err = msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err, "InsertWorkerPayload should not return an error") + + inferences, err = keeper.GetInferencesAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(inferences.Inferences), 1) + require.Equal(inferences.Inferences[0].Value, alloraMath.NewDecFromInt64(200)) +} + +func (s *MsgServerTestSuite) TestMsgInsertWorkerPayloadWithForecastRepeatedlyOverwritesPreviousValue() { + ctx, msgServer := s.ctx, s.msgServer + require := s.Require() + keeper := s.emissionsKeeper + + workerPrivateKey := secp256k1.GenPrivKey() + + workerMsg, topicId := s.setUpMsgInsertWorkerPayload(workerPrivateKey) + // BEGIN MODIFICATION + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[0].Value = alloraMath.NewDecFromInt64(100) + // END MODIFICATION + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + blockHeight := workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.BlockHeight + ctx = ctx.WithBlockHeight(blockHeight) + + _, err := msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err, "InsertWorkerPayload should not return an error") + + forecasts, err := keeper.GetForecastsAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(forecasts.Forecasts[0].ForecastElements), 4) + require.Equal(forecasts.Forecasts[0].ForecastElements[0].Value, alloraMath.NewDecFromInt64(100)) + + // Repeat the same forecast with a different forecast value and check if it overwrites the previous value + // BEGIN MODIFICATION + workerMsg.WorkerDataBundle.InferenceForecastsBundle.Forecast.ForecastElements[0].Value = alloraMath.NewDecFromInt64(200) + // END MODIFICATION + workerMsg = s.signMsgInsertWorkerPayload(workerMsg, workerPrivateKey) + + _, err = msgServer.InsertWorkerPayload(ctx, &workerMsg) + require.NoError(err, "InsertWorkerPayload should not return an error") + + forecasts, err = keeper.GetForecastsAtBlock(ctx, topicId, blockHeight) + require.NoError(err) + require.Equal(len(forecasts.Forecasts[0].ForecastElements), 4) + require.Equal(forecasts.Forecasts[0].ForecastElements[0].Value, alloraMath.NewDecFromInt64(200)) } diff --git a/x/emissions/module/rewards/rewards_test.go b/x/emissions/module/rewards/rewards_test.go index ea86b1dcc..4ca309223 100644 --- a/x/emissions/module/rewards/rewards_test.go +++ b/x/emissions/module/rewards/rewards_test.go @@ -1729,18 +1729,20 @@ func (s *RewardsTestSuite) TestRewardsHandleStandardDeviationOfZero() { topicId2 := res.TopicId // Register 5 workers, first 3 for topic 1 and last 2 for topic 2 - for i, addr := range workerAddrs { + for _, addr := range workerAddrs { workerRegMsg := &types.MsgRegister{ Sender: addr.String(), TopicId: topicId1, IsReputer: false, Owner: addr.String(), } - if i > 2 { - workerRegMsg.TopicId = topicId2 - } + // Full registration of all workers in both topics. _, err := s.msgServer.Register(s.ctx, workerRegMsg) s.Require().NoError(err) + + workerRegMsg.TopicId = topicId2 + _, err = s.msgServer.Register(s.ctx, workerRegMsg) + s.Require().NoError(err) } // Register 5 reputers, first 3 for topic 1 and last 2 for topic 2 diff --git a/x/emissions/types/errors.go b/x/emissions/types/errors.go index 51460ff5a..76f157efe 100644 --- a/x/emissions/types/errors.go +++ b/x/emissions/types/errors.go @@ -79,4 +79,5 @@ var ( ErrGroundTruthLagTooBig = errors.Register(ModuleName, 74, "ground truth lag too big for epoch length") ErrUnfulfilledNonceNotFound = errors.Register(ModuleName, 75, "unfulfilled nonce not found") ErrNotFound = errors.Register(ModuleName, 76, "not found") + ErrNoValidForecastElements = errors.Register(ModuleName, 77, "no valid forecast elements found") )