Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consensus/istanbul: Race condition causing "Impossible reorg" error #539

Merged
merged 17 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
package backend

import (
"bytes"
"errors"
"io/ioutil"
"math/big"
"reflect"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)

const (
istanbulMsg = 0x11
NewBlockMsg = 0x07
)

var (
Expand All @@ -44,6 +52,15 @@ func (sb *backend) Protocol() consensus.Protocol {
}
}

func (sb *backend) decode(msg p2p.Msg) ([]byte, common.Hash, error) {
var data []byte
if err := msg.Decode(&data); err != nil {
return nil, common.Hash{}, errDecodeFailed
}

return data, istanbul.RLPHash(data), nil
}

// HandleMsg implements consensus.Handler.HandleMsg
func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
sb.coreMu.Lock()
Expand All @@ -54,13 +71,11 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
return true, istanbul.ErrStoppedEngine
}

var data []byte
if err := msg.Decode(&data); err != nil {
data, hash, err := sb.decode(msg)
if err != nil {
return true, errDecodeFailed
}

hash := istanbul.RLPHash(data)

// Mark peer's message
ms, ok := sb.recentMessages.Get(addr)
var m *lru.ARCCache
Expand All @@ -84,6 +99,32 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {

return true, nil
}
if msg.Code == NewBlockMsg && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle
// this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing
// as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it
log.Debug("Proposer received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr)
if reader, ok := msg.Payload.(*bytes.Reader); ok {
payload, err := ioutil.ReadAll(reader)
if err != nil {
return true, err
}
reader.Reset(payload) // ready to be decoded
defer reader.Reset(payload) // restore so main eth/handler can decode
var request struct { // this has to be same as eth/protocol.go#newBlockData as we are reading NewBlockMsg
Block *types.Block
TD *big.Int
}
if err := msg.Decode(&request); err != nil {
log.Debug("Proposer was unable to decode the NewBlockMsg", "error", err)
return false, nil
}
newRequestedBlock := request.Block
if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && sb.core.IsCurrentProposal(newRequestedBlock.Hash()) {
log.Debug("Proposer already proposed this block", "hash", newRequestedBlock.Hash(), "sender", addr)
return true, nil
}
}
}
return false, nil
}

Expand Down
114 changes: 113 additions & 1 deletion consensus/istanbul/backend/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package backend

import (
"bytes"
"io/ioutil"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)

func TestIstanbulMessage(t *testing.T) {
Expand Down Expand Up @@ -70,3 +75,110 @@ func makeMsg(msgcode uint64, data interface{}) p2p.Msg {
size, r, _ := rlp.EncodeToReader(data)
return p2p.Msg{Code: msgcode, Size: uint32(size), Payload: r}
}

func TestHandleNewBlockMessage_whenTypical(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
arbitraryBlock, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false)
postAndWait(backend, arbitraryBlock, t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if !handled {
t.Errorf("expected message being handled but not")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func TestHandleNewBlockMessage_whenNotAProposedBlock(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
_, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false)
postAndWait(backend, types.NewBlock(&types.Header{
Number: big.NewInt(1),
Root: common.StringToHash("someroot"),
GasLimit: 1,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil), t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if handled {
t.Errorf("expected message not being handled")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
_, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, true)
postAndWait(backend, types.NewBlock(&types.Header{
Number: big.NewInt(1),
GasLimit: 1,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil), t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if handled {
t.Errorf("expected message not being handled")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func postAndWait(backend *backend, block *types.Block, t *testing.T) {
eventSub := backend.EventMux().Subscribe(istanbul.RequestEvent{})
defer eventSub.Unsubscribe()
stop := make(chan struct{}, 1)
eventLoop := func() {
select {
case <-eventSub.Chan():
stop <- struct{}{}
}
}
go eventLoop()
if err := backend.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
}); err != nil {
t.Fatalf("%s", err)
}
<-stop
}

func buildArbitraryP2PNewBlockMessage(t *testing.T, invalidMsg bool) (*types.Block, p2p.Msg) {
arbitraryBlock := types.NewBlock(&types.Header{
Number: big.NewInt(1),
GasLimit: 0,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil)
request := []interface{}{&arbitraryBlock, big.NewInt(1)}
if invalidMsg {
request = []interface{}{"invalid msg"}
}
size, r, err := rlp.EncodeToReader(request)
if err != nil {
t.Fatalf("can't encode due to %s", err)
}
payload, err := ioutil.ReadAll(r)
if err != nil {
t.Fatalf("can't read payload due to %s", err)
}
arbitraryP2PMessage := p2p.Msg{Code: 0x07, Size: uint32(size), Payload: bytes.NewReader(payload)}
return arbitraryBlock, arbitraryP2PMessage
}
10 changes: 7 additions & 3 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,18 @@ func (c *core) currentView() *istanbul.View {
}
}

func (c *core) isProposer() bool {
func (c *core) IsProposer() bool {
v := c.valSet
if v == nil {
return false
}
return v.IsProposer(c.backend.Address())
}

func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
return c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
}

func (c *core) commit() {
c.setState(StateCommitted)

Expand Down Expand Up @@ -245,7 +249,7 @@ func (c *core) startNewRound(round *big.Int) {
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() && c.current != nil {
if roundChange && c.IsProposer() && c.current != nil {
// If it is locked, propose the old proposal
// If we have pending request, propose pending request
if c.current.IsHashLocked() {
Expand All @@ -259,7 +263,7 @@ func (c *core) startNewRound(round *big.Int) {
}
c.newRoundChangeTimer()

logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "isProposer", c.isProposer())
logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())
}

func (c *core) catchUpRound(view *istanbul.View) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *core) sendPreprepare(request *istanbul.Request) {
logger := c.logger.New("state", c.state)

// If I'm the proposer and I have the same sequence with the proposal
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.isProposer() {
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
curView := c.currentView()
preprepare, err := Encode(&istanbul.Preprepare{
View: curView,
Expand Down
10 changes: 10 additions & 0 deletions consensus/istanbul/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ import (
type Engine interface {
Start() error
Stop() error

IsProposer() bool

// verify if a hash is the same as the proposed block in the current pending request
//
// this is useful when the engine is currently the proposer
//
// pending request is populated right at the preprepare stage so this would give us the earliest verification
// to avoid any race condition of coming propagated blocks
IsCurrentProposal(blockHash common.Hash) bool
}

type State uint64
Expand Down