From b7d814c368ec33e59892246e1a4ae3ff07b03704 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 10 Jan 2022 17:43:22 +0530 Subject: [PATCH 01/36] lib/grandpa: ensure catch-up logic works - ensure catch-up logic works when interacting with substrate nodes - If round in the neighbour message is ahead of our current round by a threshold, send a catch up request - process the catch up response, if we can't process it at the moment, store it to process later. Closes #1531 --- lib/grandpa/grandpa.go | 6 ++ lib/grandpa/message_handler.go | 130 +++++++++++++++++++++++++++------ 2 files changed, 114 insertions(+), 22 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 0955dcbb7f..ddf34e75ef 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,6 +74,12 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message + + catchUpResponseCh chan *CatchUpResponse + + CatchUpResponseCacheLock sync.Mutex + // round number is used as key + CatchUpResponseCache map[uint64]CatchUpResponse } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 12285fc894..168f0ae71c 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -9,7 +9,9 @@ import ( "fmt" "math/big" "reflect" + "time" + "github.com/ChainSafe/chaindb" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" @@ -21,6 +23,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +const CATCHUP_THRESHOLD = 2 + // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { grandpa *Service @@ -53,17 +57,27 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CommitMessage: return nil, h.handleCommitMessage(msg) case *NeighbourMessage: - return nil, h.handleNeighbourMessage(msg) + fmt.Println("got a neighbour message") + return nil, h.handleNeighbourMessage(msg, from) case *CatchUpRequest: - return h.handleCatchUpRequest(msg) + networkMessage, err := h.handleCatchUpRequest(msg, from) + if err != nil { + logger.Debugf("could not handle catch up request: %s", err) + } + + return networkMessage, err case *CatchUpResponse: - return nil, h.handleCatchUpResponse(msg) + err := h.handleCatchUpResponse(msg) + if err != nil { + logger.Debugf("could not catchup: %s", err) + } + return nil, err default: return nil, ErrInvalidMessageType } } -func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { +func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer.ID) error { currFinalized, err := h.blockState.GetFinalisedHeader(0, 0) if err != nil { return err @@ -76,21 +90,74 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { // TODO; determine if there is some reason we don't receive justifications in responses near the head (usually), // and remove the following code if it's fixed. (#1815) - head, err := h.blockState.BestBlockNumber() + // head, err := h.blockState.BestBlockNumber() + // if err != nil { + // return err + // } + + // TODO: Why are we ignoring these? Isn't + // msg.Number likely to be higher if we are lagging behind? + // ignore neighbour messages that are above our head + // if int64(msg.Number) > head.Int64() { + // return nil + // } + + logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) + // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) + + highestRound, highestSetID, err := h.blockState.GetHighestRoundAndSetID() if err != nil { return err } - // ignore neighbour messages that are above our head - if int64(msg.Number) > head.Int64() { - return nil + logger.Debugf("lagging behind by %d", msg.Round-highestRound) + // catch up only if we are behind by more than catchup threshold + if (msg.Round - highestRound) > CATCHUP_THRESHOLD { + catchUpResponse, err := h.sendCatchUpRequest( + from, newCatchUpRequest(highestRound, highestSetID), + ) + if err != nil { + logger.Debugf("failed to send catch up request: %s", err.Error()) + return err + } + + logger.Debugf("sent a catch up request to node %s", from) + + err = h.handleCatchUpResponse(catchUpResponse) + if err != nil { + return err + } } - logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) - // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) return nil } +func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*CatchUpResponse, error) { + cm, err := req.ToConsensusMessage() + if err != nil { + return nil, err + } + + err = h.grandpa.network.SendMessage(to, cm) + if err != nil { + return nil, err + } + + // Can we do this without pausing? + // h.grandpa.paused.Store(true) + + timer := time.NewTimer(time.Second * 5) + defer timer.Stop() + + select { + case resp := <-h.grandpa.catchUpResponseCh: + fmt.Println("got a response, this is awesome") + return resp, nil + case <-timer.C: + return nil, errors.New("timeout") + } +} + func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { logger.Debugf("received commit message, msg: %+v", msg) @@ -141,7 +208,7 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { return nil } -func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) { +func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) (*ConsensusMessage, error) { if !h.grandpa.authority { return nil, nil //nolint:nilnil } @@ -157,15 +224,26 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMe return nil, ErrInvalidCatchUpRound } - resp, err := h.grandpa.newCatchUpResponse(msg.Round, msg.SetID) + resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID) + if err != nil { + return nil, err + } + + cm, err := resp.ToConsensusMessage() + if err != nil { + return nil, err + } + + err = h.grandpa.network.SendMessage(from, cm) if err != nil { return nil, err } logger.Debugf( - "sending catch up response with hash %s for round %d and set id %d", + "sent catch up response with hash %s for round %d and set id %d", resp.Hash, msg.Round, msg.SetID) - return resp.ToConsensusMessage() + + return nil, nil } func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { @@ -177,11 +255,6 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { "received catch up response with hash %s for round %d and set id %d", msg.Hash, msg.Round, msg.SetID) - // TODO: re-add catch-up logic (#1531) - if true { - return nil - } - // if we aren't currently expecting a catch up response, return if !h.grandpa.paused.Load().(bool) { logger.Debug("not currently paused, ignoring catch up response") @@ -192,10 +265,14 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return ErrSetIDMismatch } - if msg.Round != h.grandpa.state.round-1 { + if msg.Round <= h.grandpa.state.round { return ErrInvalidCatchUpResponseRound } + // TODO: confirm if we should add the message to the channel after or before + // checking set id and round. + h.grandpa.catchUpResponseCh <- msg + prevote, err := h.verifyPreVoteJustification(msg) if err != nil { return err @@ -224,7 +301,16 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { // update state and signal to grandpa we are ready to initiate head, err := h.grandpa.blockState.GetHeader(msg.Hash) - if err != nil { + if errors.Is(err, chaindb.ErrKeyNotFound) { + h.grandpa.CatchUpResponseCacheLock.Lock() + + h.grandpa.CatchUpResponseCache[msg.Round] = *msg + + h.grandpa.CatchUpResponseCacheLock.Unlock() + + logger.Debugf("couldn not catch up to round %d, storing the catch up response to retry", msg.Round) + return nil + } else if err != nil { return err } @@ -232,7 +318,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { h.grandpa.state.round = msg.Round close(h.grandpa.resumed) h.grandpa.resumed = make(chan struct{}) - h.grandpa.paused.Store(false) + // h.grandpa.paused.Store(false) logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round) return nil } From faa8f8261d37a1a1e611225a471d7557b838d45d Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 12 Jan 2022 18:29:32 +0530 Subject: [PATCH 02/36] lib/grandpa: ensure catch-up logic works got it working on devnet --- lib/grandpa/grandpa.go | 6 +-- lib/grandpa/message.go | 4 ++ lib/grandpa/message_handler.go | 86 +++++++++++++++++----------------- lib/grandpa/message_tracker.go | 25 +++++++--- lib/grandpa/vote_message.go | 5 ++ 5 files changed, 74 insertions(+), 52 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index f04cfbce6d..afe63bba61 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -75,11 +75,11 @@ type Service struct { finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message - catchUpResponseCh chan *CatchUpResponse + // catchUpResponseCh chan *CatchUpResponse - CatchUpResponseCacheLock sync.Mutex + // CatchUpResponseCacheLock sync.Mutex // round number is used as key - CatchUpResponseCache map[uint64]CatchUpResponse + // CatchUpResponseCache map[uint64]CatchUpResponse } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message.go b/lib/grandpa/message.go index 032462fea1..416f979558 100644 --- a/lib/grandpa/message.go +++ b/lib/grandpa/message.go @@ -11,6 +11,7 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/ed25519" "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/libp2p/go-libp2p-core/peer" ) // GrandpaMessage is implemented by all GRANDPA network messages @@ -191,7 +192,9 @@ func compactToJustification(vs []Vote, auths []AuthData) ([]SignedVote, error) { // CatchUpRequest struct to represent a CatchUpRequest message type CatchUpRequest struct { + // Round that we want to catch up to Round uint64 + // The voter set ID this message is from. SetID uint64 } @@ -231,6 +234,7 @@ type CatchUpResponse struct { PreCommitJustification []SignedVote Hash common.Hash Number uint32 + From peer.ID } func (s *Service) newCatchUpResponse(round, setID uint64) (*CatchUpResponse, error) { diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 6a57ec55d3..f3ed450b4f 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -9,7 +9,6 @@ import ( "fmt" "math/big" "reflect" - "time" "github.com/ChainSafe/chaindb" "github.com/ChainSafe/gossamer/dot/network" @@ -57,7 +56,6 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CommitMessage: return nil, h.handleCommitMessage(msg) case *NeighbourMessage: - fmt.Println("got a neighbour message") // we can afford to not retry handling neighbour message, if it errors. return nil, h.handleNeighbourMessage(msg, from) case *CatchUpRequest: @@ -68,13 +66,20 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. return networkMessage, err case *CatchUpResponse: + logger.Debugf( + "received catch up response with hash %s for round %d and set id %d, from %s", + msg.Hash, msg.Round, msg.SetID, from) + err := h.handleCatchUpResponse(msg) - if errors.Is(err, blocktree.ErrNodeNotFound) { + if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { // TODO: we are adding these messages to reprocess them again, but we // haven't added code to reprocess them. Do that. // Also, revisit if we need to add these message in synchronous manner // or not. If not, change catchUpResponseMessages to a normal map. #1531 - h.grandpa.tracker.addCatchUpResponse(msg) + h.grandpa.tracker.addCatchUpResponse(&networkCatchUpResponseMessage{ + from: from, + msg: msg, + }) } else if err != nil { logger.Debugf("could not catchup: %s", err) } @@ -113,16 +118,17 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) - highestRound, highestSetID, err := h.blockState.GetHighestRoundAndSetID() + highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() if err != nil { return err } - logger.Debugf("lagging behind by %d", msg.Round-highestRound) + // TODO: printing uint64 tends to overflow sometimes, not sure why! + logger.Debugf("msg.Round %d - highestRound %d lagging behind by %d", msg.Round, highestRound, int(msg.Round)-int(highestRound)) // catch up only if we are behind by more than catchup threshold - if (msg.Round - highestRound) > CATCHUP_THRESHOLD { - catchUpResponse, err := h.sendCatchUpRequest( - from, newCatchUpRequest(highestRound, highestSetID), + if (int(msg.Round) - int(highestRound)) > CATCHUP_THRESHOLD { + _, err := h.sendCatchUpRequest( + from, newCatchUpRequest(msg.Round, setID), ) if err != nil { logger.Debugf("failed to send catch up request: %s", err.Error()) @@ -131,10 +137,10 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer logger.Debugf("sent a catch up request to node %s", from) - err = h.handleCatchUpResponse(catchUpResponse) - if err != nil { - return err - } + // err = h.handleCatchUpResponse(catchUpResponse) + // if err != nil { + // return err + // } } return nil @@ -151,19 +157,19 @@ func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*C return nil, err } - // Can we do this without pausing? - // h.grandpa.paused.Store(true) + h.grandpa.paused.Store(true) - timer := time.NewTimer(time.Second * 5) - defer timer.Stop() + // timer := time.NewTimer(time.Second * 10) + // defer timer.Stop() - select { - case resp := <-h.grandpa.catchUpResponseCh: - fmt.Println("got a response, this is awesome") - return resp, nil - case <-timer.C: - return nil, errors.New("timeout") - } + // select { + // case resp := <-h.grandpa.catchUpResponseCh: + // fmt.Println("got a response, this is awesome") + // return resp, nil + // case <-timer.C: + // return nil, errors.New("timeout") + // } + return nil, nil } func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { @@ -221,14 +227,16 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) return nil, nil //nolint:nilnil } - logger.Debugf("received catch up request for round %d and set id %d", - msg.Round, msg.SetID) + logger.Debugf("received catch up request for round %d and set id %d, from %s", + msg.Round, msg.SetID, from) + + logger.Debugf("Our latest round is %d", h.grandpa.state.round) if msg.SetID != h.grandpa.state.setID { return nil, ErrSetIDMismatch } - if msg.Round >= h.grandpa.state.round { + if msg.Round > h.grandpa.state.round { return nil, ErrInvalidCatchUpRound } @@ -248,8 +256,8 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) } logger.Debugf( - "sent catch up response with hash %s for round %d and set id %d", - resp.Hash, msg.Round, msg.SetID) + "sent catch up response with hash %s for round %d and set id %d, to %s", + resp.Hash, msg.Round, msg.SetID, from) return nil, nil } @@ -259,9 +267,9 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return nil } - logger.Debugf( - "received catch up response with hash %s for round %d and set id %d", - msg.Hash, msg.Round, msg.SetID) + // logger.Debugf( + // "received catch up response with hash %s for round %d and set id %d", + // msg.Hash, msg.Round, msg.SetID) // if we aren't currently expecting a catch up response, return if !h.grandpa.paused.Load().(bool) { @@ -279,7 +287,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { // TODO: confirm if we should add the message to the channel after or before // checking set id and round. - h.grandpa.catchUpResponseCh <- msg + // h.grandpa.catchUpResponseCh <- msg prevote, err := h.verifyPreVoteJustification(msg) if err != nil { @@ -309,16 +317,8 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { // update state and signal to grandpa we are ready to initiate head, err := h.grandpa.blockState.GetHeader(msg.Hash) - if errors.Is(err, chaindb.ErrKeyNotFound) { - h.grandpa.CatchUpResponseCacheLock.Lock() - - h.grandpa.CatchUpResponseCache[msg.Round] = *msg - - h.grandpa.CatchUpResponseCacheLock.Unlock() - + if err != nil { logger.Debugf("couldn not catch up to round %d, storing the catch up response to retry", msg.Round) - return nil - } else if err != nil { return err } @@ -326,7 +326,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { h.grandpa.state.round = msg.Round close(h.grandpa.resumed) h.grandpa.resumed = make(chan struct{}) - // h.grandpa.paused.Store(false) + h.grandpa.paused.Store(false) logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round) return nil } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 4f540ac0dc..bebf1a0d41 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -26,8 +26,8 @@ type tracker struct { stopped chan struct{} catchUpResponseMessageMutex sync.Mutex - // round(uint64) is used as key and *CatchUpResponse as value - catchUpResponseMessages map[uint64]*CatchUpResponse + // block hash is used as key and *CatchUpResponse as value + catchUpResponseMessages map[common.Hash]*networkCatchUpResponseMessage } func newTracker(bs BlockState, handler *MessageHandler) *tracker { @@ -39,7 +39,7 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { mapLock: sync.Mutex{}, in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), - catchUpResponseMessages: make(map[uint64]*CatchUpResponse), + catchUpResponseMessages: make(map[common.Hash]*networkCatchUpResponseMessage), } } @@ -75,10 +75,11 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } -func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { +func (t *tracker) addCatchUpResponse(cr *networkCatchUpResponseMessage) { t.catchUpResponseMessageMutex.Lock() defer t.catchUpResponseMessageMutex.Unlock() - t.catchUpResponseMessages[cr.Round] = cr + + t.catchUpResponseMessages[cr.msg.Hash] = cr } func (t *tracker) handleBlocks() { @@ -110,6 +111,8 @@ func (t *tracker) handleBlock(b *types.Block) { } } + // TODO: Check if we should delete all vote messages for h, + // if fail to process a few of the vote messages. delete(t.voteMessages, h) } @@ -117,8 +120,18 @@ func (t *tracker) handleBlock(b *types.Block) { _, err := t.handler.handleMessage("", cm) if err != nil { logger.Warnf("failed to handle commit message %v: %s", cm, err) + } else { + delete(t.commitMessages, h) } + } - delete(t.commitMessages, h) + // TODO: Can I use the same mapLock or do I need to use catchUpResponseLock? + if cr, has := t.catchUpResponseMessages[h]; has { + _, err := t.handler.handleMessage(cr.from, cr.msg) + if err != nil { + logger.Warnf("failed to handle catch up response message %v: %s", cr, err) + } else { + delete(t.catchUpResponseMessages, h) + } } } diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 51eea9acb9..c0648595ed 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -22,6 +22,11 @@ type networkVoteMessage struct { msg *VoteMessage } +type networkCatchUpResponseMessage struct { + from peer.ID + msg *CatchUpResponse +} + // receiveVoteMessages receives messages from the in channel until a grandpa round finishes. func (s *Service) receiveVoteMessages(ctx context.Context) { for { From 0099a98d2169315bf8ea0eae258833b8b21eb416 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 12 Jan 2022 20:02:45 +0530 Subject: [PATCH 03/36] cleaning up a few things --- lib/grandpa/message_handler.go | 45 ++++++++++++++-------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index f3ed450b4f..7f021c74cc 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -22,7 +22,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -const CATCHUP_THRESHOLD = 2 +const catchupThreshold = 2 // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { @@ -59,12 +59,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. // we can afford to not retry handling neighbour message, if it errors. return nil, h.handleNeighbourMessage(msg, from) case *CatchUpRequest: - networkMessage, err := h.handleCatchUpRequest(msg, from) - if err != nil { - logger.Debugf("could not handle catch up request: %s", err) - } - - return networkMessage, err + return nil, h.handleCatchUpRequest(msg, from) case *CatchUpResponse: logger.Debugf( "received catch up response with hash %s for round %d and set id %d, from %s", @@ -72,9 +67,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. err := h.handleCatchUpResponse(msg) if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { - // TODO: we are adding these messages to reprocess them again, but we - // haven't added code to reprocess them. Do that. - // Also, revisit if we need to add these message in synchronous manner + // TODO: revisit if we need to add these message in synchronous manner // or not. If not, change catchUpResponseMessages to a normal map. #1531 h.grandpa.tracker.addCatchUpResponse(&networkCatchUpResponseMessage{ from: from, @@ -123,14 +116,12 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return err } - // TODO: printing uint64 tends to overflow sometimes, not sure why! logger.Debugf("msg.Round %d - highestRound %d lagging behind by %d", msg.Round, highestRound, int(msg.Round)-int(highestRound)) // catch up only if we are behind by more than catchup threshold - if (int(msg.Round) - int(highestRound)) > CATCHUP_THRESHOLD { - _, err := h.sendCatchUpRequest( + if (int(msg.Round) - int(highestRound)) > catchupThreshold { + if err := h.sendCatchUpRequest( from, newCatchUpRequest(msg.Round, setID), - ) - if err != nil { + ); err != nil { logger.Debugf("failed to send catch up request: %s", err.Error()) return err } @@ -146,15 +137,15 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return nil } -func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*CatchUpResponse, error) { +func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { cm, err := req.ToConsensusMessage() if err != nil { - return nil, err + return err } err = h.grandpa.network.SendMessage(to, cm) if err != nil { - return nil, err + return err } h.grandpa.paused.Store(true) @@ -169,7 +160,7 @@ func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*C // case <-timer.C: // return nil, errors.New("timeout") // } - return nil, nil + return nil } func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { @@ -222,9 +213,9 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { return nil } -func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) (*ConsensusMessage, error) { +func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) error { if !h.grandpa.authority { - return nil, nil //nolint:nilnil + return nil } logger.Debugf("received catch up request for round %d and set id %d, from %s", @@ -233,33 +224,33 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) logger.Debugf("Our latest round is %d", h.grandpa.state.round) if msg.SetID != h.grandpa.state.setID { - return nil, ErrSetIDMismatch + return ErrSetIDMismatch } if msg.Round > h.grandpa.state.round { - return nil, ErrInvalidCatchUpRound + return ErrInvalidCatchUpRound } resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID) if err != nil { - return nil, err + return err } cm, err := resp.ToConsensusMessage() if err != nil { - return nil, err + return err } err = h.grandpa.network.SendMessage(from, cm) if err != nil { - return nil, err + return err } logger.Debugf( "sent catch up response with hash %s for round %d and set id %d, to %s", resp.Hash, msg.Round, msg.SetID, from) - return nil, nil + return nil } func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { From ec356aa891d4f2e362dda21f8ed34c762dcaaadc Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 19 Jan 2022 22:58:33 +0530 Subject: [PATCH 04/36] cleaning up more things --- dot/network/notifications.go | 3 ++- lib/grandpa/message_handler.go | 12 ++++++------ lib/runtime/wasmer/instance.go | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 281006265f..2ea1c04e19 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -370,7 +370,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP } if hsResponse.err != nil { - logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err) + logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err) closeOutboundStream(info, peer, stream) return nil, hsResponse.err } @@ -419,6 +419,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer continue } + info.outboundHandshakeMutexes.Store(peer, new(sync.Mutex)) go s.sendData(peer, hs, info, msg) } } diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 7f021c74cc..b92bdcaf22 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -108,7 +108,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer // return nil // } - logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) + logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ", msg.Number, msg.SetID, msg.Round, from) // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() @@ -127,11 +127,6 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer } logger.Debugf("sent a catch up request to node %s", from) - - // err = h.handleCatchUpResponse(catchUpResponse) - // if err != nil { - // return err - // } } return nil @@ -143,11 +138,14 @@ func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) err return err } + // TODO: Uncomment if gossipping do not work err = h.grandpa.network.SendMessage(to, cm) if err != nil { return err } + // h.grandpa.network.GossipMessage(cm) + h.grandpa.paused.Store(true) // timer := time.NewTimer(time.Second * 10) @@ -246,6 +244,8 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) return err } + // h.grandpa.network.GossipMessage(cm) + logger.Debugf( "sent catch up response with hash %s for round %d and set id %d, to %s", resp.Hash, msg.Round, msg.SetID, from) diff --git a/lib/runtime/wasmer/instance.go b/lib/runtime/wasmer/instance.go index 7c87ceb287..9551f459a1 100644 --- a/lib/runtime/wasmer/instance.go +++ b/lib/runtime/wasmer/instance.go @@ -153,6 +153,7 @@ func NewInstance(code []byte, cfg *Config) (*Instance, error) { codeHash: cfg.CodeHash, } + // TODO: log this error in debug, trace or warn mode. Do not ignore it. inst.version, _ = inst.Version() return inst, nil } From 8320fc99caf72a6289f02f7bffc0f5b892ef4108 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 20 Jan 2022 17:28:07 +0530 Subject: [PATCH 05/36] cleaned up more code --- lib/grandpa/grandpa.go | 6 ------ lib/grandpa/message.go | 2 -- lib/grandpa/message_handler.go | 38 +++++++--------------------------- 3 files changed, 8 insertions(+), 38 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index afe63bba61..40d7c8cdbc 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,12 +74,6 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message - - // catchUpResponseCh chan *CatchUpResponse - - // CatchUpResponseCacheLock sync.Mutex - // round number is used as key - // CatchUpResponseCache map[uint64]CatchUpResponse } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message.go b/lib/grandpa/message.go index 416f979558..6df1246d2a 100644 --- a/lib/grandpa/message.go +++ b/lib/grandpa/message.go @@ -11,7 +11,6 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/ed25519" "github.com/ChainSafe/gossamer/pkg/scale" - "github.com/libp2p/go-libp2p-core/peer" ) // GrandpaMessage is implemented by all GRANDPA network messages @@ -234,7 +233,6 @@ type CatchUpResponse struct { PreCommitJustification []SignedVote Hash common.Hash Number uint32 - From peer.ID } func (s *Service) newCatchUpResponse(round, setID uint64) (*CatchUpResponse, error) { diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index b92bdcaf22..25165582cd 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -61,10 +61,6 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CatchUpRequest: return nil, h.handleCatchUpRequest(msg, from) case *CatchUpResponse: - logger.Debugf( - "received catch up response with hash %s for round %d and set id %d, from %s", - msg.Hash, msg.Round, msg.SetID, from) - err := h.handleCatchUpResponse(msg) if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { // TODO: revisit if we need to add these message in synchronous manner @@ -116,9 +112,10 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return err } - logger.Debugf("msg.Round %d - highestRound %d lagging behind by %d", msg.Round, highestRound, int(msg.Round)-int(highestRound)) // catch up only if we are behind by more than catchup threshold if (int(msg.Round) - int(highestRound)) > catchupThreshold { + logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) + if err := h.sendCatchUpRequest( from, newCatchUpRequest(msg.Round, setID), ); err != nil { @@ -126,7 +123,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return err } - logger.Debugf("sent a catch up request to node %s", from) + logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d", from, msg.Round, setID) } return nil @@ -138,26 +135,13 @@ func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) err return err } - // TODO: Uncomment if gossipping do not work err = h.grandpa.network.SendMessage(to, cm) if err != nil { return err } - // h.grandpa.network.GossipMessage(cm) - h.grandpa.paused.Store(true) - // timer := time.NewTimer(time.Second * 10) - // defer timer.Stop() - - // select { - // case resp := <-h.grandpa.catchUpResponseCh: - // fmt.Println("got a response, this is awesome") - // return resp, nil - // case <-timer.C: - // return nil, errors.New("timeout") - // } return nil } @@ -244,10 +228,8 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) return err } - // h.grandpa.network.GossipMessage(cm) - logger.Debugf( - "sent catch up response with hash %s for round %d and set id %d, to %s", + "successfully sent catch up response with hash %s for round %d and set id %d, to %s", resp.Hash, msg.Round, msg.SetID, from) return nil @@ -258,9 +240,9 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return nil } - // logger.Debugf( - // "received catch up response with hash %s for round %d and set id %d", - // msg.Hash, msg.Round, msg.SetID) + logger.Debugf( + "processing catch up response with hash %s for round %d and set id %d", + msg.Hash, msg.Round, msg.SetID) // if we aren't currently expecting a catch up response, return if !h.grandpa.paused.Load().(bool) { @@ -276,10 +258,6 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return ErrInvalidCatchUpResponseRound } - // TODO: confirm if we should add the message to the channel after or before - // checking set id and round. - // h.grandpa.catchUpResponseCh <- msg - prevote, err := h.verifyPreVoteJustification(msg) if err != nil { return err @@ -309,7 +287,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { // update state and signal to grandpa we are ready to initiate head, err := h.grandpa.blockState.GetHeader(msg.Hash) if err != nil { - logger.Debugf("couldn not catch up to round %d, storing the catch up response to retry", msg.Round) + logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round) return err } From 9a73bb2c72b76b9f59c531eed9241fbf73520ebb Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 20 Jan 2022 17:48:41 +0530 Subject: [PATCH 06/36] small improvement --- lib/grandpa/message_handler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 25165582cd..002bd80e11 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -213,6 +213,8 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) return ErrInvalidCatchUpRound } + // We don't necessarily have to reply with the round asked in the request, we can reply + // with our latest round. resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID) if err != nil { return err @@ -230,7 +232,7 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) logger.Debugf( "successfully sent catch up response with hash %s for round %d and set id %d, to %s", - resp.Hash, msg.Round, msg.SetID, from) + resp.Hash, h.grandpa.state.round, h.grandpa.state.setID, from) return nil } From 22e43c3b72a45e45569b09e5911c8b5c24bd4fe3 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 20 Jan 2022 18:20:48 +0530 Subject: [PATCH 07/36] fixing lint --- lib/grandpa/message_handler.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index af0bf7440d..7da35240e3 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -40,6 +40,8 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer } } +//nolint +// TODO: NotificationMessage is used at places. But NotificationMessage we return is always nil. // HandleMessage handles a GRANDPA consensus message // if it is a CommitMessage, it updates the BlockState // if it is a VoteMessage, it sends it to the GRANDPA service @@ -106,7 +108,8 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer // return nil // } - logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ", msg.Number, msg.SetID, msg.Round, from) + logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ", + msg.Number, msg.SetID, msg.Round, from) // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() @@ -125,7 +128,8 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return err } - logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d", from, msg.Round, setID) + logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d", + from, msg.Round, setID) } return nil From 87c3ccf09e27d86cebfcd4a6781e7ba1e3a2aff6 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Sat, 22 Jan 2022 02:36:13 +0530 Subject: [PATCH 08/36] Addressed some reviews + tests changes --- lib/grandpa/message.go | 21 ++++++++++ lib/grandpa/message_handler.go | 15 ++++--- lib/grandpa/message_handler_test.go | 62 ++++++++++++++++------------- lib/grandpa/message_tracker.go | 6 +-- lib/grandpa/network.go | 22 ++-------- 5 files changed, 70 insertions(+), 56 deletions(-) diff --git a/lib/grandpa/message.go b/lib/grandpa/message.go index 6df1246d2a..d61cfaae98 100644 --- a/lib/grandpa/message.go +++ b/lib/grandpa/message.go @@ -13,9 +13,20 @@ import ( "github.com/ChainSafe/gossamer/pkg/scale" ) +type GrandpaMessageType int64 + +const ( + VoteMessageType GrandpaMessageType = iota + CommitMessageType + NeighborMessageType + CatchUpRequestMessageType + CatchUpResponseMessageType +) + // GrandpaMessage is implemented by all GRANDPA network messages type GrandpaMessage interface { //nolint:revive ToConsensusMessage() (*network.ConsensusMessage, error) + Type() GrandpaMessageType } // NewGrandpaMessage returns a new VaryingDataType to represent a GrandpaMessage @@ -59,6 +70,8 @@ type VoteMessage struct { // Index Returns VDT index func (v VoteMessage) Index() uint { return 0 } +func (v VoteMessage) Type() GrandpaMessageType { return VoteMessageType } + // ToConsensusMessage converts the VoteMessage into a network-level consensus message func (v *VoteMessage) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -88,6 +101,8 @@ type NeighbourMessage struct { // Index Returns VDT index func (m NeighbourMessage) Index() uint { return 2 } +func (v NeighbourMessage) Type() GrandpaMessageType { return NeighborMessageType } + // ToConsensusMessage converts the NeighbourMessage into a network-level consensus message func (m *NeighbourMessage) ToConsensusMessage() (*network.ConsensusMessage, error) { msg := newGrandpaMessage() @@ -139,6 +154,8 @@ func (s *Service) newCommitMessage(header *types.Header, round uint64) (*CommitM // Index Returns VDT index func (f CommitMessage) Index() uint { return 1 } +func (v CommitMessage) Type() GrandpaMessageType { return CommitMessageType } + // ToConsensusMessage converts the CommitMessage into a network-level consensus message func (f *CommitMessage) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -207,6 +224,8 @@ func newCatchUpRequest(round, setID uint64) *CatchUpRequest { // Index Returns VDT index func (r CatchUpRequest) Index() uint { return 3 } +func (v CatchUpRequest) Type() GrandpaMessageType { return CatchUpRequestMessageType } + // ToConsensusMessage converts the catchUpRequest into a network-level consensus message func (r *CatchUpRequest) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() @@ -264,6 +283,8 @@ func (s *Service) newCatchUpResponse(round, setID uint64) (*CatchUpResponse, err // Index Returns VDT index func (r CatchUpResponse) Index() uint { return 4 } +func (v CatchUpResponse) Type() GrandpaMessageType { return CatchUpResponseMessageType } + // ToConsensusMessage converts the catchUpResponse into a network-level consensus message func (r *CatchUpResponse) ToConsensusMessage() (*ConsensusMessage, error) { msg := newGrandpaMessage() diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 7da35240e3..35f8ed0a4f 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -11,7 +11,6 @@ import ( "reflect" "github.com/ChainSafe/chaindb" - "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/blocktree" @@ -45,7 +44,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer // HandleMessage handles a GRANDPA consensus message // if it is a CommitMessage, it updates the BlockState // if it is a VoteMessage, it sends it to the GRANDPA service -func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.NotificationsMessage, error) { +func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { logger.Tracef("handling grandpa message: %v", m) switch msg := m.(type) { @@ -56,14 +55,14 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. msg: msg, } - return nil, nil + return nil case *CommitMessage: - return nil, h.handleCommitMessage(msg) + return h.handleCommitMessage(msg) case *NeighbourMessage: // we can afford to not retry handling neighbour message, if it errors. - return nil, h.handleNeighbourMessage(msg, from) + return h.handleNeighbourMessage(msg, from) case *CatchUpRequest: - return nil, h.handleCatchUpRequest(msg, from) + return h.handleCatchUpRequest(msg, from) case *CatchUpResponse: err := h.handleCatchUpResponse(msg) if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { @@ -77,9 +76,9 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. logger.Debugf("could not catchup: %s", err) } - return nil, err + return err default: - return nil, ErrInvalidMessageType + return ErrInvalidMessageType } } diff --git a/lib/grandpa/message_handler_test.go b/lib/grandpa/message_handler_test.go index 437d755c4c..c2563eb46a 100644 --- a/lib/grandpa/message_handler_test.go +++ b/lib/grandpa/message_handler_test.go @@ -173,9 +173,8 @@ func TestMessageHandler_VoteMessage(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", vm) + err = h.handleMessage("", vm) require.NoError(t, err) - require.Nil(t, out) select { case vote := <-gs.in: @@ -200,7 +199,7 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { SetID: 3, Number: 1, } - _, err := h.handleMessage("", msg) + err := h.handleMessage("", msg) require.NoError(t, err) digest := types.NewDigest() @@ -224,9 +223,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { err = st.Block.AddBlock(block) require.NoError(t, err) - out, err := h.handleMessage("", msg) + err = h.handleMessage("", msg) require.NoError(t, err) - require.Nil(t, out) } func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) { @@ -283,9 +281,8 @@ func TestMessageHandler_CommitMessage_NoCatchUpRequest_ValidSig(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", fm) + err = h.handleMessage("", fm) require.NoError(t, err) - require.Nil(t, out) hash, err := st.Block.GetFinalisedHash(fm.Round, gs.state.setID) require.NoError(t, err) @@ -310,9 +307,8 @@ func TestMessageHandler_CommitMessage_NoCatchUpRequest_MinVoteError(t *testing.T telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", fm) + err = h.handleMessage("", fm) require.EqualError(t, err, ErrMinVotesNotMet.Error()) - require.Nil(t, out) } func TestMessageHandler_CommitMessage_WithCatchUpRequest(t *testing.T) { @@ -338,7 +334,7 @@ func TestMessageHandler_CommitMessage_WithCatchUpRequest(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err = h.handleMessage("", fm) + err = h.handleMessage("", fm) require.NoError(t, err) } @@ -351,7 +347,7 @@ func TestMessageHandler_CatchUpRequest_InvalidRound(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err := h.handleMessage("", req) + err := h.handleMessage("", req) require.Equal(t, ErrInvalidCatchUpRound, err) } @@ -364,11 +360,11 @@ func TestMessageHandler_CatchUpRequest_InvalidSetID(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - _, err := h.handleMessage("", req) + err := h.handleMessage("", req) require.Equal(t, ErrSetIDMismatch, err) } -func TestMessageHandler_CatchUpRequest_WithResponse(t *testing.T) { +func TestMessageHandler_CatchUpResponse(t *testing.T) { gs, st := newTestService(t) // set up needed info for response @@ -419,23 +415,36 @@ func TestMessageHandler_CatchUpRequest_WithResponse(t *testing.T) { err = gs.grandpaState.SetPrecommits(round, setID, pcj) require.NoError(t, err) - resp, err := gs.newCatchUpResponse(round, setID) - require.NoError(t, err) - - expected, err := resp.ToConsensusMessage() - require.NoError(t, err) - - // create and handle request - req := newCatchUpRequest(round, setID) - ctrl := gomock.NewController(t) telemetryMock := NewMockClient(ctrl) telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - out, err := h.handleMessage("", req) - require.NoError(t, err) - require.Equal(t, expected, out) + + h.grandpa.paused.Store(true) + + // If catching up to a previous round, throw an error + err = h.handleMessage("", &CatchUpResponse{ + SetID: setID, + Round: 1, + PreVoteJustification: []types.GrandpaSignedVote{ + { + Vote: *testVote, + Signature: testSignature, + AuthorityID: testAuthorityID, + }, + }, + PreCommitJustification: []types.GrandpaSignedVote{ + { + Vote: *testVote, + Signature: testSignature, + AuthorityID: testAuthorityID, + }, + }, + Hash: common.Hash{}, + Number: 1, + }) + require.ErrorIs(t, err, ErrInvalidCatchUpResponseRound) } func TestVerifyJustification(t *testing.T) { @@ -568,9 +577,8 @@ func TestMessageHandler_HandleCatchUpResponse(t *testing.T) { Number: uint32(round), } - out, err := h.handleMessage("", msg) + err = h.handleMessage("", msg) require.NoError(t, err) - require.Nil(t, out) require.Equal(t, round+1, gs.state.round) } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index bebf1a0d41..488a7e69f1 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -105,7 +105,7 @@ func (t *tracker) handleBlock(b *types.Block) { if vms, has := t.voteMessages[h]; has { for _, v := range vms { // handleMessage would never error for vote message - _, err := t.handler.handleMessage(v.from, v.msg) + err := t.handler.handleMessage(v.from, v.msg) if err != nil { logger.Warnf("failed to handle vote message %v: %s", v, err) } @@ -117,7 +117,7 @@ func (t *tracker) handleBlock(b *types.Block) { } if cm, has := t.commitMessages[h]; has { - _, err := t.handler.handleMessage("", cm) + err := t.handler.handleMessage("", cm) if err != nil { logger.Warnf("failed to handle commit message %v: %s", cm, err) } else { @@ -127,7 +127,7 @@ func (t *tracker) handleBlock(b *types.Block) { // TODO: Can I use the same mapLock or do I need to use catchUpResponseLock? if cr, has := t.catchUpResponseMessages[h]; has { - _, err := t.handler.handleMessage(cr.from, cr.msg) + err := t.handler.handleMessage(cr.from, cr.msg) if err != nil { logger.Warnf("failed to handle catch up response message %v: %s", cr, err) } else { diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 089a9c36bd..454d703588 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -135,28 +135,14 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( return false, err } - resp, err := s.messageHandler.handleMessage(from, m) + err = s.messageHandler.handleMessage(from, m) if err != nil { return false, err } - switch r := resp.(type) { - case *ConsensusMessage: - if r != nil { - s.network.GossipMessage(resp) - } - case nil: - default: - logger.Warnf( - "unexpected type %T returned from message handler: %v", - resp, resp) - } - - switch m.(type) { - case *NeighbourMessage: - return false, nil - case *CatchUpResponse: - return false, nil + switch m.Type() { + case VoteMessageType, CommitMessageType: + s.network.GossipMessage(msg) } return true, nil From 75c4c4fc39bf4aede2451aba57490c686ade1399 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Tue, 25 Jan 2022 23:53:00 +0530 Subject: [PATCH 09/36] addressed some more reviews --- lib/grandpa/message_handler.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 35f8ed0a4f..5e89a744cc 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -95,27 +95,31 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer // TODO; determine if there is some reason we don't receive justifications in responses near the head (usually), // and remove the following code if it's fixed. (#1815) - // head, err := h.blockState.BestBlockNumber() - // if err != nil { - // return err - // } - - // TODO: Why are we ignoring these? Isn't - // msg.Number likely to be higher if we are lagging behind? - // ignore neighbour messages that are above our head - // if int64(msg.Number) > head.Int64() { - // return nil - // } + head, err := h.blockState.BestBlockNumber() + if err != nil { + return err + } + + // we shouldn't send a catch up request for blocks we haven't synced yet + // as we won't be able to process them. We also receive neighbour messages + // each time a new block is finalized, so we get them very often. + if int64(msg.Number) > head.Int64() { + logger.Debug("ignoring neighbour message, because we have not synced to this block number") + return nil + } logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ", msg.Number, msg.SetID, msg.Round, from) - // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() if err != nil { return err } + if msg.SetID != setID { + return ErrSetIDMismatch + } + // catch up only if we are behind by more than catchup threshold if (int(msg.Round) - int(highestRound)) > catchupThreshold { logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) From c1cd3f4e40e9a9051ede2104b0aae635fdba0d90 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 26 Jan 2022 15:48:59 +0530 Subject: [PATCH 10/36] moved catchup logic in a separate file --- lib/grandpa/catch-up.go | 235 ++++++++++++++++++++++++++++ lib/grandpa/grandpa.go | 3 +- lib/grandpa/message_handler.go | 207 ++---------------------- lib/grandpa/message_handler_test.go | 16 +- 4 files changed, 254 insertions(+), 207 deletions(-) create mode 100644 lib/grandpa/catch-up.go diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go new file mode 100644 index 0000000000..2f46558abd --- /dev/null +++ b/lib/grandpa/catch-up.go @@ -0,0 +1,235 @@ +// Copyright 2021 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package grandpa + +import ( + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/crypto/ed25519" + "github.com/libp2p/go-libp2p-core/peer" +) + +type catchUp struct { + requestsSent map[peer.ID]CatchUpRequest + bestResponse *CatchUpResponse + grandpa *Service +} + +func newCatchUp(grandpa *Service) *catchUp { + return &catchUp{ + requestsSent: make(map[peer.ID]CatchUpRequest), + grandpa: grandpa, + } +} + +func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { + if c.bestResponse != nil { + logger.Debugf("ignoring neighbour message since we are already processing a catch response") + return nil + } + + // TODO: Clean up all request sent before 5 min / (neighbour message interval) + _, ok := c.requestsSent[to] + if ok { + logger.Debugf("ignoring neighbour message since we already sent a catch request to this peer: %s", to) + return nil + } + + cm, err := req.ToConsensusMessage() + if err != nil { + return err + } + + err = c.grandpa.network.SendMessage(to, cm) + if err != nil { + return err + } + + c.requestsSent[to] = *req + c.grandpa.paused.Store(true) + + return nil +} + +func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { + if !c.grandpa.authority { + return nil + } + + logger.Debugf( + "processing catch up response with hash %s for round %d and set id %d", + msg.Hash, msg.Round, msg.SetID) + + // if we aren't currently expecting a catch up response, return + if !c.grandpa.paused.Load().(bool) { + logger.Debug("not currently paused, ignoring catch up response") + return nil + } + + if msg.SetID != c.grandpa.state.setID { + return ErrSetIDMismatch + } + + if msg.Round <= c.grandpa.state.round { + return ErrInvalidCatchUpResponseRound + } + + if c.bestResponse.Round >= msg.Round { + logger.Debug("ignoring catch up response, since we are already processing one with a higher round") + } + + prevote, err := c.verifyPreVoteJustification(msg) + if err != nil { + return err + } + + if err = c.verifyPreCommitJustification(msg); err != nil { + return err + } + + if msg.Hash.IsEmpty() || msg.Number == 0 { + return ErrGHOSTlessCatchUp + } + + if err = c.verifyCatchUpResponseCompletability(prevote, msg.Hash); err != nil { + return err + } + + // set prevotes and precommits in db + if err = c.grandpa.grandpaState.SetPrevotes(msg.Round, msg.SetID, msg.PreVoteJustification); err != nil { + return err + } + + if err = c.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil { + return err + } + + // update state and signal to grandpa we are ready to initiate + head, err := c.grandpa.blockState.GetHeader(msg.Hash) + if err != nil { + logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round) + return err + } + + c.grandpa.head = head + c.grandpa.state.round = msg.Round + close(c.grandpa.resumed) + c.grandpa.resumed = make(chan struct{}) + c.grandpa.paused.Store(false) + c.bestResponse = nil + + logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", c.grandpa.state.round) + return nil +} + +// verifyCatchUpResponseCompletability verifies that the pre-commit block is a descendant of, or is, the pre-voted block +func (c *catchUp) verifyCatchUpResponseCompletability(prevote, precommit common.Hash) error { + if prevote == precommit { + return nil + } + + // check if the current block is a descendant of prevoted block + isDescendant, err := c.grandpa.blockState.IsDescendantOf(prevote, precommit) + if err != nil { + return err + } + + if !isDescendant { + return ErrCatchUpResponseNotCompletable + } + + return nil +} + +func (c *catchUp) verifyPreCommitJustification(msg *CatchUpResponse) error { + auths := make([]AuthData, len(msg.PreCommitJustification)) + for i, pcj := range msg.PreCommitJustification { + auths[i] = AuthData{AuthorityID: pcj.AuthorityID} + } + + eqvVoters := getEquivocatoryVoters(auths) + + // verify pre-commit justification + var count uint64 + for idx := range msg.PreCommitJustification { + just := &msg.PreCommitJustification[idx] + + if _, ok := eqvVoters[just.AuthorityID]; ok { + continue + } + + err := verifyJustification(c.grandpa, just, msg.Round, msg.SetID, precommit) + if err != nil { + continue + } + + if just.Vote.Hash == msg.Hash && just.Vote.Number == msg.Number { + count++ + } + } + + if count+uint64(len(eqvVoters)) < c.grandpa.state.threshold() { + return ErrMinVotesNotMet + } + + return nil +} + +func (c *catchUp) verifyPreVoteJustification(msg *CatchUpResponse) (common.Hash, error) { + voters := make(map[ed25519.PublicKeyBytes]map[common.Hash]int, len(msg.PreVoteJustification)) + eqVotesByHash := make(map[common.Hash]map[ed25519.PublicKeyBytes]struct{}) + + // identify equivocatory votes by hash + for _, justification := range msg.PreVoteJustification { + hashsToCount, ok := voters[justification.AuthorityID] + if !ok { + hashsToCount = make(map[common.Hash]int) + } + + hashsToCount[justification.Vote.Hash]++ + voters[justification.AuthorityID] = hashsToCount + + if hashsToCount[justification.Vote.Hash] > 1 { + pubKeysOnHash, ok := eqVotesByHash[justification.Vote.Hash] + if !ok { + pubKeysOnHash = make(map[ed25519.PublicKeyBytes]struct{}) + } + + pubKeysOnHash[justification.AuthorityID] = struct{}{} + eqVotesByHash[justification.Vote.Hash] = pubKeysOnHash + } + } + + // verify pre-vote justification, returning the pre-voted block if there is one + votes := make(map[common.Hash]uint64) + for idx := range msg.PreVoteJustification { + just := &msg.PreVoteJustification[idx] + + // if the current voter is on equivocatory map then ignore the vote + if _, ok := eqVotesByHash[just.Vote.Hash][just.AuthorityID]; ok { + continue + } + + err := verifyJustification(c.grandpa, just, msg.Round, msg.SetID, prevote) + if err != nil { + continue + } + + votes[just.Vote.Hash]++ + } + + var prevote common.Hash + for hash, count := range votes { + equivocatoryVotes := eqVotesByHash[hash] + if count+uint64(len(equivocatoryVotes)) >= c.grandpa.state.threshold() { + prevote = hash + break + } + } + + if prevote.IsEmpty() { + return prevote, ErrMinVotesNotMet + } + + return prevote, nil +} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 4f80c62323..7bcee36716 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,7 +74,8 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message - + // requestsSent map[peer.ID]CatchUpRequest + // bestResponse *CatchUpResponse telemetry telemetry.Client } diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 5e89a744cc..1467ee66ab 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -26,6 +26,7 @@ const catchupThreshold = 2 // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { grandpa *Service + catchUp *catchUp blockState BlockState telemetry telemetry.Client } @@ -35,6 +36,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer return &MessageHandler{ grandpa: grandpa, blockState: blockState, + catchUp: newCatchUp(grandpa), telemetry: telemetryMailer, } } @@ -64,7 +66,8 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { case *CatchUpRequest: return h.handleCatchUpRequest(msg, from) case *CatchUpResponse: - err := h.handleCatchUpResponse(msg) + // err := h.handleCatchUpResponse(msg) + err := h.catchUp.handleCatchUpResponse(msg) if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { // TODO: revisit if we need to add these message in synchronous manner // or not. If not, change catchUpResponseMessages to a normal map. #1531 @@ -72,6 +75,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { from: from, msg: msg, }) + h.catchUp.bestResponse = msg } else if err != nil { logger.Debugf("could not catchup: %s", err) } @@ -124,7 +128,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer if (int(msg.Round) - int(highestRound)) > catchupThreshold { logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) - if err := h.sendCatchUpRequest( + if err := h.catchUp.sendCatchUpRequest( from, newCatchUpRequest(msg.Round, setID), ); err != nil { logger.Debugf("failed to send catch up request: %s", err.Error()) @@ -138,22 +142,6 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer return nil } -func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { - cm, err := req.ToConsensusMessage() - if err != nil { - return err - } - - err = h.grandpa.network.SendMessage(to, cm) - if err != nil { - return err - } - - h.grandpa.paused.Store(true) - - return nil -} - func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { logger.Debugf("received commit message, msg: %+v", msg) @@ -243,90 +231,6 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) return nil } -func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { - if !h.grandpa.authority { - return nil - } - - logger.Debugf( - "processing catch up response with hash %s for round %d and set id %d", - msg.Hash, msg.Round, msg.SetID) - - // if we aren't currently expecting a catch up response, return - if !h.grandpa.paused.Load().(bool) { - logger.Debug("not currently paused, ignoring catch up response") - return nil - } - - if msg.SetID != h.grandpa.state.setID { - return ErrSetIDMismatch - } - - if msg.Round <= h.grandpa.state.round { - return ErrInvalidCatchUpResponseRound - } - - prevote, err := h.verifyPreVoteJustification(msg) - if err != nil { - return err - } - - if err = h.verifyPreCommitJustification(msg); err != nil { - return err - } - - if msg.Hash.IsEmpty() || msg.Number == 0 { - return ErrGHOSTlessCatchUp - } - - if err = h.verifyCatchUpResponseCompletability(prevote, msg.Hash); err != nil { - return err - } - - // set prevotes and precommits in db - if err = h.grandpa.grandpaState.SetPrevotes(msg.Round, msg.SetID, msg.PreVoteJustification); err != nil { - return err - } - - if err = h.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil { - return err - } - - // update state and signal to grandpa we are ready to initiate - head, err := h.grandpa.blockState.GetHeader(msg.Hash) - if err != nil { - logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round) - return err - } - - h.grandpa.head = head - h.grandpa.state.round = msg.Round - close(h.grandpa.resumed) - h.grandpa.resumed = make(chan struct{}) - h.grandpa.paused.Store(false) - logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round) - return nil -} - -// verifyCatchUpResponseCompletability verifies that the pre-commit block is a descendant of, or is, the pre-voted block -func (h *MessageHandler) verifyCatchUpResponseCompletability(prevote, precommit common.Hash) error { - if prevote == precommit { - return nil - } - - // check if the current block is a descendant of prevoted block - isDescendant, err := h.grandpa.blockState.IsDescendantOf(prevote, precommit) - if err != nil { - return err - } - - if !isDescendant { - return ErrCatchUpResponseNotCompletable - } - - return nil -} - func getEquivocatoryVoters(votes []AuthData) map[ed25519.PublicKeyBytes]struct{} { eqvVoters := make(map[ed25519.PublicKeyBytes]struct{}) voters := make(map[ed25519.PublicKeyBytes]int, len(votes)) @@ -362,7 +266,7 @@ func (h *MessageHandler) verifyCommitMessageJustification(fm *CommitMessage) err AuthorityID: fm.AuthData[i].AuthorityID, } - err := h.verifyJustification(just, fm.Round, h.grandpa.state.setID, precommit) + err := verifyJustification(h.grandpa, just, fm.Round, h.grandpa.state.setID, precommit) if err != nil { continue } @@ -390,100 +294,7 @@ func (h *MessageHandler) verifyCommitMessageJustification(fm *CommitMessage) err return nil } -func (h *MessageHandler) verifyPreVoteJustification(msg *CatchUpResponse) (common.Hash, error) { - voters := make(map[ed25519.PublicKeyBytes]map[common.Hash]int, len(msg.PreVoteJustification)) - eqVotesByHash := make(map[common.Hash]map[ed25519.PublicKeyBytes]struct{}) - - // identify equivocatory votes by hash - for _, justification := range msg.PreVoteJustification { - hashsToCount, ok := voters[justification.AuthorityID] - if !ok { - hashsToCount = make(map[common.Hash]int) - } - - hashsToCount[justification.Vote.Hash]++ - voters[justification.AuthorityID] = hashsToCount - - if hashsToCount[justification.Vote.Hash] > 1 { - pubKeysOnHash, ok := eqVotesByHash[justification.Vote.Hash] - if !ok { - pubKeysOnHash = make(map[ed25519.PublicKeyBytes]struct{}) - } - - pubKeysOnHash[justification.AuthorityID] = struct{}{} - eqVotesByHash[justification.Vote.Hash] = pubKeysOnHash - } - } - - // verify pre-vote justification, returning the pre-voted block if there is one - votes := make(map[common.Hash]uint64) - for idx := range msg.PreVoteJustification { - just := &msg.PreVoteJustification[idx] - - // if the current voter is on equivocatory map then ignore the vote - if _, ok := eqVotesByHash[just.Vote.Hash][just.AuthorityID]; ok { - continue - } - - err := h.verifyJustification(just, msg.Round, msg.SetID, prevote) - if err != nil { - continue - } - - votes[just.Vote.Hash]++ - } - - var prevote common.Hash - for hash, count := range votes { - equivocatoryVotes := eqVotesByHash[hash] - if count+uint64(len(equivocatoryVotes)) >= h.grandpa.state.threshold() { - prevote = hash - break - } - } - - if prevote.IsEmpty() { - return prevote, ErrMinVotesNotMet - } - - return prevote, nil -} - -func (h *MessageHandler) verifyPreCommitJustification(msg *CatchUpResponse) error { - auths := make([]AuthData, len(msg.PreCommitJustification)) - for i, pcj := range msg.PreCommitJustification { - auths[i] = AuthData{AuthorityID: pcj.AuthorityID} - } - - eqvVoters := getEquivocatoryVoters(auths) - - // verify pre-commit justification - var count uint64 - for idx := range msg.PreCommitJustification { - just := &msg.PreCommitJustification[idx] - - if _, ok := eqvVoters[just.AuthorityID]; ok { - continue - } - - err := h.verifyJustification(just, msg.Round, msg.SetID, precommit) - if err != nil { - continue - } - - if just.Vote.Hash == msg.Hash && just.Vote.Number == msg.Number { - count++ - } - } - - if count+uint64(len(eqvVoters)) < h.grandpa.state.threshold() { - return ErrMinVotesNotMet - } - - return nil -} - -func (h *MessageHandler) verifyJustification(just *SignedVote, round, setID uint64, stage Subround) error { +func verifyJustification(grandpa *Service, just *SignedVote, round, setID uint64, stage Subround) error { // verify signature msg, err := scale.Marshal(FullVote{ Stage: stage, @@ -512,7 +323,7 @@ func (h *MessageHandler) verifyJustification(just *SignedVote, round, setID uint // verify authority in justification set authFound := false - for _, auth := range h.grandpa.authorities() { + for _, auth := range grandpa.authorities() { justKey, err := just.AuthorityID.Encode() if err != nil { return err diff --git a/lib/grandpa/message_handler_test.go b/lib/grandpa/message_handler_test.go index c2563eb46a..cf5c1a6765 100644 --- a/lib/grandpa/message_handler_test.go +++ b/lib/grandpa/message_handler_test.go @@ -242,7 +242,7 @@ func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) { telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() h := NewMessageHandler(gs, st.Block, telemetryMock) - err := h.verifyJustification(just, gs.state.round, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, gs.state.round, gs.state.setID, precommit) require.Equal(t, err, ErrInvalidSignature) } @@ -462,7 +462,7 @@ func TestVerifyJustification(t *testing.T) { AuthorityID: kr.Alice().Public().(*ed25519.PublicKey).AsBytes(), } - err := h.verifyJustification(just, 77, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) require.NoError(t, err) } @@ -482,7 +482,7 @@ func TestVerifyJustification_InvalidSignature(t *testing.T) { AuthorityID: kr.Alice().Public().(*ed25519.PublicKey).AsBytes(), } - err := h.verifyJustification(just, 77, gs.state.setID, precommit) + err := verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) require.EqualError(t, err, ErrInvalidSignature.Error()) } @@ -505,7 +505,7 @@ func TestVerifyJustification_InvalidAuthority(t *testing.T) { AuthorityID: fakeKey.Public().(*ed25519.PublicKey).AsBytes(), } - err = h.verifyJustification(just, 77, gs.state.setID, precommit) + err = verifyJustification(h.grandpa, just, 77, gs.state.setID, precommit) require.EqualError(t, err, ErrVoterNotFound.Error()) } @@ -524,7 +524,7 @@ func TestMessageHandler_VerifyPreVoteJustification(t *testing.T) { PreVoteJustification: just, } - prevote, err := h.verifyPreVoteJustification(msg) + prevote, err := h.catchUp.verifyPreVoteJustification(msg) require.NoError(t, err) require.Equal(t, testHash, prevote) } @@ -547,7 +547,7 @@ func TestMessageHandler_VerifyPreCommitJustification(t *testing.T) { Number: uint32(round), } - err := h.verifyPreCommitJustification(msg) + err := h.catchUp.verifyPreCommitJustification(msg) require.NoError(t, err) } @@ -949,7 +949,7 @@ func Test_VerifyPrevoteJustification_CountEquivocatoryVoters(t *testing.T) { Number: uint32(bfcNumber), } - hash, err := h.verifyPreVoteJustification(testCatchUpResponse) + hash, err := h.catchUp.verifyPreVoteJustification(testCatchUpResponse) require.NoError(t, err) require.Equal(t, hash, bfcHash) } @@ -1022,7 +1022,7 @@ func Test_VerifyPreCommitJustification(t *testing.T) { Number: uint32(bfcNumber), } - err = h.verifyPreCommitJustification(testCatchUpResponse) + err = h.catchUp.verifyPreCommitJustification(testCatchUpResponse) require.NoError(t, err) } From 8fbd0953c541bc0ecaca3032e74c73df5c0cb82f Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 26 Jan 2022 18:34:59 +0530 Subject: [PATCH 11/36] wait to receive response after sending catch req --- lib/grandpa/catch-up.go | 39 +++++++++++++++++++++++++++++++--- lib/grandpa/message_handler.go | 13 +++--------- 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 2f46558abd..32c39a1b66 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -4,15 +4,22 @@ package grandpa import ( + "errors" + "fmt" + "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/ed25519" "github.com/libp2p/go-libp2p-core/peer" ) +const catchUpResponseTimeout = time.Second * 5 + type catchUp struct { - requestsSent map[peer.ID]CatchUpRequest - bestResponse *CatchUpResponse - grandpa *Service + requestsSent map[peer.ID]CatchUpRequest + bestResponse *CatchUpResponse + catchUpResponseCh chan *CatchUpResponse + grandpa *Service } func newCatchUp(grandpa *Service) *catchUp { @@ -22,6 +29,31 @@ func newCatchUp(grandpa *Service) *catchUp { } } +func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { + if err := c.sendCatchUpRequest( + to, newCatchUpRequest(round, setID), + ); err != nil { + logger.Debugf("failed to send catch up request: %s", err.Error()) + return err + } + + logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d", + to, round, setID) + + c.grandpa.paused.Store(true) + + timer := time.NewTimer(catchUpResponseTimeout) + defer timer.Stop() + + select { + case <-c.catchUpResponseCh: + fmt.Println("got a response, this is awesome") + return nil + case <-timer.C: + return errors.New("timeout") + } +} + func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { if c.bestResponse != nil { logger.Debugf("ignoring neighbour message since we are already processing a catch response") @@ -115,6 +147,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { c.grandpa.state.round = msg.Round close(c.grandpa.resumed) c.grandpa.resumed = make(chan struct{}) + c.catchUpResponseCh <- msg c.grandpa.paused.Store(false) c.bestResponse = nil diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 1467ee66ab..ad9bc86201 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -127,16 +127,9 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer // catch up only if we are behind by more than catchup threshold if (int(msg.Round) - int(highestRound)) > catchupThreshold { logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) - - if err := h.catchUp.sendCatchUpRequest( - from, newCatchUpRequest(msg.Round, setID), - ); err != nil { - logger.Debugf("failed to send catch up request: %s", err.Error()) - return err - } - - logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d", - from, msg.Round, setID) + return h.catchUp.do(from, msg.Round, msg.SetID) + } else { + logger.Debugf("not lagging behind by more than threshold rounds") } return nil From cc01bb2d1e604d542084176a88319d4d168d30bb Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 26 Jan 2022 21:49:47 +0530 Subject: [PATCH 12/36] make handling response thread safe --- lib/grandpa/catch-up.go | 41 +++++++++++++++++++++++++--------- lib/grandpa/message_handler.go | 4 ++-- lib/grandpa/message_tracker.go | 1 - 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 32c39a1b66..5c68137187 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -5,7 +5,8 @@ package grandpa import ( "errors" - "fmt" + "sync" + "sync/atomic" "time" "github.com/ChainSafe/gossamer/lib/common" @@ -16,20 +17,27 @@ import ( const catchUpResponseTimeout = time.Second * 5 type catchUp struct { - requestsSent map[peer.ID]CatchUpRequest - bestResponse *CatchUpResponse + lock sync.Mutex // applied on requestsSent and bestResponse + + requestsSent map[peer.ID]CatchUpRequest + bestResponse *atomic.Value // *CatchUpResponse + catchUpResponseCh chan *CatchUpResponse + waitingOnResponse *atomic.Value grandpa *Service } func newCatchUp(grandpa *Service) *catchUp { return &catchUp{ - requestsSent: make(map[peer.ID]CatchUpRequest), - grandpa: grandpa, + requestsSent: make(map[peer.ID]CatchUpRequest), + grandpa: grandpa, + catchUpResponseCh: make(chan *CatchUpResponse), } } func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { + defer c.waitingOnResponse.Store(false) + if err := c.sendCatchUpRequest( to, newCatchUpRequest(round, setID), ); err != nil { @@ -47,7 +55,6 @@ func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { select { case <-c.catchUpResponseCh: - fmt.Println("got a response, this is awesome") return nil case <-timer.C: return errors.New("timeout") @@ -55,18 +62,22 @@ func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { } func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { - if c.bestResponse != nil { + if c.bestResponse.Load() != nil { logger.Debugf("ignoring neighbour message since we are already processing a catch response") return nil } // TODO: Clean up all request sent before 5 min / (neighbour message interval) + c.lock.Lock() _, ok := c.requestsSent[to] + c.lock.Unlock() if ok { logger.Debugf("ignoring neighbour message since we already sent a catch request to this peer: %s", to) return nil } + c.waitingOnResponse.Store(true) + cm, err := req.ToConsensusMessage() if err != nil { return err @@ -77,7 +88,9 @@ func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { return err } + c.lock.Lock() c.requestsSent[to] = *req + c.lock.Unlock() c.grandpa.paused.Store(true) return nil @@ -106,7 +119,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { return ErrInvalidCatchUpResponseRound } - if c.bestResponse.Round >= msg.Round { + if c.bestResponse.Load().(*CatchUpResponse).Round >= msg.Round { logger.Debug("ignoring catch up response, since we are already processing one with a higher round") } @@ -147,9 +160,17 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { c.grandpa.state.round = msg.Round close(c.grandpa.resumed) c.grandpa.resumed = make(chan struct{}) - c.catchUpResponseCh <- msg + if c.waitingOnResponse.Load().(bool) { + c.catchUpResponseCh <- msg + } + c.grandpa.paused.Store(false) - c.bestResponse = nil + + // resetting both response and requests + c.bestResponse.Store(nil) + c.lock.Lock() + c.requestsSent = make(map[peer.ID]CatchUpRequest) + c.lock.Unlock() logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", c.grandpa.state.round) return nil diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index ad9bc86201..78911c7ebd 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -75,7 +75,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { from: from, msg: msg, }) - h.catchUp.bestResponse = msg + h.catchUp.bestResponse.Store(msg) } else if err != nil { logger.Debugf("could not catchup: %s", err) } @@ -129,7 +129,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) return h.catchUp.do(from, msg.Round, msg.SetID) } else { - logger.Debugf("not lagging behind by more than threshold rounds") + logger.Debugf("not lagging behind by more than threshold rounds, msg.Round: %d, highestRound: %d", int(msg.Round), int(highestRound)) } return nil diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 488a7e69f1..8cd6ffb686 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -125,7 +125,6 @@ func (t *tracker) handleBlock(b *types.Block) { } } - // TODO: Can I use the same mapLock or do I need to use catchUpResponseLock? if cr, has := t.catchUpResponseMessages[h]; has { err := t.handler.handleMessage(cr.from, cr.msg) if err != nil { From b5e81f3619af5d082dee3f239e80fc3f71f1e85f Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 27 Jan 2022 21:14:46 +0530 Subject: [PATCH 13/36] some cleanup --- lib/grandpa/grandpa.go | 4 +--- lib/grandpa/message_handler.go | 4 ---- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 7bcee36716..09f76788f5 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,9 +74,7 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message - // requestsSent map[peer.ID]CatchUpRequest - // bestResponse *CatchUpResponse - telemetry telemetry.Client + telemetry telemetry.Client } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 78911c7ebd..c470a7da70 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -41,8 +41,6 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer } } -//nolint -// TODO: NotificationMessage is used at places. But NotificationMessage we return is always nil. // HandleMessage handles a GRANDPA consensus message // if it is a CommitMessage, it updates the BlockState // if it is a VoteMessage, it sends it to the GRANDPA service @@ -69,8 +67,6 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) error { // err := h.handleCatchUpResponse(msg) err := h.catchUp.handleCatchUpResponse(msg) if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) { - // TODO: revisit if we need to add these message in synchronous manner - // or not. If not, change catchUpResponseMessages to a normal map. #1531 h.grandpa.tracker.addCatchUpResponse(&networkCatchUpResponseMessage{ from: from, msg: msg, From b2dc802723c47485158e52ea03c39d0d0039e943 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 27 Jan 2022 23:39:50 +0530 Subject: [PATCH 14/36] neighbour message does not get propagated --- lib/grandpa/network.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index 454d703588..8c914d30c3 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -116,6 +116,8 @@ func (*Service) decodeMessage(in []byte) (NotificationsMessage, error) { return msg, err } +// handleNetworkMessage processes notification messages and return true if we should +// propagate this message, false if otherwise. func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (bool, error) { if msg == nil { return false, nil @@ -143,9 +145,12 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( switch m.Type() { case VoteMessageType, CommitMessageType: s.network.GossipMessage(msg) + return true, nil + case NeighborMessageType, CatchUpRequestMessageType, CatchUpResponseMessageType: + return false, nil } - return true, nil + return false, nil } // sendMessage sends a vote message to be gossiped to the network From fa2dbdd7eaa1e38a5fc045ddf0253022074d25ff Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:39:11 +0530 Subject: [PATCH 15/36] Update lib/grandpa/catch-up.go Co-authored-by: noot <36753753+noot@users.noreply.github.com> --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 5c68137187..c0ee51e24a 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -63,7 +63,7 @@ func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { if c.bestResponse.Load() != nil { - logger.Debugf("ignoring neighbour message since we are already processing a catch response") + logger.Debug("ignoring neighbour message since we are already processing a catch-up response") return nil } From 1a9c766a93c821ccfe30f227d22f3c50720b94dd Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:39:23 +0530 Subject: [PATCH 16/36] Update lib/grandpa/catch-up.go Co-authored-by: noot <36753753+noot@users.noreply.github.com> --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index c0ee51e24a..1e13c0b804 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -72,7 +72,7 @@ func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { _, ok := c.requestsSent[to] c.lock.Unlock() if ok { - logger.Debugf("ignoring neighbour message since we already sent a catch request to this peer: %s", to) + logger.Debugf("ignoring neighbour message since we already sent a catch-up request to this peer: %s", to) return nil } From 0019d85902d1725166bda118b707f736504593d5 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:41:00 +0530 Subject: [PATCH 17/36] Update lib/grandpa/catch-up.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Eclésio Junior --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 1e13c0b804..5a42896c91 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -17,7 +17,7 @@ import ( const catchUpResponseTimeout = time.Second * 5 type catchUp struct { - lock sync.Mutex // applied on requestsSent and bestResponse + lock sync.Mutex // applied on requestsSent requestsSent map[peer.ID]CatchUpRequest bestResponse *atomic.Value // *CatchUpResponse From 3ac4e19f249525e24304c54ad877a3b38739e287 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:43:09 +0530 Subject: [PATCH 18/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 5a42896c91..40c239d6ae 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -const catchUpResponseTimeout = time.Second * 5 +const catchUpResponseTimeout = 5 * time.Second type catchUp struct { lock sync.Mutex // applied on requestsSent From e75c810c0352a9c186ac79729868077c2abfde0e Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:53:48 +0530 Subject: [PATCH 19/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index c470a7da70..dd7dd12fbd 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -193,7 +193,8 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) } if msg.Round > h.grandpa.state.round { - return ErrInvalidCatchUpRound + return fmt.Errorf("%w: received %d and grandpa state round is %d", + ErrInvalidCatchUpRound, msg.Round, h.grandpa.state.round) } // We don't necessarily have to reply with the round asked in the request, we can reply From aa78386a016fa15e25f9faddde78c6b0ade44d02 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:54:13 +0530 Subject: [PATCH 20/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 40c239d6ae..f43510c613 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -80,7 +80,7 @@ func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { cm, err := req.ToConsensusMessage() if err != nil { - return err + return fmt.Errorf("cannot convert request to consensus message: %w", err) } err = c.grandpa.network.SendMessage(to, cm) From 884a59e5893c3cc28402a6352c3065ed43ec3053 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:54:33 +0530 Subject: [PATCH 21/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index f43510c613..9e2cfb4033 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -85,7 +85,7 @@ func (c *catchUp) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error { err = c.grandpa.network.SendMessage(to, cm) if err != nil { - return err + return fmt.Errorf("cannot send grandpa message: %w", err) } c.lock.Lock() From 6810d9263a0ae59479ff012ae4fe7f6d2a3e48a6 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:54:42 +0530 Subject: [PATCH 22/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index dd7dd12fbd..86fc959332 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -121,7 +121,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer } // catch up only if we are behind by more than catchup threshold - if (int(msg.Round) - int(highestRound)) > catchupThreshold { + if int(msg.Round - highestRound) > catchupThreshold { logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) return h.catchUp.do(from, msg.Round, msg.SetID) } else { From 835ac6bf58acbddfe024a54f6553bbe392468d3c Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:55:11 +0530 Subject: [PATCH 23/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 86fc959332..a75875f1cc 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -122,7 +122,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer // catch up only if we are behind by more than catchup threshold if int(msg.Round - highestRound) > catchupThreshold { - logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound)) + logger.Debugf("lagging behind by %d rounds", msg.Round-highestRound) return h.catchUp.do(from, msg.Round, msg.SetID) } else { logger.Debugf("not lagging behind by more than threshold rounds, msg.Round: %d, highestRound: %d", int(msg.Round), int(highestRound)) From b57565f05ec7f6ccbb9cba034747059371915b76 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:55:22 +0530 Subject: [PATCH 24/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index a75875f1cc..05787ec5d7 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -125,7 +125,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer logger.Debugf("lagging behind by %d rounds", msg.Round-highestRound) return h.catchUp.do(from, msg.Round, msg.SetID) } else { - logger.Debugf("not lagging behind by more than threshold rounds, msg.Round: %d, highestRound: %d", int(msg.Round), int(highestRound)) + logger.Debugf("not lagging behind by more than threshold rounds, msg.Round: %d, highestRound: %d", msg.Round, highestRound) } return nil From 805478276ef60dce6aa2148cb282ead79293dcb6 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:55:58 +0530 Subject: [PATCH 25/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 9e2cfb4033..82e5d90d94 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -112,7 +112,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { } if msg.SetID != c.grandpa.state.setID { - return ErrSetIDMismatch + return fmt.Errorf("%w: received set id %d but have set id %d in state", ErrSetIDMismatch, msg.SetID, c.grandpa.state.setID) } if msg.Round <= c.grandpa.state.round { From 144022e43232e3ada91fe8ce309eceee4e7509d1 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:56:22 +0530 Subject: [PATCH 26/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 82e5d90d94..3738856067 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -116,7 +116,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { } if msg.Round <= c.grandpa.state.round { - return ErrInvalidCatchUpResponseRound + return fmt.Errorf("%w: received round %d but grandpa round in state is %d", ErrInvalidCatchUpResponseRound, msg.Round, c.grandpa.state.round) } if c.bestResponse.Load().(*CatchUpResponse).Round >= msg.Round { From db73c8c3333c35be11283340ff45d2e668820f08 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:56:42 +0530 Subject: [PATCH 27/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 3738856067..2193dbc7d2 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -125,7 +125,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { prevote, err := c.verifyPreVoteJustification(msg) if err != nil { - return err + return fmt.Errorf("cannot verify pre vote justification: %w", err) } if err = c.verifyPreCommitJustification(msg); err != nil { From 592e201488381d7b8d4d4144624b82048a4c2d4e Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:57:04 +0530 Subject: [PATCH 28/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 2193dbc7d2..b5e729a34d 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -129,7 +129,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { } if err = c.verifyPreCommitJustification(msg); err != nil { - return err + return fmt.Errorf("cannot verify pre commit justification: %w", err) } if msg.Hash.IsEmpty() || msg.Number == 0 { From d2fcc2140e4b5960a4ca0a368fcc224709b7292b Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:57:30 +0530 Subject: [PATCH 29/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index b5e729a34d..d0195d39f5 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -137,7 +137,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { } if err = c.verifyCatchUpResponseCompletability(prevote, msg.Hash); err != nil { - return err + return fmt.Errorf("cannot verify catch-up response completability: %w", err) } // set prevotes and precommits in db From c04125b1a797b24c058e8f923cbe635b7063b4d6 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:57:47 +0530 Subject: [PATCH 30/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index d0195d39f5..36fcaf2515 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -142,7 +142,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { // set prevotes and precommits in db if err = c.grandpa.grandpaState.SetPrevotes(msg.Round, msg.SetID, msg.PreVoteJustification); err != nil { - return err + return fmt.Errorf("cannot set pre votes in grandpa state: %w", err) } if err = c.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil { From 576eabd83458d35123fe9b9c12096944cc05b2b3 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:58:08 +0530 Subject: [PATCH 31/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 36fcaf2515..6eb03cb8a8 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -153,7 +153,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { head, err := c.grandpa.blockState.GetHeader(msg.Hash) if err != nil { logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round) - return err + return return fmt.Errorf("cannot get header from grandpa block state: %w", err) } c.grandpa.head = head From a45fbd51e1c3e5e76f3c50d4fbc2153119fad290 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:58:24 +0530 Subject: [PATCH 32/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 6eb03cb8a8..8e5a688ef2 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -146,7 +146,7 @@ func (c *catchUp) handleCatchUpResponse(msg *CatchUpResponse) error { } if err = c.grandpa.grandpaState.SetPrecommits(msg.Round, msg.SetID, msg.PreCommitJustification); err != nil { - return err + return return fmt.Errorf("cannot set pre commits in grandpa state: %w", err) } // update state and signal to grandpa we are ready to initiate From a4aa5f7ed90ee02b50598b39217915770d3fe2d6 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:58:47 +0530 Subject: [PATCH 33/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index 8e5a688ef2..f3c60ee3e7 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -185,7 +185,7 @@ func (c *catchUp) verifyCatchUpResponseCompletability(prevote, precommit common. // check if the current block is a descendant of prevoted block isDescendant, err := c.grandpa.blockState.IsDescendantOf(prevote, precommit) if err != nil { - return err + return fmt.Errorf("cannot find descendance between prevote and precommit: %w", err) } if !isDescendant { From 728e91ae6ec4679c4aba2ce8e78abf0229622a52 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:59:22 +0530 Subject: [PATCH 34/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 05787ec5d7..2ed06d9988 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -113,7 +113,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer highestRound, setID, err := h.blockState.GetHighestRoundAndSetID() if err != nil { - return err + return fmt.Errorf("cannot get highest round and set id: %w", err) } if msg.SetID != setID { From 735f98cd5d8e96cc5ab497ac11a02358e29e99a0 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 15:59:45 +0530 Subject: [PATCH 35/36] Update lib/grandpa/message_handler.go Co-authored-by: Quentin McGaw --- lib/grandpa/message_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 2ed06d9988..4d788d545b 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -117,7 +117,7 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer } if msg.SetID != setID { - return ErrSetIDMismatch + return fmt.Errorf("%w: received %d and expected %d", ErrSetIDMismatch, msg.SetID, setID) } // catch up only if we are behind by more than catchup threshold From a3bc8972efcf25a5f37afb5ce285caab7d2f8e52 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 14 Mar 2022 16:00:00 +0530 Subject: [PATCH 36/36] Update lib/grandpa/catch-up.go Co-authored-by: Quentin McGaw --- lib/grandpa/catch-up.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/grandpa/catch-up.go b/lib/grandpa/catch-up.go index f3c60ee3e7..ca89b5f640 100644 --- a/lib/grandpa/catch-up.go +++ b/lib/grandpa/catch-up.go @@ -41,8 +41,7 @@ func (c *catchUp) do(to peer.ID, round uint64, setID uint64) error { if err := c.sendCatchUpRequest( to, newCatchUpRequest(round, setID), ); err != nil { - logger.Debugf("failed to send catch up request: %s", err.Error()) - return err + return fmt.Errorf("failed to send catch up request: %w", err) } logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d",