Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b7d814c
lib/grandpa: ensure catch-up logic works
kishansagathiya Jan 10, 2022
13b943e
Merge branch 'development' into feat/kishan/grandpa-catchup
kishansagathiya Jan 11, 2022
faa8f82
lib/grandpa: ensure catch-up logic works
kishansagathiya Jan 12, 2022
0099a98
cleaning up a few things
kishansagathiya Jan 12, 2022
ec356aa
cleaning up more things
kishansagathiya Jan 19, 2022
8320fc9
cleaned up more code
kishansagathiya Jan 20, 2022
9a73bb2
small improvement
kishansagathiya Jan 20, 2022
b554d66
Merge branch 'development' into feat/kishan/grandpa-catchup
kishansagathiya Jan 20, 2022
22e43c3
fixing lint
kishansagathiya Jan 20, 2022
87c3ccf
Addressed some reviews + tests changes
kishansagathiya Jan 21, 2022
75c4c4f
addressed some more reviews
kishansagathiya Jan 25, 2022
c1cd3f4
moved catchup logic in a separate file
kishansagathiya Jan 26, 2022
8fbd095
wait to receive response after sending catch req
kishansagathiya Jan 26, 2022
cc01bb2
make handling response thread safe
kishansagathiya Jan 26, 2022
0b6aeaa
Merge branch 'development' into feat/kishan/grandpa-catchup
kishansagathiya Jan 26, 2022
b5e81f3
some cleanup
kishansagathiya Jan 27, 2022
b2dc802
neighbour message does not get propagated
kishansagathiya Jan 27, 2022
fa2dbdd
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
1a9c766
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
0019d85
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
3ac4e19
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
e75c810
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
aa78386
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
884a59e
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
6810d92
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
835ac6b
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
b57565f
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
8054782
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
144022e
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
db73c8c
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
592e201
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
d2fcc21
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
c04125b
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
576eabd
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
a45fbd5
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
a4aa5f7
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
728e91a
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
735f98c
Update lib/grandpa/message_handler.go
kishansagathiya Mar 14, 2022
a3bc897
Update lib/grandpa/catch-up.go
kishansagathiya Mar 14, 2022
71c3774
Merge branch 'development' into feat/kishan/grandpa-catchup
kishansagathiya Mar 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
}

if hsResponse.err != nil {
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
Comment thread
kishansagathiya marked this conversation as resolved.
closeOutboundStream(info, peer, stream)
return nil, hsResponse.err
}
Expand Down Expand Up @@ -419,6 +419,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
continue
}

info.outboundHandshakeMutexes.Store(peer, new(sync.Mutex))
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
go s.sendData(peer, hs, info, msg)
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/grandpa/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ func compactToJustification(vs []Vote, auths []AuthData) ([]SignedVote, error) {

// CatchUpRequest struct to represent a CatchUpRequest message
type CatchUpRequest struct {
// Round that we want to catch up to
Round uint64
// The voter set ID this message is from.
SetID uint64
}

Expand Down
126 changes: 93 additions & 33 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/big"
"reflect"

"github.com/ChainSafe/chaindb"
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

const catchupThreshold = 2

// MessageHandler handles GRANDPA consensus messages
type MessageHandler struct {
grandpa *Service
Expand All @@ -37,6 +40,8 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer
}
}

//nolint
// TODO: NotificationMessage is used at places. But NotificationMessage we return is always nil.
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
// HandleMessage handles a GRANDPA consensus message
// if it is a CommitMessage, it updates the BlockState
// if it is a VoteMessage, it sends it to the GRANDPA service
Expand All @@ -56,25 +61,29 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
return nil, h.handleCommitMessage(msg)
case *NeighbourMessage:
// we can afford to not retry handling neighbour message, if it errors.
return nil, h.handleNeighbourMessage(msg)
return nil, h.handleNeighbourMessage(msg, from)
case *CatchUpRequest:
return h.handleCatchUpRequest(msg)
return nil, h.handleCatchUpRequest(msg, from)
case *CatchUpResponse:
err := h.handleCatchUpResponse(msg)
if errors.Is(err, blocktree.ErrNodeNotFound) {
// TODO: we are adding these messages to reprocess them again, but we
// haven't added code to reprocess them. Do that.
// Also, revisit if we need to add these message in synchronous manner
if errors.Is(err, blocktree.ErrNodeNotFound) || errors.Is(err, chaindb.ErrKeyNotFound) {
// TODO: revisit if we need to add these message in synchronous manner
// or not. If not, change catchUpResponseMessages to a normal map. #1531
h.grandpa.tracker.addCatchUpResponse(msg)
h.grandpa.tracker.addCatchUpResponse(&networkCatchUpResponseMessage{
from: from,
msg: msg,
})
} else if err != nil {
logger.Debugf("could not catchup: %s", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return the error here?

}

return nil, err
default:
return nil, ErrInvalidMessageType
}
}

