Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix up issue with Istanbul not call calling eth sub-protocol #1411

Merged
merged 3 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
// /Quorum
// add additional quorum consensus protocol if set and if not set to "eth", e.g. istanbul
if quorumConsensusProtocolName != "" && quorumConsensusProtocolName != eth.ProtocolName {
quorumProtos := s.quorumConsensusProtocols()
quorumProtos := s.quorumConsensusProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates)
protos = append(protos, quorumProtos...)
}
// /end Quorum
Expand Down
62 changes: 33 additions & 29 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/qlight"
"github.com/ethereum/go-ethereum/trie"
Expand Down Expand Up @@ -686,10 +687,11 @@ func (h *handler) FindPeers(targets map[common.Address]bool) map[common.Address]
// The Run method starts the protocol and is called by the p2p server. The quorum consensus subprotocol,
// leverages the peer created and managed by the "eth" subprotocol.
// The quorum consensus protocol requires that the "eth" protocol is running as well.
func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, length uint64) p2p.Protocol {
func (h *handler) makeQuorumConsensusProtocol(protoName string, version uint, length uint64, backend eth.Backend, network uint64, dnsdisc enode.Iterator) p2p.Protocol {
log.Debug("registering qouorum protocol ", "protoName", protoName, "version", version)

return p2p.Protocol{
Name: ProtoName,
Name: protoName,
Version: version,
Length: length,
// no new peer created, uses the "eth" peer, so no peer management needed.
Expand All @@ -716,10 +718,11 @@ func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, le
log.Warn("full p2p peer", "id", p2pPeerId, "ethPeer", ethPeer)
}
if ethPeer != nil {
p.Log().Debug("consensus subprotocol retrieved eth peer from peerset", "ethPeer.id", p2pPeerId, "ProtoName", ProtoName)
p.Log().Debug("consensus subprotocol retrieved eth peer from peerset", "ethPeer.id", p2pPeerId, "ProtoName", protoName)
// add the rw protocol for the quorum subprotocol to the eth peer.
ethPeer.AddConsensusProtoRW(rw)
return h.handleConsensusLoop(p, rw)
peer := eth.NewPeer(version, p, rw, h.txpool)
return h.handleConsensusLoop(peer, rw, backend)
}
p.Log().Error("consensus subprotocol retrieved nil eth peer from peerset", "ethPeer.id", p2pPeerId)
return errEthPeerNil
Expand All @@ -728,32 +731,28 @@ func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, le
}
},
NodeInfo: func() interface{} {
return h.NodeInfo()
return eth.NodeInfoFunc(backend.Chain(), network)
},
PeerInfo: func(id enode.ID) interface{} {
if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
if p := h.peers.peer(fmt.Sprintf("%x", id)); p != nil { // TODO:BBO
return p.Info()
}
return nil
return backend.PeerInfo(id)
},
Attributes: []enr.Entry{eth.CurrentENREntry(backend.Chain())},
DialCandidates: dnsdisc,
}
}

func (h *handler) handleConsensusLoop(p *p2p.Peer, protoRW p2p.MsgReadWriter) error {
func (h *handler) handleConsensusLoop(p *eth.Peer, protoRW p2p.MsgReadWriter, backend eth.Backend) error {
// Handle incoming messages until the connection is torn down
for {
if err := h.handleConsensus(p, protoRW); err != nil {
if err := h.handleConsensus(p, protoRW, backend); err != nil {
p.Log().Debug("Ethereum quorum message handling failed", "err", err)
return err
}
}
}

// This is a no-op because the eth handleMsg main loop handle ibf message as well.
func (h *handler) handleConsensus(p *p2p.Peer, protoRW p2p.MsgReadWriter) error {
func (h *handler) handleConsensus(p *eth.Peer, protoRW p2p.MsgReadWriter, backend eth.Backend) error {
// Read the next message from the remote peer (in protoRW), and ensure it's fully consumed
msg, err := protoRW.ReadMsg()
if err != nil {
Expand All @@ -768,19 +767,29 @@ func (h *handler) handleConsensus(p *p2p.Peer, protoRW p2p.MsgReadWriter) error
// istanbulMsg = 0x11, and NewBlockMsg = 0x07.
handled, err := h.handleConsensusMsg(p, msg)
if handled {
p.Log().Debug("consensus message was handled by consensus engine", "handled", handled,
p.Log().Debug("consensus message was handled by consensus engine", "msg", msg.Code,
"quorumConsensusProtocolName", quorumConsensusProtocolName, "err", err)
return err
}

var handlers = eth.ETH_65_FULL_SYNC

p.Log().Trace("Message not handled by sub-protocol", "msg", msg.Code)

if handler := handlers[msg.Code]; handler != nil {
p.Log().Debug("Found eth handler for msg", "msg", msg.Code)
return handler(backend, msg, p)
}

return nil
}

func (h *handler) handleConsensusMsg(p *p2p.Peer, msg p2p.Msg) (bool, error) {
func (h *handler) handleConsensusMsg(p *eth.Peer, msg p2p.Msg) (bool, error) {
if handler, ok := h.engine.(consensus.Handler); ok {
pubKey := p.Node().Pubkey()
addr := crypto.PubkeyToAddress(*pubKey)
handled, err := handler.HandleMsg(addr, msg)

return handled, err
}
return false, nil
Expand All @@ -789,31 +798,26 @@ func (h *handler) handleConsensusMsg(p *p2p.Peer, msg p2p.Msg) (bool, error) {
// makeLegacyProtocol is basically a copy of the eth makeProtocol, but for legacy subprotocols, e.g. "istanbul/99" "istabnul/64"
// If support legacy subprotocols is removed, remove this and associated code as well.
// If quorum is using a legacy protocol then the "eth" subprotocol should not be available.
func (h *handler) makeLegacyProtocol(protoName string, version uint, length uint64) p2p.Protocol {
log.Debug("registering a legacy protocol ", "protoName", protoName)
func (h *handler) makeLegacyProtocol(protoName string, version uint, length uint64, backend eth.Backend, network uint64, dnsdisc enode.Iterator) p2p.Protocol {
log.Debug("registering a legacy protocol ", "protoName", protoName, "version", version)
return p2p.Protocol{
Name: protoName,
Version: version,
Length: length,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := eth.NewPeer(version, p, rw, h.txpool)
peer.AddConsensusProtoRW(rw)
return h.runEthPeer(peer, func(peer *eth.Peer) error {
return h.handleConsensusLoop(p, rw)
return h.handleConsensusLoop(peer, rw, backend)
})
},
NodeInfo: func() interface{} {
return h.NodeInfo()
return eth.NodeInfoFunc(backend.Chain(), network)
},
PeerInfo: func(id enode.ID) interface{} {
if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
if p := h.peers.peer(fmt.Sprintf("%x", id)); p != nil { // TODO:BBO
return p.Info()
}
return nil
return backend.PeerInfo(id)
},
Attributes: []enr.Entry{eth.CurrentENREntry(backend.Chain())},
DialCandidates: dnsdisc,
}
}

Expand Down
14 changes: 7 additions & 7 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (p *Peer) RequestOneHeader(hash common.Hash) error {
Skip: uint64(0),
Reverse: false,
}
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
Expand All @@ -433,7 +433,7 @@ func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, re
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
Expand All @@ -452,7 +452,7 @@ func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
Expand Down Expand Up @@ -481,7 +481,7 @@ func (p *Peer) ExpectPeerMessage(code uint64, content types.Transactions) error
// specified.
func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: rand.Uint64(),
GetBlockBodiesPacket: hashes,
Expand All @@ -494,7 +494,7 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
// data, corresponding to the specified hashes.
func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: rand.Uint64(),
GetNodeDataPacket: hashes,
Expand All @@ -506,7 +506,7 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: rand.Uint64(),
GetReceiptsPacket: hashes,
Expand All @@ -518,7 +518,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
if p.Version() >= ETH66 {
if p.Version() == ETH66 {
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: rand.Uint64(),
GetPooledTransactionsPacket: hashes,
Expand Down
8 changes: 5 additions & 3 deletions eth/quorum_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package eth
import (
"errors"

"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// Quorum: quorum_protocol enables the eth service to return two different protocols, one for the eth mainnet "eth" service,
Expand All @@ -26,7 +28,7 @@ var quorumConsensusProtocolVersions []uint
// protocol Length describe the number of messages support by the protocol/version map[uint]uint64{Istanbul64: 18, Istanbul99: 18, Istanbul100: 18}
var quorumConsensusProtocolLengths map[uint]uint64

func (s *Ethereum) quorumConsensusProtocols() []p2p.Protocol {
func (s *Ethereum) quorumConsensusProtocols(backend eth.Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol {
protos := make([]p2p.Protocol, len(quorumConsensusProtocolVersions))
for i, vsn := range quorumConsensusProtocolVersions {
// if we have a legacy protocol, e.g. istanbul/99, istanbul/64 then the protocol handler is will be the "eth"
Expand All @@ -37,14 +39,14 @@ func (s *Ethereum) quorumConsensusProtocols() []p2p.Protocol {
if !ok {
panic("makeProtocol for unknown version")
}
lp := s.handler.makeLegacyProtocol(quorumConsensusProtocolName, vsn, length)
lp := s.handler.makeLegacyProtocol(quorumConsensusProtocolName, vsn, length, backend, network, dnsdisc)
protos[i] = lp
} else {
length, ok := quorumConsensusProtocolLengths[vsn]
if !ok {
panic("makeQuorumConsensusProtocol for unknown version")
}
protos[i] = s.handler.makeQuorumConsensusProtocol(quorumConsensusProtocolName, vsn, length)
protos[i] = s.handler.makeQuorumConsensusProtocol(quorumConsensusProtocolName, vsn, length, backend, network, dnsdisc)
}
}
return protos
Expand Down
4 changes: 2 additions & 2 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (h *handler) syncTransactions(p *eth.Peer) {
// The eth/65 protocol introduces proper transaction announcements, so instead
// of dripping transactions across multiple peers, just send the entire list as
// an announcement and let the remote side decide what they need (likely nothing).
if p.Version() >= eth.ETH65 {
if p.Version() == eth.ETH65 {
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
Expand Down Expand Up @@ -96,7 +96,7 @@ func (h *handler) txsyncLoop64() {

// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
if s.p.Version() >= eth.ETH65 {
if s.p.Version() == eth.ETH65 {
panic("initial transaction syncer running on eth/65+")
}
// Fill pack with transactions up to the target size.
Expand Down