Skip to content

Commit

Permalink
feat(consensus): Make state to rely on IPFS for block retrieving inst…
Browse files Browse the repository at this point in the history
…ead of BlockParts
  • Loading branch information
Wondertan committed Apr 20, 2021
1 parent b01fcec commit d561809
Showing 1 changed file with 105 additions and 135 deletions.
240 changes: 105 additions & 135 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"reflect"
"runtime/debug"
"time"

"github.com/gogo/protobuf/proto"
ipfsapi "github.com/ipfs/interface-go-ipfs-core"

cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/lazyledger/lazyledger-core/libs/service"
tmsync "github.com/lazyledger/lazyledger-core/libs/sync"
"github.com/lazyledger/lazyledger-core/p2p"
"github.com/lazyledger/lazyledger-core/p2p/ipld"
tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types"
sm "github.com/lazyledger/lazyledger-core/state"
"github.com/lazyledger/lazyledger-core/types"
Expand Down Expand Up @@ -782,24 +782,26 @@ func (cs *State) handleMsg(mi msgInfo) {
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.setProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID)
if added {
cs.statsMsgQueue <- mi
}

if err != nil && msg.Round != cs.Round {
cs.Logger.Debug(
"Received block part from wrong round",
"height",
cs.Height,
"csRound",
cs.Round,
"blockRound",
msg.Round)
err = nil
}
// TODO: In case proposal or block failed - send msg to statsMsgQueue

// case *BlockPartMessage:
// // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
// added, err = cs.addProposalBlockPart(msg, peerID)
// if added {
// cs.statsMsgQueue <- mi
// }
//
// if err != nil && msg.Round != cs.Round {
// cs.Logger.Debug(
// "Received block part from wrong round",
// "height",
// cs.Height,
// "csRound",
// cs.Round,
// "blockRound",
// msg.Round)
// err = nil
// }
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
Expand Down Expand Up @@ -1064,20 +1066,7 @@ func (cs *State) isProposer(address []byte) bool {
}

func (cs *State) defaultDecideProposal(height int64, round int32) {
var block *types.Block
var blockParts *types.PartSet

// Decide on block
if cs.ValidBlock != nil {
// If there is valid block, choose that.
block, blockParts = cs.ValidBlock, cs.ValidBlockParts
} else {
// Create a new proposal block from state/txs from the mempool.
block = cs.createProposalBlock()
if block == nil {
return
}
}
block := cs.getProposalBlock()

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
Expand All @@ -1086,41 +1075,25 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
}

// Make proposal
propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
propBlockID := types.BlockID{Hash: block.Hash()}
proposal := types.NewProposal(height, round, cs.ValidRound, propBlockID, &block.DataAvailabilityHeader)
p, err := proposal.ToProto()
if err != nil {
cs.Logger.Error(fmt.Sprintf("can't serialize proposal: %s", err.Error()))
return
}

if err := cs.privValidator.SignProposal(cs.state.ChainID, p); err == nil {
proposal.Signature = p.Signature

// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
}
cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
} else if !cs.replayMode {
err = cs.privValidator.SignProposal(cs.state.ChainID, p)
if err != nil && !cs.replayMode {
cs.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)
return
}
proposal.Signature = p.Signature
cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))

// post data to ipfs
// TODO(evan): don't hard code context and timeout
if cs.IpfsAPI != nil {
// longer timeouts result in block proposers failing to propose blocks in time.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1500)
defer cancel()
// TODO: post data to IPFS in a goroutine
err := block.PutBlock(ctx, cs.IpfsAPI.Dag())
if err != nil {
cs.Logger.Error(fmt.Sprintf("failure to post block data to IPFS: %s", err.Error()))
}
}
// send proposal on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
}

// Returns true if the proposal block is complete &&
Expand All @@ -1139,6 +1112,15 @@ func (cs *State) isProposalComplete() bool {

}

// gets block proposed by ourselves
func (cs *State) getProposalBlock() *types.Block {
if cs.ValidBlock != nil { // if there is already valid one
return cs.ValidBlock // return it
}

return cs.createProposalBlock() // otherwise, create one
}

