diff --git a/eth/backend.go b/eth/backend.go index 299c838561..769ca5a4f9 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -758,7 +758,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol { // /Quorum // add additional quorum consensus protocol if set and if not set to "eth", e.g. istanbul if quorumConsensusProtocolName != "" && quorumConsensusProtocolName != eth.ProtocolName { - quorumProtos := s.quorumConsensusProtocols() + quorumProtos := s.quorumConsensusProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) protos = append(protos, quorumProtos...) } // /end Quorum diff --git a/eth/handler.go b/eth/handler.go index 488dd2d49e..9106b772d8 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -42,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/qlight" "github.com/ethereum/go-ethereum/trie" @@ -686,10 +687,11 @@ func (h *handler) FindPeers(targets map[common.Address]bool) map[common.Address] // The Run method starts the protocol and is called by the p2p server. The quorum consensus subprotocol, // leverages the peer created and managed by the "eth" subprotocol. // The quorum consensus protocol requires that the "eth" protocol is running as well. -func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, length uint64) p2p.Protocol { +func (h *handler) makeQuorumConsensusProtocol(protoName string, version uint, length uint64, backend eth.Backend, network uint64, dnsdisc enode.Iterator) p2p.Protocol { + log.Debug("registering qouorum protocol ", "protoName", protoName, "version", version) return p2p.Protocol{ - Name: ProtoName, + Name: protoName, Version: version, Length: length, // no new peer created, uses the "eth" peer, so no peer management needed. @@ -716,10 +718,11 @@ func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, le log.Warn("full p2p peer", "id", p2pPeerId, "ethPeer", ethPeer) } if ethPeer != nil { - p.Log().Debug("consensus subprotocol retrieved eth peer from peerset", "ethPeer.id", p2pPeerId, "ProtoName", ProtoName) + p.Log().Debug("consensus subprotocol retrieved eth peer from peerset", "ethPeer.id", p2pPeerId, "ProtoName", protoName) // add the rw protocol for the quorum subprotocol to the eth peer. ethPeer.AddConsensusProtoRW(rw) - return h.handleConsensusLoop(p, rw) + peer := eth.NewPeer(version, p, rw, h.txpool) + return h.handleConsensusLoop(peer, rw, backend) } p.Log().Error("consensus subprotocol retrieved nil eth peer from peerset", "ethPeer.id", p2pPeerId) return errEthPeerNil @@ -728,24 +731,20 @@ func (h *handler) makeQuorumConsensusProtocol(ProtoName string, version uint, le } }, NodeInfo: func() interface{} { - return h.NodeInfo() + return eth.NodeInfoFunc(backend.Chain(), network) }, PeerInfo: func(id enode.ID) interface{} { - if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil { - return p.Info() - } - if p := h.peers.peer(fmt.Sprintf("%x", id)); p != nil { // TODO:BBO - return p.Info() - } - return nil + return backend.PeerInfo(id) }, + Attributes: []enr.Entry{eth.CurrentENREntry(backend.Chain())}, + DialCandidates: dnsdisc, } } -func (h *handler) handleConsensusLoop(p *p2p.Peer, protoRW p2p.MsgReadWriter) error { +func (h *handler) handleConsensusLoop(p *eth.Peer, protoRW p2p.MsgReadWriter, backend eth.Backend) error { // Handle incoming messages until the connection is torn down for { - if err := h.handleConsensus(p, protoRW); err != nil { + if err := h.handleConsensus(p, protoRW, backend); err != nil { p.Log().Debug("Ethereum quorum message handling failed", "err", err) return err } @@ -753,7 +752,7 @@ func (h *handler) handleConsensusLoop(p *p2p.Peer, protoRW p2p.MsgReadWriter) er } // This is a no-op because the eth handleMsg main loop handle ibf message as well. -func (h *handler) handleConsensus(p *p2p.Peer, protoRW p2p.MsgReadWriter) error { +func (h *handler) handleConsensus(p *eth.Peer, protoRW p2p.MsgReadWriter, backend eth.Backend) error { // Read the next message from the remote peer (in protoRW), and ensure it's fully consumed msg, err := protoRW.ReadMsg() if err != nil { @@ -768,19 +767,29 @@ func (h *handler) handleConsensus(p *p2p.Peer, protoRW p2p.MsgReadWriter) error // istanbulMsg = 0x11, and NewBlockMsg = 0x07. handled, err := h.handleConsensusMsg(p, msg) if handled { - p.Log().Debug("consensus message was handled by consensus engine", "handled", handled, + p.Log().Debug("consensus message was handled by consensus engine", "msg", msg.Code, "quorumConsensusProtocolName", quorumConsensusProtocolName, "err", err) return err } + var handlers = eth.ETH_65_FULL_SYNC + + p.Log().Trace("Message not handled by sub-protocol", "msg", msg.Code) + + if handler := handlers[msg.Code]; handler != nil { + p.Log().Debug("Found eth handler for msg", "msg", msg.Code) + return handler(backend, msg, p) + } + return nil } -func (h *handler) handleConsensusMsg(p *p2p.Peer, msg p2p.Msg) (bool, error) { +func (h *handler) handleConsensusMsg(p *eth.Peer, msg p2p.Msg) (bool, error) { if handler, ok := h.engine.(consensus.Handler); ok { pubKey := p.Node().Pubkey() addr := crypto.PubkeyToAddress(*pubKey) handled, err := handler.HandleMsg(addr, msg) + return handled, err } return false, nil @@ -789,31 +798,26 @@ func (h *handler) handleConsensusMsg(p *p2p.Peer, msg p2p.Msg) (bool, error) { // makeLegacyProtocol is basically a copy of the eth makeProtocol, but for legacy subprotocols, e.g. "istanbul/99" "istabnul/64" // If support legacy subprotocols is removed, remove this and associated code as well. // If quorum is using a legacy protocol then the "eth" subprotocol should not be available. -func (h *handler) makeLegacyProtocol(protoName string, version uint, length uint64) p2p.Protocol { - log.Debug("registering a legacy protocol ", "protoName", protoName) +func (h *handler) makeLegacyProtocol(protoName string, version uint, length uint64, backend eth.Backend, network uint64, dnsdisc enode.Iterator) p2p.Protocol { + log.Debug("registering a legacy protocol ", "protoName", protoName, "version", version) return p2p.Protocol{ Name: protoName, Version: version, Length: length, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := eth.NewPeer(version, p, rw, h.txpool) - peer.AddConsensusProtoRW(rw) return h.runEthPeer(peer, func(peer *eth.Peer) error { - return h.handleConsensusLoop(p, rw) + return h.handleConsensusLoop(peer, rw, backend) }) }, NodeInfo: func() interface{} { - return h.NodeInfo() + return eth.NodeInfoFunc(backend.Chain(), network) }, PeerInfo: func(id enode.ID) interface{} { - if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil { - return p.Info() - } - if p := h.peers.peer(fmt.Sprintf("%x", id)); p != nil { // TODO:BBO - return p.Info() - } - return nil + return backend.PeerInfo(id) }, + Attributes: []enr.Entry{eth.CurrentENREntry(backend.Chain())}, + DialCandidates: dnsdisc, } } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 9edba1ae78..ec2f97931e 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -414,7 +414,7 @@ func (p *Peer) RequestOneHeader(hash common.Hash) error { Skip: uint64(0), Reverse: false, } - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ RequestId: rand.Uint64(), GetBlockHeadersPacket: &query, @@ -433,7 +433,7 @@ func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, re Skip: uint64(skip), Reverse: reverse, } - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ RequestId: rand.Uint64(), GetBlockHeadersPacket: &query, @@ -452,7 +452,7 @@ func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever Skip: uint64(skip), Reverse: reverse, } - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ RequestId: rand.Uint64(), GetBlockHeadersPacket: &query, @@ -481,7 +481,7 @@ func (p *Peer) ExpectPeerMessage(code uint64, content types.Transactions) error // specified. func (p *Peer) RequestBodies(hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ RequestId: rand.Uint64(), GetBlockBodiesPacket: hashes, @@ -494,7 +494,7 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error { // data, corresponding to the specified hashes. func (p *Peer) RequestNodeData(hashes []common.Hash) error { p.Log().Debug("Fetching batch of state data", "count", len(hashes)) - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ RequestId: rand.Uint64(), GetNodeDataPacket: hashes, @@ -506,7 +506,7 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error { // RequestReceipts fetches a batch of transaction receipts from a remote node. func (p *Peer) RequestReceipts(hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ RequestId: rand.Uint64(), GetReceiptsPacket: hashes, @@ -518,7 +518,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error { // RequestTxs fetches a batch of transactions from a remote node. func (p *Peer) RequestTxs(hashes []common.Hash) error { p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) - if p.Version() >= ETH66 { + if p.Version() == ETH66 { return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ RequestId: rand.Uint64(), GetPooledTransactionsPacket: hashes, diff --git a/eth/quorum_protocol.go b/eth/quorum_protocol.go index fb6ef8da4e..9b7ceeec7e 100644 --- a/eth/quorum_protocol.go +++ b/eth/quorum_protocol.go @@ -3,7 +3,9 @@ package eth import ( "errors" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" ) // Quorum: quorum_protocol enables the eth service to return two different protocols, one for the eth mainnet "eth" service, @@ -26,7 +28,7 @@ var quorumConsensusProtocolVersions []uint // protocol Length describe the number of messages support by the protocol/version map[uint]uint64{Istanbul64: 18, Istanbul99: 18, Istanbul100: 18} var quorumConsensusProtocolLengths map[uint]uint64 -func (s *Ethereum) quorumConsensusProtocols() []p2p.Protocol { +func (s *Ethereum) quorumConsensusProtocols(backend eth.Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { protos := make([]p2p.Protocol, len(quorumConsensusProtocolVersions)) for i, vsn := range quorumConsensusProtocolVersions { // if we have a legacy protocol, e.g. istanbul/99, istanbul/64 then the protocol handler is will be the "eth" @@ -37,14 +39,14 @@ func (s *Ethereum) quorumConsensusProtocols() []p2p.Protocol { if !ok { panic("makeProtocol for unknown version") } - lp := s.handler.makeLegacyProtocol(quorumConsensusProtocolName, vsn, length) + lp := s.handler.makeLegacyProtocol(quorumConsensusProtocolName, vsn, length, backend, network, dnsdisc) protos[i] = lp } else { length, ok := quorumConsensusProtocolLengths[vsn] if !ok { panic("makeQuorumConsensusProtocol for unknown version") } - protos[i] = s.handler.makeQuorumConsensusProtocol(quorumConsensusProtocolName, vsn, length) + protos[i] = s.handler.makeQuorumConsensusProtocol(quorumConsensusProtocolName, vsn, length, backend, network, dnsdisc) } } return protos diff --git a/eth/sync.go b/eth/sync.go index 5cf4564a3d..0551528b23 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -65,7 +65,7 @@ func (h *handler) syncTransactions(p *eth.Peer) { // The eth/65 protocol introduces proper transaction announcements, so instead // of dripping transactions across multiple peers, just send the entire list as // an announcement and let the remote side decide what they need (likely nothing). - if p.Version() >= eth.ETH65 { + if p.Version() == eth.ETH65 { hashes := make([]common.Hash, len(txs)) for i, tx := range txs { hashes[i] = tx.Hash() @@ -96,7 +96,7 @@ func (h *handler) txsyncLoop64() { // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { - if s.p.Version() >= eth.ETH65 { + if s.p.Version() == eth.ETH65 { panic("initial transaction syncer running on eth/65+") } // Fill pack with transactions up to the target size.