func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error {
func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer.ID) error {
currFinalized, err := h.blockState.GetFinalisedHeader(0, 0)
if err != nil {
return err
Expand All @@ -87,18 +96,58 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error {

// TODO; determine if there is some reason we don't receive justifications in responses near the head (usually),
// and remove the following code if it's fixed. (#1815)
head, err := h.blockState.BestBlockNumber()
// head, err := h.blockState.BestBlockNumber()
// if err != nil {
// return err
// }

// TODO: Why are we ignoring these? Isn't
// msg.Number likely to be higher if we are lagging behind?
// ignore neighbour messages that are above our head
// if int64(msg.Number) > head.Int64() {
// return nil
// }
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated

logger.Debugf("got neighbour message with number %d, set id %d and round %d, from: %s ",
msg.Number, msg.SetID, msg.Round, from)
// TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815)
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated

highestRound, setID, err := h.blockState.GetHighestRoundAndSetID()
if err != nil {
return err
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
}

// ignore neighbour messages that are above our head
if int64(msg.Number) > head.Int64() {
return nil
// catch up only if we are behind by more than catchup threshold
if (int(msg.Round) - int(highestRound)) > catchupThreshold {
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define catchupThreshold as an inlined constant here instead

Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
logger.Debugf("lagging behind by %d rounds", int(msg.Round)-int(highestRound))
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated

if err := h.sendCatchUpRequest(
from, newCatchUpRequest(msg.Round, setID),
); err != nil {
logger.Debugf("failed to send catch up request: %s", err.Error())
return err
}

logger.Debugf("successfully sent a catch up request to node %s, for round number %d and set ID %d",
from, msg.Round, setID)
}

logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round)
// TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815)
return nil
}

func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) error {
cm, err := req.ToConsensusMessage()
if err != nil {
return err
}

err = h.grandpa.network.SendMessage(to, cm)
if err != nil {
return err
}

h.grandpa.paused.Store(true)

return nil
}

Expand Down Expand Up @@ -149,31 +198,46 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error {
return nil
}

func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) {
func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) error {
if !h.grandpa.authority {
return nil, nil //nolint:nilnil
return nil
}

logger.Debugf("received catch up request for round %d and set id %d",
msg.Round, msg.SetID)
logger.Debugf("received catch up request for round %d and set id %d, from %s",
msg.Round, msg.SetID, from)

logger.Debugf("Our latest round is %d", h.grandpa.state.round)

if msg.SetID != h.grandpa.state.setID {
return nil, ErrSetIDMismatch
return ErrSetIDMismatch
}

if msg.Round >= h.grandpa.state.round {
return nil, ErrInvalidCatchUpRound
if msg.Round > h.grandpa.state.round {
return ErrInvalidCatchUpRound
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
}

resp, err := h.grandpa.newCatchUpResponse(msg.Round, msg.SetID)
// We don't necessarily have to reply with the round asked in the request, we can reply
// with our latest round.
resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID)
if err != nil {
return nil, err
return err
}

cm, err := resp.ToConsensusMessage()
if err != nil {
return err
}

err = h.grandpa.network.SendMessage(from, cm)
if err != nil {
return err
}
Comment thread
kishansagathiya marked this conversation as resolved.

logger.Debugf(
"sending catch up response with hash %s for round %d and set id %d",
resp.Hash, msg.Round, msg.SetID)
return resp.ToConsensusMessage()
"successfully sent catch up response with hash %s for round %d and set id %d, to %s",
resp.Hash, h.grandpa.state.round, h.grandpa.state.setID, from)