// Create the next block to propose and return it. Returns nil block upon error.
//
// We really only need to return the parts, but the block is returned for
Expand Down Expand Up @@ -1173,7 +1155,16 @@ func (cs *State) createProposalBlock() *types.Block {
}
proposerAddr := cs.privValidatorPubKey.Address()

return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr)
block := cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr)

// TODO: Handle context properly
err := block.PutBlock(context.TODO(), cs.IpfsAPI.Dag())
if err != nil {
cs.Logger.Error(fmt.Sprintf("failure to post block data to IPFS: %s", err.Error()))
return nil
}

return block
}

// Enter: `timeoutPropose` after entering Propose.
Expand Down Expand Up @@ -1776,94 +1767,73 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}
cs.Logger.Info("Received proposal", "proposal", proposal)
return nil
}

// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) {
height, round, part := msg.Height, msg.Round, msg.Part

// Blocks might be reused, so round mismatch is OK
if cs.Height != height {
cs.Logger.Debug("Received block part from wrong height", "height", height, "round", round)
return false, nil
}

// We're not expecting a block part.
if cs.ProposalBlockParts == nil {
// NOTE: this can happen when we've gone to a higher round and
// then receive parts from the previous round - not necessarily a bad peer.
cs.Logger.Info("Received a block part when we're not expecting any",
"height", height, "round", round, "index", part.Index, "peer", peerID)
return false, nil
}
return cs.addProposalBlock()
}

added, err = cs.ProposalBlockParts.AddPart(part)
if err != nil {
return added, err
}
func (cs *State) addProposalBlock() error {
if cs.ProposalBlockParts.ByteSize() > cs.state.ConsensusParams.Block.MaxBytes {
return added, fmt.Errorf("total size of proposal block parts exceeds maximum block bytes (%d > %d)",
return fmt.Errorf("total size of proposal block parts exceeds maximum block bytes (%d > %d)",
cs.ProposalBlockParts.ByteSize(), cs.state.ConsensusParams.Block.MaxBytes,
)
}
if added && cs.ProposalBlockParts.IsComplete() {
bz, err := ioutil.ReadAll(cs.ProposalBlockParts.GetReader())
if err != nil {
return added, err
}

var pbb = new(tmproto.Block)
err = proto.Unmarshal(bz, pbb)
if err != nil {
return added, err
}
// TODO: Handle contexts properly
// TODO: Define proper timeout inside the ipld package.
ctx, cancel := context.WithTimeout(context.TODO(), time.Second * 10)
defer cancel()

block, err := types.BlockFromProto(pbb)
if err != nil {
return added, err
}
// TODO: In case we propose, do not retreive the block, but access from some cashed
// TODO: In case we are a light client, do not retrieve the block, but validate it's availability

cs.ProposalBlock = block
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("Error publishing event complete proposal", "err", err)
}
// TODO: Use default codec declared in one place
bdata, err := ipld.RetrieveBlockData(ctx, cs.Proposal.DAHeader, cs.IpfsAPI, types.DefaultCodec())
if err != nil {
return fmt.Errorf("failed to retreive block from the network: %w", err)
}

// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info("Updating valid block to new proposal block",
"valid-round", cs.Round, "valid-block-hash", cs.ProposalBlock.Hash())
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
cs.ProposalBlock = types.MakeBlock(
cs.Proposal.Height,
bdata.Txs,
bdata.Evidence.Evidence,
bdata.IntermediateStateRoots.RawRootsList,
bdata.Messages,
nil, // TODO: Should we fill it?
)
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("Error publishing event complete proposal", "err", err)
}

// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info("Updating valid block to new proposal block",
"valid-round", cs.Round, "valid-block-hash", cs.ProposalBlock.Hash())
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}

if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}

if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(cs.Proposal.Height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(cs.Proposal.Height, cs.Round)
}
return added, nil
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(cs.Proposal.Height)
}
return added, nil
return nil
}

// Attempt to add the vote. if its a duplicate signature, dupeout the validator
Expand Down

0 comments on commit d561809

Please sign in to comment.