Skip to content

Commit

Permalink
Merge pull request #539 from jbhurat/impossible-reorg-fix-v1
Browse files Browse the repository at this point in the history
consensus/istanbul: Race condition causing "Impossible reorg" error
  • Loading branch information
jpmsam authored Feb 12, 2019
2 parents 07c76cc + 1ddf595 commit 212b25c
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 10 deletions.
51 changes: 46 additions & 5 deletions consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
package backend

import (
"bytes"
"errors"
"io/ioutil"
"math/big"
"reflect"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)

const (
istanbulMsg = 0x11
NewBlockMsg = 0x07
)

var (
Expand All @@ -44,6 +52,15 @@ func (sb *backend) Protocol() consensus.Protocol {
}
}

func (sb *backend) decode(msg p2p.Msg) ([]byte, common.Hash, error) {
var data []byte
if err := msg.Decode(&data); err != nil {
return nil, common.Hash{}, errDecodeFailed
}

return data, istanbul.RLPHash(data), nil
}

// HandleMsg implements consensus.Handler.HandleMsg
func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
sb.coreMu.Lock()
Expand All @@ -54,13 +71,11 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
return true, istanbul.ErrStoppedEngine
}

var data []byte
if err := msg.Decode(&data); err != nil {
data, hash, err := sb.decode(msg)
if err != nil {
return true, errDecodeFailed
}

hash := istanbul.RLPHash(data)

// Mark peer's message
ms, ok := sb.recentMessages.Get(addr)
var m *lru.ARCCache
Expand All @@ -84,6 +99,32 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {

return true, nil
}
if msg.Code == NewBlockMsg && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle
// this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing
// as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it
log.Debug("Proposer received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr)
if reader, ok := msg.Payload.(*bytes.Reader); ok {
payload, err := ioutil.ReadAll(reader)
if err != nil {
return true, err
}
reader.Reset(payload) // ready to be decoded
defer reader.Reset(payload) // restore so main eth/handler can decode
var request struct { // this has to be same as eth/protocol.go#newBlockData as we are reading NewBlockMsg
Block *types.Block
TD *big.Int
}
if err := msg.Decode(&request); err != nil {
log.Debug("Proposer was unable to decode the NewBlockMsg", "error", err)
return false, nil
}
newRequestedBlock := request.Block
if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && sb.core.IsCurrentProposal(newRequestedBlock.Hash()) {
log.Debug("Proposer already proposed this block", "hash", newRequestedBlock.Hash(), "sender", addr)
return true, nil
}
}
}
return false, nil
}

Expand Down
114 changes: 113 additions & 1 deletion consensus/istanbul/backend/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package backend

import (
"bytes"
"io/ioutil"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)

func TestIstanbulMessage(t *testing.T) {
Expand Down Expand Up @@ -70,3 +75,110 @@ func makeMsg(msgcode uint64, data interface{}) p2p.Msg {
size, r, _ := rlp.EncodeToReader(data)
return p2p.Msg{Code: msgcode, Size: uint32(size), Payload: r}
}

func TestHandleNewBlockMessage_whenTypical(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
arbitraryBlock, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false)
postAndWait(backend, arbitraryBlock, t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if !handled {
t.Errorf("expected message being handled but not")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func TestHandleNewBlockMessage_whenNotAProposedBlock(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
_, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, false)
postAndWait(backend, types.NewBlock(&types.Header{
Number: big.NewInt(1),
Root: common.StringToHash("someroot"),
GasLimit: 1,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil), t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if handled {
t.Errorf("expected message not being handled")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func TestHandleNewBlockMessage_whenFailToDecode(t *testing.T) {
_, backend := newBlockChain(1)
arbitraryAddress := common.StringToAddress("arbitrary")
_, arbitraryP2PMessage := buildArbitraryP2PNewBlockMessage(t, true)
postAndWait(backend, types.NewBlock(&types.Header{
Number: big.NewInt(1),
GasLimit: 1,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil), t)

handled, err := backend.HandleMsg(arbitraryAddress, arbitraryP2PMessage)

if err != nil {
t.Errorf("expected message being handled successfully but got %s", err)
}
if handled {
t.Errorf("expected message not being handled")
}
if _, err := ioutil.ReadAll(arbitraryP2PMessage.Payload); err != nil {
t.Errorf("expected p2p message payload is restored")
}
}

func postAndWait(backend *backend, block *types.Block, t *testing.T) {
eventSub := backend.EventMux().Subscribe(istanbul.RequestEvent{})
defer eventSub.Unsubscribe()
stop := make(chan struct{}, 1)
eventLoop := func() {
select {
case <-eventSub.Chan():
stop <- struct{}{}
}
}
go eventLoop()
if err := backend.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
}); err != nil {
t.Fatalf("%s", err)
}
<-stop
}

func buildArbitraryP2PNewBlockMessage(t *testing.T, invalidMsg bool) (*types.Block, p2p.Msg) {
arbitraryBlock := types.NewBlock(&types.Header{
Number: big.NewInt(1),
GasLimit: 0,
MixDigest: types.IstanbulDigest,
}, nil, nil, nil)
request := []interface{}{&arbitraryBlock, big.NewInt(1)}
if invalidMsg {
request = []interface{}{"invalid msg"}
}
size, r, err := rlp.EncodeToReader(request)
if err != nil {
t.Fatalf("can't encode due to %s", err)
}
payload, err := ioutil.ReadAll(r)
if err != nil {
t.Fatalf("can't read payload due to %s", err)
}
arbitraryP2PMessage := p2p.Msg{Code: 0x07, Size: uint32(size), Payload: bytes.NewReader(payload)}
return arbitraryBlock, arbitraryP2PMessage
}
10 changes: 7 additions & 3 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,18 @@ func (c *core) currentView() *istanbul.View {
}
}

func (c *core) isProposer() bool {
func (c *core) IsProposer() bool {
v := c.valSet
if v == nil {
return false
}
return v.IsProposer(c.backend.Address())
}

func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
return c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
}

func (c *core) commit() {
c.setState(StateCommitted)

Expand Down Expand Up @@ -245,7 +249,7 @@ func (c *core) startNewRound(round *big.Int) {
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() && c.current != nil {
if roundChange && c.IsProposer() && c.current != nil {
// If it is locked, propose the old proposal
// If we have pending request, propose pending request
if c.current.IsHashLocked() {
Expand All @@ -259,7 +263,7 @@ func (c *core) startNewRound(round *big.Int) {
}
c.newRoundChangeTimer()

logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "isProposer", c.isProposer())
logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())
}

func (c *core) catchUpRound(view *istanbul.View) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *core) sendPreprepare(request *istanbul.Request) {
logger := c.logger.New("state", c.state)

// If I'm the proposer and I have the same sequence with the proposal
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.isProposer() {
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
curView := c.currentView()
preprepare, err := Encode(&istanbul.Preprepare{
View: curView,
Expand Down
10 changes: 10 additions & 0 deletions consensus/istanbul/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ import (
type Engine interface {
Start() error
Stop() error

IsProposer() bool

// verify if a hash is the same as the proposed block in the current pending request
//
// this is useful when the engine is currently the proposer
//
// pending request is populated right at the preprepare stage so this would give us the earliest verification
// to avoid any race condition of coming propagated blocks
IsCurrentProposal(blockHash common.Hash) bool
}

type State uint64
Expand Down

0 comments on commit 212b25c

Please sign in to comment.