return nil
}

func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
Expand All @@ -182,14 +246,9 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
}

logger.Debugf(
"received catch up response with hash %s for round %d and set id %d",
"processing catch up response with hash %s for round %d and set id %d",
msg.Hash, msg.Round, msg.SetID)

// TODO: re-add catch-up logic (#1531)
if true {
return nil
}

// if we aren't currently expecting a catch up response, return
if !h.grandpa.paused.Load().(bool) {
logger.Debug("not currently paused, ignoring catch up response")
Expand All @@ -200,7 +259,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
return ErrSetIDMismatch
}

if msg.Round != h.grandpa.state.round-1 {
if msg.Round <= h.grandpa.state.round {
return ErrInvalidCatchUpResponseRound
}

Expand Down Expand Up @@ -233,6 +292,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
// update state and signal to grandpa we are ready to initiate
head, err := h.grandpa.blockState.GetHeader(msg.Hash)
if err != nil {
logger.Debugf("failed to process catch up response for round %d, storing the catch up response to retry", msg.Round)
return err
}

Expand Down
25 changes: 19 additions & 6 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type tracker struct {
stopped chan struct{}

catchUpResponseMessageMutex sync.Mutex
// round(uint64) is used as key and *CatchUpResponse as value
catchUpResponseMessages map[uint64]*CatchUpResponse
// block hash is used as key and *CatchUpResponse as value
catchUpResponseMessages map[common.Hash]*networkCatchUpResponseMessage
}

func newTracker(bs BlockState, handler *MessageHandler) *tracker {
Expand All @@ -39,7 +39,7 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker {
mapLock: sync.Mutex{},
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),
catchUpResponseMessages: make(map[uint64]*CatchUpResponse),
catchUpResponseMessages: make(map[common.Hash]*networkCatchUpResponseMessage),
}
}

Expand Down Expand Up @@ -75,10 +75,11 @@ func (t *tracker) addCommit(cm *CommitMessage) {
t.commitMessages[cm.Vote.Hash] = cm
}

func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
func (t *tracker) addCatchUpResponse(cr *networkCatchUpResponseMessage) {
t.catchUpResponseMessageMutex.Lock()
defer t.catchUpResponseMessageMutex.Unlock()
t.catchUpResponseMessages[cr.Round] = cr

t.catchUpResponseMessages[cr.msg.Hash] = cr
}

func (t *tracker) handleBlocks() {
Expand Down Expand Up @@ -110,15 +111,27 @@ func (t *tracker) handleBlock(b *types.Block) {
}
}

// TODO: Check if we should delete all vote messages for h,
// if fail to process a few of the vote messages.
delete(t.voteMessages, h)
}

if cm, has := t.commitMessages[h]; has {
_, err := t.handler.handleMessage("", cm)
if err != nil {
logger.Warnf("failed to handle commit message %v: %s", cm, err)
} else {
delete(t.commitMessages, h)
}
}

delete(t.commitMessages, h)
// TODO: Can I use the same mapLock or do I need to use catchUpResponseLock?
Comment thread
kishansagathiya marked this conversation as resolved.
Outdated
if cr, has := t.catchUpResponseMessages[h]; has {
_, err := t.handler.handleMessage(cr.from, cr.msg)
if err != nil {
logger.Warnf("failed to handle catch up response message %v: %s", cr, err)
} else {
delete(t.catchUpResponseMessages, h)
}
}
}
5 changes: 5 additions & 0 deletions lib/grandpa/vote_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type networkVoteMessage struct {
msg *VoteMessage
}

type networkCatchUpResponseMessage struct {
from peer.ID
msg *CatchUpResponse
}

// receiveVoteMessages receives messages from the in channel until a grandpa round finishes.
func (s *Service) receiveVoteMessages(ctx context.Context) {
for {
Expand Down
1 change: 1 addition & 0 deletions lib/runtime/wasmer/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func NewInstance(code []byte, cfg *Config) (*Instance, error) {
codeHash: cfg.CodeHash,
}

// TODO: log this error in debug, trace or warn mode. Do not ignore it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just return it?

inst.version, _ = inst.Version()
return inst, nil
}
Expand Down