From 2991562244c095bff2e45b8857f4edfa85757557 Mon Sep 17 00:00:00 2001 From: Jitendra Bhurat Date: Mon, 24 Sep 2018 15:29:00 -0400 Subject: [PATCH 01/14] Locking WriteBlockAndState for Istanbul Consensus --- core/blockchain.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/blockchain.go b/core/blockchain.go index c5e50695a7..04b6fc7092 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -92,6 +92,7 @@ type BlockChain struct { mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock procmu sync.RWMutex // block processor lock + insertmu sync.Mutex // block and state insert lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock *types.Block // Current head of the block chain @@ -813,6 +814,16 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R bc.wg.Add(1) defer bc.wg.Done() + bc.insertmu.Lock() + defer bc.insertmu.Unlock() + + if bc.config.IsQuorum && bc.config.Istanbul != nil { + if bc.GetBlockByHash(block.Hash()) != nil { + log.Warn("Block already inserted", "number", block.NumberU64(), "hash", block.Hash()) + return CanonStatTy, nil + } + } + // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) if ptd == nil { From b55ba5690e1c858141f9051e0eca18d01dc891a3 Mon Sep 17 00:00:00 2001 From: Jitendra Bhurat Date: Mon, 1 Oct 2018 14:29:37 -0400 Subject: [PATCH 02/14] formatting --- core/blockchain.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 04b6fc7092..c4515dabdd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -89,10 +89,10 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex // global mutex for locking chain operations - chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock - insertmu sync.Mutex // block and state insert lock + mu sync.RWMutex // global mutex for locking chain operations + chainmu sync.RWMutex // blockchain insertion lock + procmu sync.RWMutex // block processor lock + insertmu sync.Mutex // block and state insert lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock *types.Block // Current head of the block chain From 478139017dc16a8d9b5cc66d4587b04b637502e0 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 5 Oct 2018 16:09:10 -0400 Subject: [PATCH 03/14] fix racing condition when proposer receives same block from peer --- consensus/istanbul/backend/handler.go | 28 ++++++++++++++++++++++----- consensus/istanbul/core/core.go | 6 +++--- consensus/istanbul/core/preprepare.go | 2 +- consensus/istanbul/core/types.go | 1 + log/format.go | 14 ++++++++++++-- 5 files changed, 40 insertions(+), 11 deletions(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index a338009451..57fd822796 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -23,7 +23,7 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/p2p" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru" ) const ( @@ -44,6 +44,15 @@ func (sb *backend) Protocol() consensus.Protocol { } } +func (sb *backend) decode(msg p2p.Msg) ([]byte, common.Hash, error) { + var data []byte + if err := msg.Decode(&data); err != nil { + return nil, common.Hash{}, errDecodeFailed + } + + return data, istanbul.RLPHash(data), nil +} + // HandleMsg implements consensus.Handler.HandleMsg func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { sb.coreMu.Lock() @@ -54,13 +63,11 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { return true, istanbul.ErrStoppedEngine } - var data []byte - if err := msg.Decode(&data); err != nil { + data, hash, err := sb.decode(msg) + if err != nil { return true, errDecodeFailed } - hash := istanbul.RLPHash(data) - // Mark peer's message ms, ok := sb.recentMessages.Get(addr) var m *lru.ARCCache @@ -84,6 +91,17 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { return true, nil } + if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle + // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing + + _, hash, err := sb.decode(msg) + if err != nil { + return true, errDecodeFailed + } + if _, ok := sb.knownMessages.Get(hash); ok { + return true, nil + } + } return false, nil } diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index 0cede30eef..59bee199e0 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -152,7 +152,7 @@ func (c *core) currentView() *istanbul.View { } } -func (c *core) isProposer() bool { +func (c *core) IsProposer() bool { v := c.valSet if v == nil { return false @@ -240,7 +240,7 @@ func (c *core) startNewRound(round *big.Int) { c.valSet.CalcProposer(lastProposer, newView.Round.Uint64()) c.waitingForRoundChange = false c.setState(StateAcceptRequest) - if roundChange && c.isProposer() && c.current != nil { + if roundChange && c.IsProposer() && c.current != nil { // If it is locked, propose the old proposal // If we have pending request, propose pending request if c.current.IsHashLocked() { @@ -254,7 +254,7 @@ func (c *core) startNewRound(round *big.Int) { } c.newRoundChangeTimer() - logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "isProposer", c.isProposer()) + logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer()) } func (c *core) catchUpRound(view *istanbul.View) { diff --git a/consensus/istanbul/core/preprepare.go b/consensus/istanbul/core/preprepare.go index a9e5949672..a4ee295b7d 100644 --- a/consensus/istanbul/core/preprepare.go +++ b/consensus/istanbul/core/preprepare.go @@ -27,7 +27,7 @@ func (c *core) sendPreprepare(request *istanbul.Request) { logger := c.logger.New("state", c.state) // If I'm the proposer and I have the same sequence with the proposal - if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.isProposer() { + if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() { curView := c.currentView() preprepare, err := Encode(&istanbul.Preprepare{ View: curView, diff --git a/consensus/istanbul/core/types.go b/consensus/istanbul/core/types.go index 74e1b22630..71e3885219 100644 --- a/consensus/istanbul/core/types.go +++ b/consensus/istanbul/core/types.go @@ -27,6 +27,7 @@ import ( type Engine interface { Start() error Stop() error + IsProposer() bool } type State uint64 diff --git a/log/format.go b/log/format.go index 0b07abb2ac..11cdb3788c 100644 --- a/log/format.go +++ b/log/format.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "reflect" + "runtime" "strconv" "strings" "sync" @@ -15,7 +16,7 @@ import ( const ( timeFormat = "2006-01-02T15:04:05-0700" - termTimeFormat = "01-02|15:04:05" + termTimeFormat = "01-02|15:04:05.000" floatFormat = 'f' termMsgJust = 40 ) @@ -107,7 +108,7 @@ func TerminalFormat(usecolor bool) Format { lvl := r.Lvl.AlignedString() if atomic.LoadUint32(&locationEnabled) != 0 { // Log origin printing was requested, format the location path and line number - location := fmt.Sprintf("%+v", r.Call) + location := fmt.Sprintf("%+v|%v", r.Call, getGID()) for _, prefix := range locationTrims { location = strings.TrimPrefix(location, prefix) } @@ -361,3 +362,12 @@ func escapeString(s string) string { stringBufPool.Put(e) return ret } + +func getGID() uint64 { + b := make([]byte, 64) + b = b[:runtime.Stack(b, false)] + b = bytes.TrimPrefix(b, []byte("goroutine ")) + b = b[:bytes.IndexByte(b, ' ')] + n, _ := strconv.ParseUint(string(b), 10, 64) + return n +} \ No newline at end of file From 600852a5cb9a58334bbf46115780ef1968c559e3 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 5 Oct 2018 16:28:10 -0400 Subject: [PATCH 04/14] reverted change on the core/blockchain.go --- core/blockchain.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c4515dabdd..c5e50695a7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -89,10 +89,9 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex // global mutex for locking chain operations - chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock - insertmu sync.Mutex // block and state insert lock + mu sync.RWMutex // global mutex for locking chain operations + chainmu sync.RWMutex // blockchain insertion lock + procmu sync.RWMutex // block processor lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock *types.Block // Current head of the block chain @@ -814,16 +813,6 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R bc.wg.Add(1) defer bc.wg.Done() - bc.insertmu.Lock() - defer bc.insertmu.Unlock() - - if bc.config.IsQuorum && bc.config.Istanbul != nil { - if bc.GetBlockByHash(block.Hash()) != nil { - log.Warn("Block already inserted", "number", block.NumberU64(), "hash", block.Hash()) - return CanonStatTy, nil - } - } - // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) if ptd == nil { From e929349db4c963ca109ad7f44eb692d90e2480d9 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 5 Oct 2018 16:30:17 -0400 Subject: [PATCH 05/14] reverted log/format.go --- log/format.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/log/format.go b/log/format.go index 11cdb3788c..0b07abb2ac 100644 --- a/log/format.go +++ b/log/format.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "reflect" - "runtime" "strconv" "strings" "sync" @@ -16,7 +15,7 @@ import ( const ( timeFormat = "2006-01-02T15:04:05-0700" - termTimeFormat = "01-02|15:04:05.000" + termTimeFormat = "01-02|15:04:05" floatFormat = 'f' termMsgJust = 40 ) @@ -108,7 +107,7 @@ func TerminalFormat(usecolor bool) Format { lvl := r.Lvl.AlignedString() if atomic.LoadUint32(&locationEnabled) != 0 { // Log origin printing was requested, format the location path and line number - location := fmt.Sprintf("%+v|%v", r.Call, getGID()) + location := fmt.Sprintf("%+v", r.Call) for _, prefix := range locationTrims { location = strings.TrimPrefix(location, prefix) } @@ -362,12 +361,3 @@ func escapeString(s string) string { stringBufPool.Put(e) return ret } - -func getGID() uint64 { - b := make([]byte, 64) - b = b[:runtime.Stack(b, false)] - b = bytes.TrimPrefix(b, []byte("goroutine ")) - b = b[:bytes.IndexByte(b, ' ')] - n, _ := strconv.ParseUint(string(b), 10, 64) - return n -} \ No newline at end of file From 30f506208c359dbeda92c8d9b49c54ba80a2fa26 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 11 Oct 2018 14:02:32 -0400 Subject: [PATCH 06/14] only process messages for istanbul --- consensus/istanbul/backend/handler.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 57fd822796..1d3976b8b6 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -94,12 +94,10 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing - _, hash, err := sb.decode(msg) - if err != nil { - return true, errDecodeFailed - } - if _, ok := sb.knownMessages.Get(hash); ok { - return true, nil + if _, hash, err := sb.decode(msg); err == nil { + if _, ok := sb.knownMessages.Get(hash); ok { + return true, nil + } } } return false, nil From 784491e502f2822223faa48c31be148cdfc85ee2 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 11 Oct 2018 14:41:51 -0400 Subject: [PATCH 07/14] refined NewBlockMsg handling in Istanbul --- consensus/istanbul/backend/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 1d3976b8b6..3b750a7b4f 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -94,7 +94,7 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing - if _, hash, err := sb.decode(msg); err == nil { + if data, hash, err := sb.decode(msg); err == nil && len(data) > 0 { if _, ok := sb.knownMessages.Get(hash); ok { return true, nil } From b9ee7c971932f375ac1301eb44665f53cf37bfd6 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 11 Oct 2018 14:57:58 -0400 Subject: [PATCH 08/14] refined NewBlockMsg handling in Istanbul --- consensus/istanbul/backend/handler.go | 4 +++- consensus/istanbul/core/types.go | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 3b750a7b4f..496e08d1a5 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -18,6 +18,7 @@ package backend import ( "errors" + "github.com/ethereum/go-ethereum/consensus/istanbul/core" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -93,8 +94,9 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { } if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing + // need to make sure this is the Gossip message from Istanbul peers - if data, hash, err := sb.decode(msg); err == nil && len(data) > 0 { + if data, hash, err := sb.decode(msg); err == nil && core.IsIstanbulPayload(data) { if _, ok := sb.knownMessages.Get(hash); ok { return true, nil } diff --git a/consensus/istanbul/core/types.go b/consensus/istanbul/core/types.go index 71e3885219..ee363c7ef3 100644 --- a/consensus/istanbul/core/types.go +++ b/consensus/istanbul/core/types.go @@ -160,6 +160,15 @@ func (m *message) String() string { // // helper functions +// Check if a payload is encoded from Istanbul `message` struct +func IsIstanbulPayload(payload []byte) bool { + msg := new(message) + if err := msg.FromPayload(payload, nil); err != nil { + return false + } + return true +} + func Encode(val interface{}) ([]byte, error) { return rlp.EncodeToBytes(val) } From cbba21e9fc0f04a837f87504fe1d2cf34a1e3cc9 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 11 Oct 2018 15:05:37 -0400 Subject: [PATCH 09/14] added test cases --- consensus/istanbul/core/types_test.go | 37 +++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/consensus/istanbul/core/types_test.go b/consensus/istanbul/core/types_test.go index a280fba54b..4ef3c8efc0 100644 --- a/consensus/istanbul/core/types_test.go +++ b/consensus/istanbul/core/types_test.go @@ -177,3 +177,40 @@ func TestMessageEncodeDecode(t *testing.T) { testSubject(t) testSubjectWithSignature(t) } + +func TestIsIstanbulPayload_whenTypical(t *testing.T) { + msg := &message{ + Code: uint64(1), + Msg: []byte{0, 1}, + } + payload, err := Encode(msg) + if err != nil { + t.Fatalf("unable to encode message %v", err) + } + + ok := IsIstanbulPayload(payload) + + if !ok { + t.Error("expected true but got false") + } +} + +func TestIsIstanbulPayload_whenPayloadIsEmpty(t *testing.T) { + var payload []byte + + ok := IsIstanbulPayload(payload) + + if ok { + t.Error("expected false but got true") + } +} + +func TestIsIstanbulPayload_whenInvalidPayload(t *testing.T) { + payload := []byte{0, 1} + + ok := IsIstanbulPayload(payload) + + if ok { + t.Error("expected false but got true") + } +} From b0bd4302eb21ae017c2299f012f0922ca255162b Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 11 Oct 2018 15:55:56 -0400 Subject: [PATCH 10/14] cloned p2p Message to be used for checking --- consensus/istanbul/backend/handler.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 496e08d1a5..d2443f5b34 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -95,10 +95,19 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing // need to make sure this is the Gossip message from Istanbul peers + // as p2p.Msg can only be decoded once, we need to clone it for our own check - if data, hash, err := sb.decode(msg); err == nil && core.IsIstanbulPayload(data) { - if _, ok := sb.knownMessages.Get(hash); ok { - return true, nil + rPipe, wPipe := p2p.MsgPipe() + if err := wPipe.WriteMsg(msg); err != nil { + return false, err + } + if clonedMsg, err := rPipe.ReadMsg(); err != nil { + return false, err + } else { + if data, hash, err := sb.decode(clonedMsg); err == nil && core.IsIstanbulPayload(data) { + if _, ok := sb.knownMessages.Get(hash); ok { + return true, nil + } } } } From b921df8b8655f54b6d6cc6f87448dc09c2c88c01 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 12 Oct 2018 11:09:43 -0400 Subject: [PATCH 11/14] restored p2p message payload after processing --- consensus/istanbul/backend/handler.go | 43 +++++++---- consensus/istanbul/backend/handler_test.go | 83 ++++++++++++++++++++++ consensus/istanbul/core/types.go | 9 --- consensus/istanbul/core/types_test.go | 37 ---------- 4 files changed, 111 insertions(+), 61 deletions(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index d2443f5b34..3c3cbfe004 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -17,12 +17,18 @@ package backend import ( + "bytes" "errors" - "github.com/ethereum/go-ethereum/consensus/istanbul/core" + "io/ioutil" + "math/big" + "reflect" + + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/hashicorp/golang-lru" ) @@ -94,20 +100,27 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { } if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing - // need to make sure this is the Gossip message from Istanbul peers - // as p2p.Msg can only be decoded once, we need to clone it for our own check - - rPipe, wPipe := p2p.MsgPipe() - if err := wPipe.WriteMsg(msg); err != nil { - return false, err - } - if clonedMsg, err := rPipe.ReadMsg(); err != nil { - return false, err - } else { - if data, hash, err := sb.decode(clonedMsg); err == nil && core.IsIstanbulPayload(data) { - if _, ok := sb.knownMessages.Get(hash); ok { - return true, nil - } + // as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it + log.Debug("Proposer received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr) + if reader, ok := msg.Payload.(*bytes.Reader); ok { + payload, err := ioutil.ReadAll(reader) + if err != nil { + return true, err + } + reader.Reset(payload) // ready to be decoded + defer reader.Reset(payload) // restore so main eth/handler can decode + var request struct { // this has to be same as eth/protocol.go#newBlockData as we are reading NewBlockMsg + Block *types.Block + TD *big.Int + } + if err := msg.Decode(&request); err != nil { + log.Debug("Proposer was unable to decode the NewBlockMsg", "error", err) + return false, nil + } + newRequestedBlock := request.Block + if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && newRequestedBlock.Hash() == sb.proposedBlockHash { + log.Debug("Proposer already proposed this block", "hash", newRequestedBlock.Hash(), "sender", addr) + return true, nil } } } diff --git a/consensus/istanbul/backend/handler_test.go b/consensus/istanbul/backend/handler_test.go index 690a586be8..a1918a6c47 100644 --- a/consensus/istanbul/backend/handler_test.go +++ b/consensus/istanbul/backend/handler_test.go @@ -17,8 +17,13 @@ package backend import ( + "bytes" + "io/ioutil" + "math/big" "testing" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/p2p" @@ -70,3 +75,81 @@ func makeMsg(msgcode uint64, data interface{}) p2p.Msg { size, r, _ := rlp.EncodeToReader(data) return p2p.Msg{Code: msgcode, Size: uint32(size), Payload: r} } + +func TestHandleNewBlockMessage_whenTypical(t *testing.T) { + _, backend := newBlockChain(1) + arbitraryAddress := common.StringToAddress("arbitrary") + arbitraryBlock, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false) + backend.proposedBlockHash = arbitraryBlock.Hash() + + handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) + + if err != nil { + t.Errorf("expected message being handled successfully but got %s", err) + } + if !handled { + t.Errorf("expected message being handled but not") + } + if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil { + t.Errorf("expected p2p message payload is restored") + } +} + +func TestHandleNewBlockMessage_whenNotAProposedBlock(t *testing.T) { + _, backend := newBlockChain(1) + arbitraryAddress := common.StringToAddress("arbitrary") + _, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false) + backend.proposedBlockHash = common.StringToHash("arbitrary hash") + + handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) + + if err != nil { + t.Errorf("expected message being handled successfully but got %s", err) + } + if handled { + t.Errorf("expected message not being handled") + } + if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil { + t.Errorf("expected p2p message payload is restored") + } +} + +func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) { + _, backend := newBlockChain(1) + arbitraryAddress := common.StringToAddress("arbitrary") + _, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, true) + backend.proposedBlockHash = common.StringToHash("arbitrary hash") + + handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) + + if err != nil { + t.Errorf("expected message being handled successfully but got %s", err) + } + if handled { + t.Errorf("expected message not being handled") + } + if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil { + t.Errorf("expected p2p message payload is restored") + } +} + +func buildArbitraryP2PNewBlockMessage(t *testing.T, invalidMsg bool) (*types.Block, p2p.Msg) { + arbitraryBlock := types.NewBlock(&types.Header{ + GasLimit: big.NewInt(0), + MixDigest: types.IstanbulDigest, + }, nil, nil, nil) + request := []interface{}{&arbitraryBlock, big.NewInt(1)} + if invalidMsg { + request = []interface{}{"invalid msg"} + } + size, r, err := rlp.EncodeToReader(request) + if err != nil { + t.Fatalf("can't encode due to %s", err) + } + payload, err := ioutil.ReadAll(r) + if err != nil { + t.Fatalf("can't read payload due to %s", err) + } + arbitraryP2PMessage := p2p.Msg{Code: 0x07, Size: uint32(size), Payload: bytes.NewReader(payload)} + return arbitraryBlock, arbitraryP2PMessage +} diff --git a/consensus/istanbul/core/types.go b/consensus/istanbul/core/types.go index ee363c7ef3..71e3885219 100644 --- a/consensus/istanbul/core/types.go +++ b/consensus/istanbul/core/types.go @@ -160,15 +160,6 @@ func (m *message) String() string { // // helper functions -// Check if a payload is encoded from Istanbul `message` struct -func IsIstanbulPayload(payload []byte) bool { - msg := new(message) - if err := msg.FromPayload(payload, nil); err != nil { - return false - } - return true -} - func Encode(val interface{}) ([]byte, error) { return rlp.EncodeToBytes(val) } diff --git a/consensus/istanbul/core/types_test.go b/consensus/istanbul/core/types_test.go index 4ef3c8efc0..a280fba54b 100644 --- a/consensus/istanbul/core/types_test.go +++ b/consensus/istanbul/core/types_test.go @@ -177,40 +177,3 @@ func TestMessageEncodeDecode(t *testing.T) { testSubject(t) testSubjectWithSignature(t) } - -func TestIsIstanbulPayload_whenTypical(t *testing.T) { - msg := &message{ - Code: uint64(1), - Msg: []byte{0, 1}, - } - payload, err := Encode(msg) - if err != nil { - t.Fatalf("unable to encode message %v", err) - } - - ok := IsIstanbulPayload(payload) - - if !ok { - t.Error("expected true but got false") - } -} - -func TestIsIstanbulPayload_whenPayloadIsEmpty(t *testing.T) { - var payload []byte - - ok := IsIstanbulPayload(payload) - - if ok { - t.Error("expected false but got true") - } -} - -func TestIsIstanbulPayload_whenInvalidPayload(t *testing.T) { - payload := []byte{0, 1} - - ok := IsIstanbulPayload(payload) - - if ok { - t.Error("expected false but got true") - } -} From f09bd2c465ef7b9b8e52711a84a24d04d302e4ae Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Mon, 15 Oct 2018 12:31:17 -0400 Subject: [PATCH 12/14] fixed the logic to verify if block is currently proposed --- consensus/istanbul/backend/handler.go | 2 +- consensus/istanbul/backend/handler_test.go | 37 +++++++++++++++++++--- consensus/istanbul/core/core.go | 4 +++ consensus/istanbul/core/types.go | 9 ++++++ 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 3c3cbfe004..af31bb2078 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -118,7 +118,7 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { return false, nil } newRequestedBlock := request.Block - if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && newRequestedBlock.Hash() == sb.proposedBlockHash { + if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && sb.core.IsCurrentProposal(newRequestedBlock.Hash()) { log.Debug("Proposer already proposed this block", "hash", newRequestedBlock.Hash(), "sender", addr) return true, nil } diff --git a/consensus/istanbul/backend/handler_test.go b/consensus/istanbul/backend/handler_test.go index a1918a6c47..f3e8e2cdd2 100644 --- a/consensus/istanbul/backend/handler_test.go +++ b/consensus/istanbul/backend/handler_test.go @@ -28,7 +28,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru" ) func TestIstanbulMessage(t *testing.T) { @@ -80,7 +80,7 @@ func TestHandleNewBlockMessage_whenTypical(t *testing.T) { _, backend := newBlockChain(1) arbitraryAddress := common.StringToAddress("arbitrary") arbitraryBlock, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false) - backend.proposedBlockHash = arbitraryBlock.Hash() + postAndWait(backend, arbitraryBlock, t) handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) @@ -99,7 +99,12 @@ func TestHandleNewBlockMessage_whenNotAProposedBlock(t *testing.T) { _, backend := newBlockChain(1) arbitraryAddress := common.StringToAddress("arbitrary") _, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false) - backend.proposedBlockHash = common.StringToHash("arbitrary hash") + postAndWait(backend, types.NewBlock(&types.Header{ + Number: big.NewInt(1), + Root: common.StringToHash("someroot"), + GasLimit: big.NewInt(1), + MixDigest: types.IstanbulDigest, + }, nil, nil, nil), t) handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) @@ -118,7 +123,11 @@ func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) { _, backend := newBlockChain(1) arbitraryAddress := common.StringToAddress("arbitrary") _, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, true) - backend.proposedBlockHash = common.StringToHash("arbitrary hash") + postAndWait(backend, types.NewBlock(&types.Header{ + Number: big.NewInt(1), + GasLimit: big.NewInt(1), + MixDigest: types.IstanbulDigest, + }, nil, nil, nil), t) handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage) @@ -133,8 +142,28 @@ func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) { } } +func postAndWait(backend *backend, block *types.Block, t *testing.T) { + eventSub := backend.EventMux().Subscribe(istanbul.RequestEvent{}) + defer eventSub.Unsubscribe() + stop := make(chan struct{}, 1) + eventLoop := func() { + select { + case <-eventSub.Chan(): + stop <- struct{}{} + } + } + go eventLoop() + if err := backend.EventMux().Post(istanbul.RequestEvent{ + Proposal: block, + }); err != nil { + t.Fatalf("%s", err) + } + <-stop +} + func buildArbitraryP2PNewBlockMessage(t *testing.T, invalidMsg bool) (*types.Block, p2p.Msg) { arbitraryBlock := types.NewBlock(&types.Header{ + Number: big.NewInt(1), GasLimit: big.NewInt(0), MixDigest: types.IstanbulDigest, }, nil, nil, nil) diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index 59bee199e0..47faff3ec4 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -160,6 +160,10 @@ func (c *core) IsProposer() bool { return v.IsProposer(c.backend.Address()) } +func (c *core) IsCurrentProposal(blockHash common.Hash) bool { + return c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash +} + func (c *core) commit() { c.setState(StateCommitted) diff --git a/consensus/istanbul/core/types.go b/consensus/istanbul/core/types.go index 71e3885219..da202cb2cd 100644 --- a/consensus/istanbul/core/types.go +++ b/consensus/istanbul/core/types.go @@ -27,7 +27,16 @@ import ( type Engine interface { Start() error Stop() error + IsProposer() bool + + // verify if a hash is the same as the proposed block in the current pending request + // + // this is useful when the engine is currently the proposer + // + // pending request is populated right at the preprepare stage so this would give us the earliest verification + // to avoid any race condition of coming propagated blocks + IsCurrentProposal(blockHash common.Hash) bool } type State uint64 From 1007ea93e268a790f98418a106f0dea4717e4313 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Tue, 11 Dec 2018 11:28:26 -0500 Subject: [PATCH 13/14] fixed unit test --- consensus/istanbul/backend/handler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/istanbul/backend/handler_test.go b/consensus/istanbul/backend/handler_test.go index f3e8e2cdd2..c6c41f4e89 100644 --- a/consensus/istanbul/backend/handler_test.go +++ b/consensus/istanbul/backend/handler_test.go @@ -102,7 +102,7 @@ func TestHandleNewBlockMessage_whenNotAProposedBlock(t *testing.T) { postAndWait(backend, types.NewBlock(&types.Header{ Number: big.NewInt(1), Root: common.StringToHash("someroot"), - GasLimit: big.NewInt(1), + GasLimit: 1, MixDigest: types.IstanbulDigest, }, nil, nil, nil), t) @@ -125,7 +125,7 @@ func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) { _, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, true) postAndWait(backend, types.NewBlock(&types.Header{ Number: big.NewInt(1), - GasLimit: big.NewInt(1), + GasLimit: 1, MixDigest: types.IstanbulDigest, }, nil, nil, nil), t) @@ -164,7 +164,7 @@ func postAndWait(backend *backend, block *types.Block, t *testing.T) { func buildArbitraryP2PNewBlockMessage(t *testing.T, invalidMsg bool) (*types.Block, p2p.Msg) { arbitraryBlock := types.NewBlock(&types.Header{ Number: big.NewInt(1), - GasLimit: big.NewInt(0), + GasLimit: 0, MixDigest: types.IstanbulDigest, }, nil, nil, nil) request := []interface{}{&arbitraryBlock, big.NewInt(1)} From 1ddf595c343b6cca7121ed944c29846f805505f8 Mon Sep 17 00:00:00 2001 From: Jitendra Bhurat Date: Tue, 12 Feb 2019 15:36:05 -0500 Subject: [PATCH 14/14] Using NewBlockMsg constant instead of hardcoded 0x07 --- consensus/istanbul/backend/handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index af31bb2078..7be02bea22 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -35,6 +35,7 @@ import ( const ( istanbulMsg = 0x11 + NewBlockMsg = 0x07 ) var ( @@ -98,7 +99,7 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { return true, nil } - if msg.Code == 0x07 && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle + if msg.Code == NewBlockMsg && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing // as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it log.Debug("Proposer received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr)