diff --git a/dot/network/notifications.go b/dot/network/notifications.go
index 55a278bbeb..9992261eca 100644
--- a/dot/network/notifications.go
+++ b/dot/network/notifications.go
@@ -337,7 +337,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
 	}
 
 	if hsResponse.err != nil {
-		logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
+		logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
 		closeOutboundStream(info, peer, stream)
 		return nil, hsResponse.err
 	}
diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go
new file mode 100644
index 0000000000..5b819f2dc5
--- /dev/null
+++ b/lib/grandpa/catch-up.go
@@ -0,0 +1,328 @@
+// Copyright 2021 ChainSafe Systems (ON)
+// SPDX-License-Identifier: LGPL-3.0-only
+
+package grandpa
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ChainSafe/gossamer/lib/common"
+	"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+// catchUpResponseTimeout is how long do waits for a catch up response
+// after a catch up request has been sent.
+const catchUpResponseTimeout = 5 * time.Second
+
+// catchUp tracks the catch up requests sent to peers and processes the
+// catch up responses received for them.
+type catchUp struct {
+	lock sync.Mutex // applied on requestsSent
+
+	requestsSent map[peer.ID]CatchUpRequest
+	// bestResponse always holds a *CatchUpResponse; a nil pointer means
+	// no catch up response is pending.
+	bestResponse *atomic.Value // *CatchUpResponse
+
+	catchUpResponseCh chan *CatchUpResponse
+	waitingOnResponse *atomic.Value // bool
+	grandpa           *Service
+}
+
+// newCatchUp returns a catchUp for the given service with no pending
+// response and not waiting on any response.
+func newCatchUp(grandpa *Service) *catchUp {
+	c := &catchUp{
+		requestsSent:      make(map[peer.ID]CatchUpRequest),
+		bestResponse:      new(atomic.Value),
+		catchUpResponseCh: make(chan *CatchUpResponse),
+		waitingOnResponse: new(atomic.Value),
+		grandpa:           grandpa,
+	}
+
+	// atomic.Value panics on Store(nil) and Load returns nil until the first
+	// Store, so seed both values with typed zero values.
+	c.bestResponse.Store((*CatchUpResponse)(nil))
+	c.waitingOnResponse.Store(false)
+	return c
+}
+
+// pendingResponse returns the catch up response currently awaiting
+// processing, or nil if there is none.
+func (c *catchUp) pendingResponse() *CatchUpResponse {
+	resp, _ := c.bestResponse.Load().(*CatchUpResponse)
+	return resp
+}
+
+// do sends a catch up request for the given round and set ID to the peer,
+// pauses the grandpa service and blocks until a response is signalled on
+// catchUpResponseCh or catchUpResponseTimeout elapses ("timeout" error).
+func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error {
+	defer c.waitingOnResponse.Store(false)
+
+	if err := c.sendCatchUpRequest(
+		to, newCatchUpRequest(round, setID),
+	); err != nil {
+		return fmt.Errorf("failed to send catch up request: %w", err)
+	}
+
+	logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d",
+		to, round, setID)
+
+	c.grandpa.paused.Store(true)
+
+	timer := time.NewTimer(catchUpResponseTimeout)
+	defer timer.Stop()
+
+	select {
+	case <-c.catchUpResponseCh:
+		return nil
+	case <-timer.C:
+		return errors.New("timeout")
+	}
+}
+
+// sendCatchUpRequest sends req to the peer unless a catch up response is
+// already pending or a request was already sent to that peer. On success it
+// records the request and pauses the grandpa service.
+func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error {
+	if c.pendingResponse() != nil {
+		logger.Debug("ignoring neighbour message since we are already processing a catch-up response")
+		return nil
+	}
+
+	// TODO: Clean up all request sent before 5 min / (neighbour message interval)
+	c.lock.Lock()
+	_, ok := c.requestsSent[to]
+	c.lock.Unlock()
+	if ok {
+		logger.Debugf("ignoring neighbour message since we already sent a catch-up request to this peer: %s", to)
+		return nil
+	}
+
+	c.waitingOnResponse.Store(true)
+
+	cm, err := req.ToConsensusMessage()
+	if err != nil {
+		return fmt.Errorf("cannot convert request to consensus message: %w", err)
+	}
+
+	err = c.grandpa.network.SendMessage(to, cm)
+	if err != nil {
+		return fmt.Errorf("cannot send grandpa message: %w", err)
+	}
+
+	c.lock.Lock()
+	c.requestsSent[to] = *req
+	c.lock.Unlock()
+	c.grandpa.paused.Store(true)
+
+	return nil
+}
+
+// handleCatchUpResponse verifies the catch up response and, if it is valid,
+// stores its justifications, moves the grandpa state to the response round
+// and unpauses the service. It is a no-op for non-authority nodes and when
+// the service is not paused.
+func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error {
+	if !c.grandpa.authority {
+		return nil
+	}
+
+	logger.Debugf(
+		"processing catch up response with hash %s for round %d and set id %d",
+		msg.Hash, msg.Round, msg.SetID)
+
+	// if we aren't currently expecting a catch up response, return
+	if !c.grandpa.paused.Load().(bool) {
+		logger.Debug("not currently paused, ignoring catch up response")
+		return nil
+	}
+
+	if msg.SetID != c.grandpa.state.setID {
+		return fmt.Errorf("%w: received set id %d but have set id %d in state",
+			ErrSetIDMismatch, msg.SetID, c.grandpa.state.setID)
+	}
+
+	if msg.Round <= c.grandpa.state.round {
+		return fmt.Errorf("%w: received round %d but grandpa round in state is %d",
+			ErrInvalidCatchUpResponseRound, msg.Round, c.grandpa.state.round)
+	}
+
+	// msg itself may be the pending response when it is being retried, in
+	// which case it must still be processed.
+	if best := c.pendingResponse(); best != nil && best != msg && best.Round >= msg.Round {
+		logger.Debug("ignoring catch up response, since we are already processing one with a higher round")
+		return nil
+	}
+
+	prevote, err := c.verifyPreVoteJustification(msg)
+	if err != nil {
+		return fmt.Errorf("cannot verify pre vote justification: %w", err)
+	}
+
+	if err = c.verifyPreCommitJustification(msg); err != nil {
+		return fmt.Errorf("cannot verify pre commit justification: %w", err)
+	}
+
+	if msg.Hash.IsEmpty() || msg.Number == 0 {
+		return ErrGHOSTlessCatchUp
+	}
+
+	if err = c.verifyCatchUpResponseCompletability(prevote, msg.Hash); err != nil {
+		return fmt.Errorf("cannot verify catch-up response completability: %w", err)
+	}
+
+	// set prevotes and precommits in db
+	if err = c.grandpa.grandpaState.SetPrevotes(msg.Round, msg.SetID, msg.PreVoteJustification); err != nil {
+		return fmt.Errorf("cannot set pre votes in grandpa state: %w", err)
+	}
+
+	if err = c.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil {
+		return fmt.Errorf("cannot set pre commits in grandpa state: %w", err)
+	}
+
+	// update state and signal to grandpa we are ready to initiate
+	head, err := c.grandpa.blockState.GetHeader(msg.Hash)
+	if err != nil {
+		logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round)
+		return fmt.Errorf("cannot get header from grandpa block state: %w", err)
+	}
+
+	c.grandpa.head = head
+	c.grandpa.state.round = msg.Round
+	close(c.grandpa.resumed)
+	c.grandpa.resumed = make(chan struct{})
+	if c.waitingOnResponse.Load().(bool) {
+		c.catchUpResponseCh <- msg
+	}
+
+	c.grandpa.paused.Store(false)
+
+	// resetting both response and requests
+	// (a typed nil pointer, since atomic.Value cannot store an untyped nil)
+	c.bestResponse.Store((*CatchUpResponse)(nil))
+	c.lock.Lock()
+	c.requestsSent = make(map[peer.ID]CatchUpRequest)
+	c.lock.Unlock()
+
+	logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", c.grandpa.state.round)
+	return nil
+}
+
+// 
verifyCatchUpResponseCompletability verifies that the pre-commit block is a descendant of, or is, the pre-voted block +func (c *catchUp) verifyCatchUpResponseCompletability(prevote, precommit common.Hash) error { + if prevote == precommit { + return nil + } + + // check if the current block is a descendant of prevoted block + isDescendant, err := c.grandpa.blockState.IsDescendantOf(prevote, precommit) + if err != nil { + return fmt.Errorf("cannot find descendance between prevote and precommit: %w", err) + } + + if !isDescendant { + return ErrCatchUpResponseNotCompletable + } + + return nil +} + +func (c *catchUp) verifyPreCommitJustification(msg *CatchUpResponse) error { + auths := make([]AuthData, len(msg.PreCommitJustification)) + for i, pcj := range msg.PreCommitJustification { + auths[i] = AuthData{AuthorityID: pcj.AuthorityID} + } + + eqvVoters := getEquivocatoryVoters(auths) + + // verify pre-commit justification + var count uint64 + for idx := range msg.PreCommitJustification { + just := &msg.PreCommitJustification[idx] + + if _, ok := eqvVoters[just.AuthorityID]; ok { + continue + } + + err := verifyJustification(c.grandpa, just, msg.Round, msg.SetID, precommit) + if err != nil { + continue + } + + if just.Vote.Hash == msg.Hash && just.Vote.Number == msg.Number { + count++ + } + } + + if count+uint64(len(eqvVoters)) < c.grandpa.state.threshold() { + return ErrMinVotesNotMet + } + + return nil +} + +func (c *catchUp) verifyPreVoteJustification(msg *CatchUpResponse) (common.Hash, error) { + voters := make(map[ed25519.PublicKeyBytes]map[common.Hash]int, len(msg.PreVoteJustification)) + eqVotesByHash := make(map[common.Hash]map[ed25519.PublicKeyBytes]struct{}) + + // identify equivocatory votes by hash + for _, justification := range msg.PreVoteJustification { + hashsToCount, ok := voters[justification.AuthorityID] + if !ok { + hashsToCount = make(map[common.Hash]int) + } + + hashsToCount[justification.Vote.Hash]++ + voters[justification.AuthorityID] = 
hashsToCount + + if hashsToCount[justification.Vote.Hash] > 1 { + pubKeysOnHash, ok := eqVotesByHash[justification.Vote.Hash] + if !ok { + pubKeysOnHash = make(map[ed25519.PublicKeyBytes]struct{}) + } + + pubKeysOnHash[justification.AuthorityID] = struct{}{} + eqVotesByHash[justification.Vote.Hash] = pubKeysOnHash + } + } + + // verify pre-vote justification, returning the pre-voted block if there is one + votes := make(map[common.Hash]uint64) + for idx := range msg.PreVoteJustification { + just := &msg.PreVoteJustification[idx] + + // if the current voter is on equivocatory map then ignore the vote + if _, ok := eqVotesByHash[just.Vote.Hash][just.AuthorityID]; ok { + continue + } + + err := verifyJustification(c.grandpa, just, msg.Round, msg.SetID, prevote) + if err != nil { + continue + } + + votes[just.Vote.Hash]++ + } + + var prevote common.Hash + for hash, count := range votes { + equivocatoryVotes := eqVotesByHash[hash] + if count+uint64(len(equivocatoryVotes)) >= c.grandpa.state.threshold() { + prevote = hash + break + } + } + + if prevote.IsEmpty() { + return prevote, ErrMinVotesNotMet + } + + return prevote, nil +} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index ffc2a01d9c..6ef3879fae 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -77,8 +77,7 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message - - telemetry telemetry.Client + telemetry telemetry.Client } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message.go b/lib/grandpa/message.go index 5bf663690c..46ba1a0adf 100644 --- a/lib/grandpa/message.go +++ b/lib/grandpa/message.go @@ -13,9 +13,20 @@ import ( "github.com/ChainSafe/gossamer/pkg/scale" ) +type GrandpaMessageType int64 + +const ( + VoteMessageType GrandpaMessageType = iota + CommitMessageType + NeighborMessageType + 
CatchUpRequestMessageType + CatchUpResponseMessageType +) + // GrandpaMessage is implemented by all GRANDPA network messages type GrandpaMessage interface { //nolint:revive ToConsensusMessage() (*network.ConsensusMessage, error) + Type() GrandpaMessageType } // NewGrandpaMessage returns a new VaryingDataType to represent a GrandpaMessage @@ -59,6 +70,8 @@ type VoteMessage struct { // Index Returns VDT index func (v VoteMessage) Index() uint { return 0 } +func (v VoteMessage) Type() GrandpaMessageType { return VoteMessageType } + // ToConsensusMessage converts the VoteMessage into a network-level consensus message func (v *VoteMessage) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -88,6 +101,8 @@ type NeighbourMessage struct { // Index Returns VDT index func (m NeighbourMessage) Index() uint { return 2 } +func (v NeighbourMessage) Type() GrandpaMessageType { return NeighborMessageType } + // ToConsensusMessage converts the NeighbourMessage into a network-level consensus message func (m *NeighbourMessage) ToConsensusMessage() (*network.ConsensusMessage, error) { msg := newGrandpaMessage() @@ -139,6 +154,8 @@ func (s *Service) newCommitMessage(header *types.Header, round uint64) (*CommitM // Index Returns VDT index func (f CommitMessage) Index() uint { return 1 } +func (v CommitMessage) Type() GrandpaMessageType { return CommitMessageType } + // ToConsensusMessage converts the CommitMessage into a network-level consensus message func (f *CommitMessage) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -191,7 +208,9 @@ func compactToJustification(vs []Vote, auths []AuthData) ([]SignedVote, error) { // CatchUpRequest struct to represent a CatchUpRequest message type CatchUpRequest struct { + // Round that we want to catch up to Round uint64 + // The voter set ID this message is from. 
SetID uint64 } @@ -205,6 +224,8 @@ func newCatchUpRequest(round, setID uint64) *CatchUpRequest { // Index Returns VDT index func (r CatchUpRequest) Index() uint { return 3 } +func (r CatchUpRequest) Type() GrandpaMessageType { return CatchUpRequestMessageType } + // ToConsensusMessage converts the catchUpRequest into a network-level consensus message func (r *CatchUpRequest) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -262,6 +283,8 @@ func (s *Service) newCatchUpResponse(round, setID uint64) (*CatchUpResponse, err // Index Returns VDT index func (r CatchUpResponse) Index() uint { return 4 } +func (r CatchUpResponse) Type() GrandpaMessageType { return CatchUpResponseMessageType } + // ToConsensusMessage converts the catchUpResponse into a network-level consensus message func (r *CatchUpResponse) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 3402e8e8d3..56c9cd77a3 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -9,7 +9,7 @@ import ( "fmt" "reflect" - "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/chaindb" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/blocktree" @@ -20,9 +20,12 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +const catchupThreshold = 2 + // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { grandpa *Service + catchUp *catchUp blockState BlockState telemetry telemetry.Client } @@ -32,6 +35,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer return &MessageHandler{ grandpa: grandpa, blockState: blockState, + catchUp: newCatchUp(grandpa), telemetry: telemetryMailer, } } @@ -39,7 +43,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer // HandleMessage handles a GRANDPA consensus 
message // if it is a CommitMessage, it updates the BlockState // if it is a VoteMessage, it sends it to the GRANDPA service -func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.NotificationsMessage, error) { +func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { logger.Tracef("handling grandpa message: %v", m) switch msg := m.(type) { @@ -50,30 +54,34 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. msg: msg, } - return nil, nil + return nil case *CommitMessage: - return nil, h.handleCommitMessage(msg) + return h.handleCommitMessage(msg) case *NeighbourMessage: // we can afford to not retry handling neighbour message, if it errors. - return nil, h.handleNeighbourMessage(msg) + return h.handleNeighbourMessage(msg, from) case *CatchUpRequest: - return h.handleCatchUpRequest(msg) + return h.handleCatchUpRequest(msg, from) case *CatchUpResponse: - err := h.handleCatchUpResponse(msg) - if errors.Is(err, blocktree.ErrNodeNotFound) { - // TODO: we are adding these messages to reprocess them again, but we - // haven't added code to reprocess them. Do that. - // Also, revisit if we need to add these message in synchronous manner - // or not. If not, change catchUpResponseMessages to a normal map. 
#1531 - h.grandpa.tracker.addCatchUpResponse(msg) + // err := h.handleCatchUpResponse(msg) + err := h.catchUp.handleCatchUpResponse(msg) + if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { + h.grandpa.tracker.addCatchUpResponse(&networkCatchUpResponseMessage{ + from: from, + msg: msg, + }) + h.catchUp.bestResponse.Store(msg) + } else if err != nil { + logger.Debugf("could not catchup: %s", err) } - return nil, err + + return err default: - return nil, ErrInvalidMessageType + return ErrInvalidMessageType } } -func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { +func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer.ID) error { currFinalized, err := h.blockState.GetFinalisedHeader(0, 0) if err != nil { return err @@ -91,13 +99,32 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { return err } - // ignore neighbour messages that are above our head - if uint(msg.Number) > head { + // we shouldn't send a catch up request for blocks we haven't synced yet + // as we won't be able to process them. We also receive neighbour messages + // each time a new block is finalized, so we get them very often. + if msg.Number > uint32(head) { + logger.Debug("ignoring neighbour message, because we have not synced to this block number") return nil } - logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) - // TODO: should we send a justification request here? potentially re-connect this to sync package? 
(#1815) + logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ", + msg.Number, msg.SetID, msg.Round, from) + + highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() + if err != nil { + return fmt.Errorf("cannot get highest round and set id: %w", err) + } + + if msg.SetID != setID { + return fmt.Errorf("%w: received %d and expected %d", ErrSetIDMismatch, msg.SetID, setID) + } + + // catch up only if we are behind by more than catchup threshold + if int(msg.Round-highestRound) > catchupThreshold { + logger.Debugf("lagging behind by %d rounds", msg.Round-highestRound) + return h.catchUp.do(from, msg.Round, msg.SetID) + } + return nil } @@ -148,117 +175,45 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { return nil } -func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) { - if !h.grandpa.authority { - return nil, nil //nolint:nilnil - } - - logger.Debugf("received catch up request for round %d and set id %d", - msg.Round, msg.SetID) - - if msg.SetID != h.grandpa.state.setID { - return nil, ErrSetIDMismatch - } - - if msg.Round >= h.grandpa.state.round { - return nil, ErrInvalidCatchUpRound - } - - resp, err := h.grandpa.newCatchUpResponse(msg.Round, msg.SetID) - if err != nil { - return nil, err - } - - logger.Debugf( - "sending catch up response with hash %s for round %d and set id %d", - resp.Hash, msg.Round, msg.SetID) - return resp.ToConsensusMessage() -} - -func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { +func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) error { if !h.grandpa.authority { return nil } - logger.Debugf( - "received catch up response with hash %s for round %d and set id %d", - msg.Hash, msg.Round, msg.SetID) + logger.Debugf("received catch up request for round %d and set id %d, from %s", + msg.Round, msg.SetID, from) - // TODO: re-add catch-up logic (#1531) - if true { - return 
nil - } - - // if we aren't currently expecting a catch up response, return - if !h.grandpa.paused.Load().(bool) { - logger.Debug("not currently paused, ignoring catch up response") - return nil - } + logger.Debugf("Our latest round is %d", h.grandpa.state.round) if msg.SetID != h.grandpa.state.setID { return ErrSetIDMismatch } - if msg.Round != h.grandpa.state.round-1 { - return ErrInvalidCatchUpResponseRound + if msg.Round > h.grandpa.state.round { + return fmt.Errorf("%w: received %d and grandpa state round is %d", + ErrInvalidCatchUpRound, msg.Round, h.grandpa.state.round) } - prevote, err := h.verifyPreVoteJustification(msg) + // We don't necessarily have to reply with the round asked in the request, we can reply + // with our latest round. + resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID) if err != nil { return err } - if err = h.verifyPreCommitJustification(msg); err != nil { - return err - } - - if msg.Hash.IsEmpty() || msg.Number == 0 { - return ErrGHOSTlessCatchUp - } - - if err = h.verifyCatchUpResponseCompletability(prevote, msg.Hash); err != nil { - return err - } - - // set prevotes and precommits in db - if err = h.grandpa.grandpaState.SetPrevotes(msg.Round, msg.SetID, msg.PreVoteJustification); err != nil { - return err - } - - if err = h.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil { - return err - } - - // update state and signal to grandpa we are ready to initiate - head, err := h.grandpa.blockState.GetHeader(msg.Hash) + cm, err := resp.ToConsensusMessage() if err != nil { return err } - h.grandpa.head = head - h.grandpa.state.round = msg.Round - close(h.grandpa.resumed) - h.grandpa.resumed = make(chan struct{}) - h.grandpa.paused.Store(false) - logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round) - return nil -} - -// verifyCatchUpResponseCompletability verifies that the pre-commit block is a 
descendant of, or is, the pre-voted block -func (h *MessageHandler) verifyCatchUpResponseCompletability(prevote, precommit common.Hash) error { - if prevote == precommit { - return nil - } - - // check if the current block is a descendant of prevoted block - isDescendant, err := h.grandpa.blockState.IsDescendantOf(prevote, precommit) + err = h.grandpa.network.SendMessage(from, cm) if err != nil { return err } - if !isDescendant { - return ErrCatchUpResponseNotCompletable - } + logger.Debugf( + "successfully sent catch up response with hash %s for round %d and set id %d, to %s", + resp.Hash, h.grandpa.state.round, h.grandpa.state.setID, from) return nil } @@ -298,7 +253,7 @@ func (h *MessageHandler) verifyCommitMessageJustification(fm *CommitMessage) err AuthorityID: fm.AuthData[i].AuthorityID, } - err := h.verifyJustification(just, fm.Round, h.grandpa.state.setID, precommit) + err := verifyJustification(h.grandpa, just, fm.Round, h.grandpa.state.setID, precommit) if err != nil { continue } @@ -326,100 +281,7 @@ func (h *MessageHandler) verifyCommitMessageJustification(fm *CommitMessage) err return nil } -func (h *MessageHandler) verifyPreVoteJustification(msg *CatchUpResponse) (common.Hash, error) { - voters := make(map[ed25519.PublicKeyBytes]map[common.Hash]int, len(msg.PreVoteJustification)) - eqVotesByHash := make(map[common.Hash]map[ed25519.PublicKeyBytes]struct{}) - - // identify equivocatory votes by hash - for _, justification := range msg.PreVoteJustification { - hashsToCount, ok := voters[justification.AuthorityID] - if !ok { - hashsToCount = make(map[common.Hash]int) - } - - hashsToCount[justification.Vote.Hash]++ - voters[justification.AuthorityID] = hashsToCount - - if hashsToCount[justification.Vote.Hash] > 1 { - pubKeysOnHash, ok := eqVotesByHash[justification.Vote.Hash] - if !ok { - pubKeysOnHash = make(map[ed25519.PublicKeyBytes]struct{}) - } - - pubKeysOnHash[justification.AuthorityID] = struct{}{} - eqVotesByHash[justification.Vote.Hash] = 
pubKeysOnHash - } - } - - // verify pre-vote justification, returning the pre-voted block if there is one - votes := make(map[common.Hash]uint64) - for idx := range msg.PreVoteJustification { - just := &msg.PreVoteJustification[idx] - - // if the current voter is on equivocatory map then ignore the vote - if _, ok := eqVotesByHash[just.Vote.Hash][just.AuthorityID]; ok { - continue - } - - err := h.verifyJustification(just, msg.Round, msg.SetID, prevote) - if err != nil { - continue - } - - votes[just.Vote.Hash]++ - } - - var prevote common.Hash - for hash, count := range votes { - equivocatoryVotes := eqVotesByHash[hash] - if count+uint64(len(equivocatoryVotes)) >= h.grandpa.state.threshold() { - prevote = hash - break - } - } - - if prevote.IsEmpty() { - return prevote, ErrMinVotesNotMet - } - - return prevote, nil -} - -func (h *MessageHandler) verifyPreCommitJustification(msg *CatchUpResponse) error { - auths := make([]AuthData, len(msg.PreCommitJustification)) - for i, pcj := range msg.PreCommitJustification { - auths[i] = AuthData{AuthorityID: pcj.AuthorityID} - } - - eqvVoters := getEquivocatoryVoters(auths) - - // verify pre-commit justification - var count uint64 - for idx := range msg.PreCommitJustification { - just := &msg.PreCommitJustification[idx] - - if _, ok := eqvVoters[just.AuthorityID]; ok { - continue - } - - err := h.verifyJustification(just, msg.Round, msg.SetID, precommit) - if err != nil { - continue - } - - if just.Vote.Hash == msg.Hash && just.Vote.Number == msg.Number { - count++ - } - } - - if count+uint64(len(eqvVoters)) < h.grandpa.state.threshold() { - return ErrMinVotesNotMet - } - - return nil -} - -func (h *MessageHandler) verifyJustification(just *SignedVote, round, setID uint64, stage Subround) error { +func verifyJustification(grandpa *Service, just *SignedVote, round, setID uint64, stage Subround) error { // verify signature msg, err := scale.Marshal(FullVote{ Stage: stage, @@ -448,7 +310,7 @@ func (h *MessageHandler) 
verifyJustification(just *SignedVote, round, setID uint // verify authority in justification set authFound := false - for _, auth := range h.grandpa.authorities() { + for _, auth := range grandpa.authorities() { justKey, err := just.AuthorityID.Encode() if err != nil { return err diff --git a/lib/grandpa/message_handler_test.go b/lib/grandpa/message_handler_test.go index 54973774a6..97759c5c76 100644 --- a/lib/grandpa/message_handler_test.go +++ b/lib/grandpa/message_handler_test.go @@ -172,9 +172,8 @@ func TestMessageHandler_VoteMessage(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", vm) + err = h.handleMessage("", vm) require.NoError(t, err) - require.Nil(t, out) select { case vote := <-gs.in: @@ -199,7 +198,7 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { SetID: 3, Number: 1, } - _, err := h.handleMessage("", msg) + err := h.handleMessage("", msg) require.NoError(t, err) digest := types.NewDigest() @@ -223,9 +222,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { err = st.Block.AddBlock(block) require.NoError(t, err) - out, err := h.handleMessage("", msg) + err = h.handleMessage("", msg) require.NoError(t, err) - require.Nil(t, out) } func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) { @@ -243,7 +241,7 @@ func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - err := h.verifyJustification(just, gs.state.round, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, gs.state.round, gs.state.setID, precommit) require.Equal(t, err, ErrInvalidSignature) } @@ -282,9 +280,8 @@ func TestMessageHandler_CommitMessage_NoCatchUpRequest_ValidSig(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - 
out, err := h.handleMessage("", fm) + err = h.handleMessage("", fm) require.NoError(t, err) - require.Nil(t, out) hash, err := st.Block.GetFinalisedHash(fm.Round, gs.state.setID) require.NoError(t, err) @@ -309,9 +306,8 @@ func TestMessageHandler_CommitMessage_NoCatchUpRequest_MinVoteError(t *testing.T telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", fm) + err = h.handleMessage("", fm) require.EqualError(t, err, ErrMinVotesNotMet.Error()) - require.Nil(t, out) } func TestMessageHandler_CommitMessage_WithCatchUpRequest(t *testing.T) { @@ -337,7 +333,7 @@ func TestMessageHandler_CommitMessage_WithCatchUpRequest(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err = h.handleMessage("", fm) + err = h.handleMessage("", fm) require.NoError(t, err) } @@ -350,7 +346,7 @@ func TestMessageHandler_CatchUpRequest_InvalidRound(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err := h.handleMessage("", req) + err := h.handleMessage("", req) require.Equal(t, ErrInvalidCatchUpRound, err) } @@ -363,11 +359,11 @@ func TestMessageHandler_CatchUpRequest_InvalidSetID(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err := h.handleMessage("", req) + err := h.handleMessage("", req) require.Equal(t, ErrSetIDMismatch, err) } -func TestMessageHandler_CatchUpRequest_WithResponse(t *testing.T) { +func TestMessageHandler_CatchUpResponse(t *testing.T) { gs, st := newTestService(t) // set up needed info for response @@ -418,23 +414,36 @@ func TestMessageHandler_CatchUpRequest_WithResponse(t *testing.T) { err = gs.grandpaState.SetPrecommits(round, setID, pcj) require.NoError(t, err) - resp, err := gs.newCatchUpResponse(round, setID) - require.NoError(t, 
err) - - expected, err := resp.ToConsensusMessage() - require.NoError(t, err) - - // create and handle request - req := newCatchUpRequest(round, setID) - ctrl := gomock.NewController(t) telemetryMock := NewMockClient(ctrl) telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", req) - require.NoError(t, err) - require.Equal(t, expected, out) + + h.grandpa.paused.Store(true) + + // If catching up to a previous round, throw an error + err = h.handleMessage("", &CatchUpResponse{ + SetID: setID, + Round: 1, + PreVoteJustification: []types.GrandpaSignedVote{ + { + Vote: *testVote, + Signature: testSignature, + AuthorityID: testAuthorityID, + }, + }, + PreCommitJustification: []types.GrandpaSignedVote{ + { + Vote: *testVote, + Signature: testSignature, + AuthorityID: testAuthorityID, + }, + }, + Hash: common.Hash{}, + Number: 1, + }) + require.ErrorIs(t, err, ErrInvalidCatchUpResponseRound) } func TestVerifyJustification(t *testing.T) { @@ -452,7 +461,7 @@ func TestVerifyJustification(t *testing.T) { AuthorityID: kr.Alice().Public().(*ed25519.PublicKey).AsBytes(), } - err := h.verifyJustification(just, 77, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) require.NoError(t, err) } @@ -472,7 +481,7 @@ func TestVerifyJustification_InvalidSignature(t *testing.T) { AuthorityID: kr.Alice().Public().(*ed25519.PublicKey).AsBytes(), } - err := h.verifyJustification(just, 77, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) require.EqualError(t, err, ErrInvalidSignature.Error()) } @@ -495,7 +504,7 @@ func TestVerifyJustification_InvalidAuthority(t *testing.T) { AuthorityID: fakeKey.Public().(*ed25519.PublicKey).AsBytes(), } - err = h.verifyJustification(just, 77, gs.state.setID, precommit) + err = verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) 
require.EqualError(t, err, ErrVoterNotFound.Error()) } @@ -514,7 +523,7 @@ func TestMessageHandler_VerifyPreVoteJustification(t *testing.T) { PreVoteJustification: just, } - prevote, err := h.verifyPreVoteJustification(msg) + prevote, err := h.catchUp.verifyPreVoteJustification(msg) require.NoError(t, err) require.Equal(t, testHash, prevote) } @@ -537,7 +546,7 @@ func TestMessageHandler_VerifyPreCommitJustification(t *testing.T) { Number: uint32(round), } - err := h.verifyPreCommitJustification(msg) + err := h.catchUp.verifyPreCommitJustification(msg) require.NoError(t, err) } @@ -567,9 +576,8 @@ func TestMessageHandler_HandleCatchUpResponse(t *testing.T) { Number: uint32(round), } - out, err := h.handleMessage("", msg) + err = h.handleMessage("", msg) require.NoError(t, err) - require.Nil(t, out) require.Equal(t, round+1, gs.state.round) } @@ -940,7 +948,7 @@ func Test_VerifyPrevoteJustification_CountEquivocatoryVoters(t *testing.T) { Number: uint32(bfcNumber), } - hash, err := h.verifyPreVoteJustification(testCatchUpResponse) + hash, err := h.catchUp.verifyPreVoteJustification(testCatchUpResponse) require.NoError(t, err) require.Equal(t, hash, bfcHash) } @@ -1013,7 +1021,7 @@ func Test_VerifyPreCommitJustification(t *testing.T) { Number: uint32(bfcNumber), } - err = h.verifyPreCommitJustification(testCatchUpResponse) + err = h.catchUp.verifyPreCommitJustification(testCatchUpResponse) require.NoError(t, err) } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 4f540ac0dc..8cd6ffb686 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -26,8 +26,8 @@ type tracker struct { stopped chan struct{} catchUpResponseMessageMutex sync.Mutex - // round(uint64) is used as key and *CatchUpResponse as value - catchUpResponseMessages map[uint64]*CatchUpResponse + // block hash is used as key and *CatchUpResponse as value + catchUpResponseMessages map[common.Hash]*networkCatchUpResponseMessage } func newTracker(bs 
BlockState, handler *MessageHandler) *tracker { @@ -39,7 +39,7 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { mapLock: sync.Mutex{}, in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), - catchUpResponseMessages: make(map[uint64]*CatchUpResponse), + catchUpResponseMessages: make(map[common.Hash]*networkCatchUpResponseMessage), } } @@ -75,10 +75,11 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } -func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { +func (t *tracker) addCatchUpResponse(cr *networkCatchUpResponseMessage) { t.catchUpResponseMessageMutex.Lock() defer t.catchUpResponseMessageMutex.Unlock() - t.catchUpResponseMessages[cr.Round] = cr + + t.catchUpResponseMessages[cr.msg.Hash] = cr } func (t *tracker) handleBlocks() { @@ -104,21 +105,32 @@ func (t *tracker) handleBlock(b *types.Block) { if vms, has := t.voteMessages[h]; has { for _, v := range vms { // handleMessage would never error for vote message - _, err := t.handler.handleMessage(v.from, v.msg) + err := t.handler.handleMessage(v.from, v.msg) if err != nil { logger.Warnf("failed to handle vote message %v: %s", v, err) } } + // TODO: Check if we should delete all vote messages for h, + // if fail to process a few of the vote messages. 
delete(t.voteMessages, h) } if cm, has := t.commitMessages[h]; has { - _, err := t.handler.handleMessage("", cm) + err := t.handler.handleMessage("", cm) if err != nil { logger.Warnf("failed to handle commit message %v: %s", cm, err) + } else { + delete(t.commitMessages, h) } + } - delete(t.commitMessages, h) + if cr, has := t.catchUpResponseMessages[h]; has { + err := t.handler.handleMessage(cr.from, cr.msg) + if err != nil { + logger.Warnf("failed to handle catch up response message %v: %s", cr, err) + } else { + delete(t.catchUpResponseMessages, h) + } } } diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 49549243bf..ef05e5257e 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -116,6 +116,8 @@ func (*Service) decodeMessage(in []byte) (NotificationsMessage, error) { return msg, err } +// handleNetworkMessage processes notification messages and returns true if we should +// propagate this message, false otherwise. func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (bool, error) { if msg == nil { return false, nil @@ -135,31 +137,20 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( return false, err } - resp, err := s.messageHandler.handleMessage(from, m) + err = s.messageHandler.handleMessage(from, m) if err != nil { return false, err } - switch r := resp.(type) { - case *ConsensusMessage: - if r != nil { - s.network.GossipMessage(resp) - } - case nil: - default: - logger.Warnf( - "unexpected type %T returned from message handler: %v", - resp, resp) - } - - switch m.(type) { - case *NeighbourMessage: - return false, nil - case *CatchUpResponse: + switch m.Type() { + case VoteMessageType, CommitMessageType: + s.network.GossipMessage(msg) + return true, nil + case NeighborMessageType, CatchUpRequestMessageType, CatchUpResponseMessageType: return false, nil } - return true, nil + return false, nil } // sendMessage sends a vote message to be gossiped to the network diff 
--git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 827d400cf2..ebfb775310 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -22,6 +22,11 @@ type networkVoteMessage struct { msg *VoteMessage } +type networkCatchUpResponseMessage struct { + from peer.ID + msg *CatchUpResponse +} + // receiveVoteMessages receives messages from the in channel until a grandpa round finishes. func (s *Service) receiveVoteMessages(ctx context.Context) { for { diff --git a/lib/runtime/wasmer/instance.go b/lib/runtime/wasmer/instance.go index da3b5441a9..8be175c273 100644 --- a/lib/runtime/wasmer/instance.go +++ b/lib/runtime/wasmer/instance.go @@ -162,6 +162,7 @@ func NewInstance(code []byte, cfg *Config) (*Instance, error) { codeHash: cfg.CodeHash, } + // TODO: log this error in debug, trace or warn mode. Do not ignore it. inst.version, _ = inst.Version() return inst, nil }