diff --git a/op-node/flags/p2p_flags.go b/op-node/flags/p2p_flags.go index 2bf1c9083cbab..d38abf11480da 100644 --- a/op-node/flags/p2p_flags.go +++ b/op-node/flags/p2p_flags.go @@ -144,7 +144,7 @@ var ( } UserAgent = cli.StringFlag{ Name: "p2p.useragent", - Usage: "User-agent string to share via LibP2P identify. If empty it defaults to 'optimism-VERSIONHERE'.", + Usage: "User-agent string to share via LibP2P identify. If empty it defaults to 'optimism'.", Hidden: true, Required: false, Value: "optimism", diff --git a/op-node/node/node.go b/op-node/node/node.go index 021196880d185..9cdf7eb85cbae 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -34,7 +34,7 @@ type OpNode struct { l2Engines []*driver.Driver // engines to keep synced l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown server *rpcServer // RPC server hosting the rollup-node API - p2pNode p2p.Node // P2P node functionality + p2pNode *p2p.NodeP2P // P2P node functionality p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer tracer Tracer // tracer to get events for testing/debugging @@ -221,6 +221,9 @@ func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { return err } n.p2pNode = p2pNode + if n.p2pNode.Dv5Udp() != nil { + go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers()) + } } return nil } diff --git a/op-node/p2p/config.go b/op-node/p2p/config.go index 6ed62612df520..1c7fd220b24fa 100644 --- a/op-node/p2p/config.go +++ b/op-node/p2p/config.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/libp2p/go-libp2p-core/peer" "github.com/ethereum/go-ethereum/log" @@ -44,7 +46,8 @@ type SetupP2P interface { // Host creates a libp2p host service. Returns nil, nil if p2p is disabled. Host(log log.Logger) (host.Host, error) // Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled. - Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) + Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) + TargetPeers() uint } // Config sets up a p2p host and discv5 service from configuration. @@ -175,6 +178,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return conf, nil } +func (conf *Config) TargetPeers() uint { + return conf.PeersLo +} + func (conf *Config) loadListenOpts(ctx *cli.Context) error { listenIP := ctx.GlobalString(flags.ListenIP.Name) if listenIP != "" { // optional diff --git a/op-node/p2p/discovery.go b/op-node/p2p/discovery.go index ea2b1cc0e30e1..08c814301015e 100644 --- a/op-node/p2p/discovery.go +++ b/op-node/p2p/discovery.go @@ -1,24 +1,70 @@ package p2p import ( + "bytes" + "context" + secureRand "crypto/rand" + "encoding/binary" + "fmt" + "io" + "math/rand" "net" + "time" + "github.com/ethereum-optimism/optimism/op-node/rollup" + gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" ) -func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) { +const ( + discoverIntervalFast = time.Second * 5 + discoverIntervalSlow = time.Second * 20 + connectionIntervalFast = time.Second * 5 + connectionIntervalSlow = time.Second * 20 + connectionWorkerCount = 4 + connectionBufferSize = 10 + discoveredNodesBuffer = 3 + tableKickoffDelay = time.Second * 3 + discoveredAddrTTL = time.Hour * 24 + collectiveDialTimeout = time.Second * 30 +) + +func (conf *Config) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) { if conf.NoDiscovery { return nil, nil, nil } - localNode := enode.NewLocalNode(conf.DiscoveryDB, conf.Priv) + priv := *conf.Priv + // use the geth curve definition. Same crypto, but geth needs to detect it as *their* definition of the curve. + priv.Curve = gcrypto.S256() + localNode := enode.NewLocalNode(conf.DiscoveryDB, &priv) if conf.AdvertiseIP != nil { localNode.SetStaticIP(conf.AdvertiseIP) } if conf.AdvertiseUDPPort != 0 { localNode.SetFallbackUDP(int(conf.AdvertiseUDPPort)) } + if conf.AdvertiseTCPPort != 0 { // explicitly advertised port gets priority + localNode.Set(enr.TCP(conf.AdvertiseTCPPort)) + } else if tcpPort != 0 { // otherwise try to pick up whatever port LibP2P binded to (listen port, or dynamically picked) + localNode.Set(enr.TCP(tcpPort)) + } else if conf.ListenTCPPort != 0 { // otherwise default to the port we configured it to listen on + localNode.Set(enr.TCP(conf.ListenTCPPort)) + } else { + return nil, nil, fmt.Errorf("no TCP port to put in discovery record") + } + dat := OptimismENRData{ + chainID: rollupCfg.L2ChainID.Uint64(), + version: 0, + } + localNode.Set(&dat) udpAddr := &net.UDPAddr{ IP: conf.ListenIP, @@ -29,9 +75,13 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5 if err != nil { return nil, nil, err } + if udpAddr.Port == 0 { // if we picked a port dynamically, then find the port we got, and update our node record + localUDPAddr := conn.LocalAddr().(*net.UDPAddr) + localNode.SetFallbackUDP(localUDPAddr.Port) + } cfg := discover.Config{ - PrivateKey: conf.Priv, + PrivateKey: &priv, NetRestrict: nil, Bootnodes: conf.Bootnodes, Unhandled: nil, // Not used in dv5 @@ -43,8 +93,294 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5 return nil, nil, err } - // TODO: periodically we can pull the external IP from libp2p NAT service, + log.Info("started discovery service", "enr", localNode.Node(), "id", localNode.ID()) + + // TODO: periodically we can pull the external IP and TCP port from libp2p NAT service, // and add it as a statement to keep the localNode accurate (if we trust the NAT device more than the discv5 statements) return localNode, udpV5, nil } + +func enrToAddrInfo(r *enode.Node) (*peer.AddrInfo, error) { + ip := r.IP() + ipScheme := "ip4" + if ip4 := ip.To4(); ip4 == nil { + ipScheme = "ip6" + } else { + ip = ip4 + } + mAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", ipScheme, ip.String(), r.TCP())) + if err != nil { + return nil, fmt.Errorf("could not construct multi addr: %v", err) + } + pub := r.Pubkey() + peerID, err := peer.IDFromPublicKey((*crypto.Secp256k1PublicKey)(pub)) + if err != nil { + return nil, fmt.Errorf("could not compute peer ID from pubkey for multi-addr: %v", err) + } + return &peer.AddrInfo{ + ID: peerID, + Addrs: []multiaddr.Multiaddr{mAddr}, + }, nil +} + +// The discovery ENRs are just key-value lists, and we filter them by records tagged with the "optimism" key, +// and then check the chain ID and version. +type OptimismENRData struct { + chainID uint64 + version uint64 +} + +func (o *OptimismENRData) ENRKey() string { + return "optimism" +} + +func (o *OptimismENRData) EncodeRLP(w io.Writer) error { + out := make([]byte, 2*binary.MaxVarintLen64) + offset := binary.PutUvarint(out, o.chainID) + offset += binary.PutUvarint(out[offset:], o.version) + out = out[:offset] + // encode as byte-string + return rlp.Encode(w, out) +} + +func (o *OptimismENRData) DecodeRLP(s *rlp.Stream) error { + b, err := s.Bytes() + if err != nil { + return fmt.Errorf("failed to decode outer ENR entry: %v", err) + } + // We don't check the byte length: the below readers are limited, and the ENR itself has size limits. + // Future "optimism" entries may contain additional data, and will be tagged with a newer version etc. + r := bytes.NewReader(b) + chainID, err := binary.ReadUvarint(r) + if err != nil { + return fmt.Errorf("failed to read chain ID var int: %v", err) + } + version, err := binary.ReadUvarint(r) + if err != nil { + return fmt.Errorf("failed to read version var int: %v", err) + } + o.chainID = chainID + o.version = version + return nil +} + +var _ enr.Entry = (*OptimismENRData)(nil) + +func FilterEnodes(log log.Logger, cfg *rollup.Config) func(node *enode.Node) bool { + return func(node *enode.Node) bool { + var dat OptimismENRData + err := node.Load(&dat) + // if the entry does not exist, or if it is invalid, then ignore the node + if err != nil { + log.Debug("discovered node record has no optimism info", "node", node.ID(), "err", err) + return false + } + // check chain ID matches + if cfg.L2ChainID.Uint64() != dat.chainID { + log.Debug("discovered node record has no matching chain ID", "node", node.ID(), "got", dat.chainID, "expected", cfg.L2ChainID.Uint64()) + return false + } + // check version matches + if dat.version != 0 { + log.Debug("discovered node record has no matching version", "node", node.ID(), "got", dat.version, "expected", 0) + return false + } + return true + } +} + +// DiscoveryProcess runs a discovery process that randomly walks the DHT to fill the peerstore, +// and connects to nodes in the peerstore that we are not already connected to. +// Nodes from the peerstore will be shuffled, unsuccessful connection attempts will cause peers to be avoided, +// and only nodes with addresses (under TTL) will be connected to. +func (n *NodeP2P) DiscoveryProcess(ctx context.Context, log log.Logger, cfg *rollup.Config, connectGoal uint) { + if n.dv5Udp == nil { + log.Warn("peer discovery is disabled") + return + } + filter := FilterEnodes(log, cfg) + // We pull nodes from discv5 DHT in random order to find new peers. + // Eventually we'll find a peer record that matches our filter. + randomNodeIter := n.dv5Udp.RandomNodes() + + randomNodeIter = enode.Filter(randomNodeIter, filter) + defer randomNodeIter.Close() + + // We pull from the DHT in a slow/fast interval, depending on the need to find more peers + discoverTicker := time.NewTicker(discoverIntervalFast) + defer discoverTicker.Stop() + + // We connect to the peers we know of to maintain a target, + // but do so with polling to avoid scanning the connection count continuously + connectTicker := time.NewTicker(connectionIntervalFast) + defer connectTicker.Stop() + + // We can go faster/slower depending on the need + slower := func() { + discoverTicker.Reset(discoverIntervalSlow) + connectTicker.Reset(connectionIntervalSlow) + } + faster := func() { + discoverTicker.Reset(discoverIntervalFast) + connectTicker.Reset(connectionIntervalFast) + } + + // We try to connect to peers in parallel: some may be slow to respond + connAttempts := make(chan peer.ID, connectionBufferSize) + connectWorker := func(ctx context.Context) { + for { + id, ok := <-connAttempts + if !ok { + return + } + addrs := n.Host().Peerstore().Addrs(id) + log.Info("attempting connection", "peer", id) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + err := n.Host().Connect(ctx, peer.AddrInfo{ID: id, Addrs: addrs}) + cancel() + if err != nil { + log.Debug("failed connection attempt", "peer", id, "err", err) + } + } + } + + // stops all the workers when we are done + defer close(connAttempts) + // start workers to try connect to peers + for i := 0; i < connectionWorkerCount; i++ { + go connectWorker(ctx) + } + + // buffer discovered nodes, so don't stall on the dht iteration as much + randomNodesCh := make(chan *enode.Node, discoveredNodesBuffer) + defer close(randomNodesCh) + bufferNodes := func() { + for { + select { + case <-discoverTicker.C: + if !randomNodeIter.Next() { + log.Info("discv5 DHT iteration stopped, closing peer discovery now...") + return + } + found := randomNodeIter.Node() + select { + // block once we have found enough nodes + case randomNodesCh <- found: + continue + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + } + // Walk the DHT in parallel, the discv5 interface does not use channels for the iteration + go bufferNodes() + + // Kick off by trying the nodes we have in our table (previous nodes from last run and/or bootnodes) + go func() { + <-time.After(tableKickoffDelay) + // At the start we might have trouble walking the DHT, + // but we do have a table with some nodes, + // so take the table and feed it into the discovery process + for _, rec := range n.dv5Udp.AllNodes() { + if filter(rec) { + select { + case randomNodesCh <- rec: + continue + case <-ctx.Done(): + return + } + } + } + }() + + pstore := n.Host().Peerstore() + for { + select { + case <-ctx.Done(): + log.Info("stopped peer discovery") + return // no ctx error, expected close + case found := <-randomNodesCh: + var dat OptimismENRData + if err := found.Load(&dat); err != nil { // we already filtered on chain ID and version + continue + } + info, err := enrToAddrInfo(found) + if err != nil { + continue + } + // We add the addresses to the peerstore, and update the address TTL. + //After that we stop using the address, assuming it may not be valid anymore (until we rediscover the node) + pstore.AddAddrs(info.ID, info.Addrs, discoveredAddrTTL) + _ = pstore.AddPubKey(info.ID, (*crypto.Secp256k1PublicKey)(found.Pubkey())) + // Tag the peer, we'd rather have the connection manager prune away old peers, + // or peers on different chains, or anyone we have not seen via discovery. + // There is no tag score decay yet, so just set it to 42. + n.ConnectionManager().TagPeer(info.ID, fmt.Sprintf("optimism-%d-%d", dat.chainID, dat.version), 42) + log.Debug("discovered peer", "peer", info.ID, "nodeID", found.ID(), "addr", info.Addrs[0]) + case <-connectTicker.C: + connected := n.Host().Network().Peers() + log.Debug("peering tick", "connected", len(connected), + "advertised_udp", n.dv5Local.Node().UDP(), + "advertised_tcp", n.dv5Local.Node().TCP(), + "advertised_ip", n.dv5Local.Node().IP()) + if uint(len(connected)) < connectGoal { + // Start looking for more peers more actively again + faster() + + peersWithAddrs := n.Host().Peerstore().PeersWithAddrs() + if err := shufflePeers(peersWithAddrs); err != nil { + continue + } + + existing := make(map[peer.ID]struct{}) + for _, p := range connected { + existing[p] = struct{}{} + } + + // Keep using these peers, and don't try new discovery/connections. + // We don't need to search for more peers and try new connections if we already have plenty + ctx, cancel := context.WithTimeout(ctx, collectiveDialTimeout) + peerLoop: + for _, id := range peersWithAddrs { + // never dial ourselves + if n.Host().ID() == id { + continue + } + // skip peers that we are already connected to + if _, ok := existing[id]; ok { + continue + } + // skip peers that we were just connected to + if n.Host().Network().Connectedness(id) == network.CannotConnect { + continue + } + // schedule, if there is still space to schedule (this may block) + select { + case connAttempts <- id: + case <-ctx.Done(): + break peerLoop + } + } + cancel() + } else { + // we have enough connections, slow down actively filling the peerstore + slower() + } + } + } +} + +// shuffle the slice of peer IDs in-place with a RNG seeded by secure randomness. +func shufflePeers(ids peer.IDSlice) error { + var x [8]byte // shuffling is not critical, just need to avoid basic predictability by outside peers + if _, err := io.ReadFull(secureRand.Reader, x[:]); err != nil { + return err + } + rng := rand.New(rand.NewSource(int64(binary.LittleEndian.Uint64(x[:])))) + rng.Shuffle(len(ids), ids.Swap) + return nil +} diff --git a/op-node/p2p/host_test.go b/op-node/p2p/host_test.go index e218762686e3f..c1f4ddabc31c8 100644 --- a/op-node/p2p/host_test.go +++ b/op-node/p2p/host_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "crypto/rand" + "math/big" "net" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" @@ -222,6 +224,123 @@ func TestP2PFull(t *testing.T) { require.NoError(t, p2pClientA.UnprotectPeer(ctx, hostB.ID())) } +func TestDiscovery(t *testing.T) { + pA, _, err := crypto.GenerateSecp256k1Key(rand.Reader) + require.NoError(t, err, "failed to generate new p2p priv key") + pB, _, err := crypto.GenerateSecp256k1Key(rand.Reader) + require.NoError(t, err, "failed to generate new p2p priv key") + pC, _, err := crypto.GenerateSecp256k1Key(rand.Reader) + require.NoError(t, err, "failed to generate new p2p priv key") + + logA := testlog.Logger(t, log.LvlError).New("host", "A") + logB := testlog.Logger(t, log.LvlError).New("host", "B") + logC := testlog.Logger(t, log.LvlError).New("host", "C") + + mplexC, err := mplexC() + require.NoError(t, err) + yamuxC, err := yamuxC() + require.NoError(t, err) + noiseC, err := noiseC() + require.NoError(t, err) + tlsC, err := tlsC() + require.NoError(t, err) + + discDBA, err := enode.OpenDB("") // "" = memory db + require.NoError(t, err) + discDBB, err := enode.OpenDB("") + require.NoError(t, err) + discDBC, err := enode.OpenDB("") + require.NoError(t, err) + + rollupCfg := &rollup.Config{L2ChainID: big.NewInt(901)} + + confA := Config{ + Priv: (*ecdsa.PrivateKey)((pA).(*crypto.Secp256k1PrivateKey)), + DisableP2P: false, + NoDiscovery: false, + AdvertiseIP: net.IP{127, 0, 0, 1}, + ListenUDPPort: 0, // bind to any available port + ListenIP: net.IP{127, 0, 0, 1}, + ListenTCPPort: 0, // bind to any available port + StaticPeers: nil, + HostMux: []lconf.MsMuxC{yamuxC, mplexC}, + HostSecurity: []lconf.MsSecC{noiseC, tlsC}, + NoTransportSecurity: false, + PeersLo: 1, + PeersHi: 10, + PeersGrace: time.Second * 10, + NAT: false, + UserAgent: "optimism-testing", + TimeoutNegotiation: time.Second * 2, + TimeoutAccept: time.Second * 2, + TimeoutDial: time.Second * 2, + Store: sync.MutexWrap(ds.NewMapDatastore()), + DiscoveryDB: discDBA, + ConnGater: DefaultConnGater, + ConnMngr: DefaultConnManager, + } + // copy config A, and change the settings for B + confB := confA + confB.Priv = (*ecdsa.PrivateKey)((pB).(*crypto.Secp256k1PrivateKey)) + confB.Store = sync.MutexWrap(ds.NewMapDatastore()) + confB.DiscoveryDB = discDBB + + resourcesCtx, resourcesCancel := context.WithCancel(context.Background()) + defer resourcesCancel() + + nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}) + require.NoError(t, err) + defer nodeA.Close() + hostA := nodeA.Host() + go nodeA.DiscoveryProcess(resourcesCtx, logA, rollupCfg, 10) + + // Add A as bootnode to B + confB.Bootnodes = []*enode.Node{nodeA.Dv5Udp().Self()} + // Copy B config to C, and ensure they have a different priv / peerstore + confC := confB + confC.Priv = (*ecdsa.PrivateKey)((pC).(*crypto.Secp256k1PrivateKey)) + confC.Store = sync.MutexWrap(ds.NewMapDatastore()) + confB.DiscoveryDB = discDBC + + // Start B + nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}) + require.NoError(t, err) + defer nodeB.Close() + hostB := nodeB.Host() + go nodeB.DiscoveryProcess(resourcesCtx, logB, rollupCfg, 10) + + // Track connections to B + connsB := make(chan network.Conn, 2) + hostB.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(n network.Network, conn network.Conn) { + log.Info("connection to B", "peer", conn.RemotePeer()) + connsB <- conn + }}) + + // Start C + nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}) + require.NoError(t, err) + defer nodeC.Close() + hostC := nodeC.Host() + go nodeC.DiscoveryProcess(resourcesCtx, logC, rollupCfg, 10) + + // B and C don't know each other yet, but both have A as a bootnode. + // It should only be a matter of time for them to connect, if they discover each other via A. + var firstPeersOfB []peer.ID + for i := 0; i < 2; i++ { + select { + case <-time.After(time.Second * 30): + t.Fatal("failed to get connection to B in time") + case c := <-connsB: + firstPeersOfB = append(firstPeersOfB, c.RemotePeer()) + } + } + // B should be connected to the bootnode it used (it's a valid optimism node to connect to here) + require.Contains(t, firstPeersOfB, hostA.ID()) + // C should be connected, although this one might take more time to discover + require.Contains(t, firstPeersOfB, hostC.ID()) +} + // Most tests should use mocknets instead of using the actual local host network func TestP2PMocknet(t *testing.T) { mnet, err := mocknet.FullMeshConnected(3) diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index d98b8919cb6aa..77636e8fb16f1 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "strconv" + + ma "github.com/multiformats/go-multiaddr" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum/go-ethereum/log" @@ -17,13 +20,14 @@ import ( ) type NodeP2P struct { - host host.Host // p2p host (optional, may be nil) - gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled - connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled - dv5Local *enode.LocalNode // p2p discovery identity (optional, may be nil) - dv5Udp *discover.UDPv5 // p2p discovery service (optional, may be nil) - gs *pubsub.PubSub // p2p gossip router (optional, may be nil) - gsOut GossipOut // p2p gossip application interface for publishing (optional, may be nil) + host host.Host // p2p host (optional, may be nil) + gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled + connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled + // the below components are all optional, and may be nil. They require the host to not be nil. + dv5Local *enode.LocalNode // p2p discovery identity + dv5Udp *discover.UDPv5 // p2p discovery service + gs *pubsub.PubSub // p2p gossip router + gsOut GossipOut // p2p gossip application interface for publishing } func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) (*NodeP2P, error) { @@ -46,12 +50,6 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) error { var err error - // All nil if disabled. - n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5")) - if err != nil { - return fmt.Errorf("failed to start discv5: %v", err) - } - // nil if disabled. n.host, err = setup.Host(log) if err != nil { @@ -81,6 +79,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l return fmt.Errorf("failed to join blocks gossip topic: %v", err) } log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty()) + + tcpPort, err := FindActiveTCPPort(n.host) + if err != nil { + log.Warn("failed to find what TCP port p2p is binded to", "err", err) + } + + // All nil if disabled. + n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort) + if err != nil { + return fmt.Errorf("failed to start discv5: %v", err) + } } return nil } @@ -130,3 +139,20 @@ func (n *NodeP2P) Close() error { } return result.ErrorOrNil() } + +func FindActiveTCPPort(h host.Host) (uint16, error) { + var tcpPort uint16 + for _, addr := range h.Addrs() { + tcpPortStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil { + continue + } + v, err := strconv.ParseUint(tcpPortStr, 10, 16) + if err != nil { + continue + } + tcpPort = uint16(v) + break + } + return tcpPort, nil +} diff --git a/op-node/p2p/prepared.go b/op-node/p2p/prepared.go index 72c3a18b07b08..9e269158a4f7d 100644 --- a/op-node/p2p/prepared.go +++ b/op-node/p2p/prepared.go @@ -4,6 +4,10 @@ import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/p2p/enr" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" @@ -20,6 +24,10 @@ type Prepared struct { var _ SetupP2P = (*Prepared)(nil) +func (p *Prepared) TargetPeers() uint { + return 20 +} + func (p *Prepared) Check() error { if (p.LocalNode == nil) != (p.UDPv5 == nil) { return fmt.Errorf("inconsistent discv5 setup: %v <> %v", p.LocalNode, p.UDPv5) @@ -36,6 +44,16 @@ func (p *Prepared) Host(log log.Logger) (host.Host, error) { } // Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled. -func (p *Prepared) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) { +func (p *Prepared) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) { + if p.LocalNode != nil { + dat := OptimismENRData{ + chainID: rollupCfg.L2ChainID.Uint64(), + version: 0, + } + p.LocalNode.Set(&dat) + if tcpPort != 0 { + p.LocalNode.Set(enr.TCP(tcpPort)) + } + } return p.LocalNode, p.UDPv5, nil } diff --git a/op-node/p2p/rpc_server.go b/op-node/p2p/rpc_server.go index e0f2c0d4419a4..8c1def0cfc34b 100644 --- a/op-node/p2p/rpc_server.go +++ b/op-node/p2p/rpc_server.go @@ -5,7 +5,6 @@ import ( "crypto/ecdsa" "errors" "fmt" - "io" "net" "time" @@ -26,8 +25,6 @@ import ( // TODO: dynamic peering // - req-resp protocol to ensure peers from a different chain learn they shouldn't be connected // - banning peers based on score -// - store enode in peerstore in dynamic-peering background process -// - peers must be tagged with the "optimism" tag and marked with high value if the chain ID matches var ( DisabledDiscovery = errors.New("discovery disabled") @@ -50,7 +47,6 @@ type Node interface { ConnectionGater() ConnectionGater // ConnectionManager returns the connection manager, to protect peers with, may be nil ConnectionManager() connmgr.ConnManager - io.Closer } type APIBackend struct { diff --git a/specs/rollup-node-p2p.md b/specs/rollup-node-p2p.md index 97c94f4d67b79..ba2a1a0cb8cca 100644 --- a/specs/rollup-node-p2p.md +++ b/specs/rollup-node-p2p.md @@ -86,10 +86,10 @@ The Ethereum Node Record (ENR) for an Optimism rollup node must contain the foll - A UDP port (`udp` field) representing the local discv5 listening port. - An Optimism (`optimism` field) L2 network identifier -The `optimism` value is encoded as the concatenation of: +The `optimism` value is encoded as a single RLP `bytes` value, the concatenation of: -- chain ID (`varint`) -- fork ID (`varint`) +- chain ID (`unsigned varint`) +- fork ID (`unsigned varint`) Note that DiscV5 is a shared DHT (Distributed Hash Table): the L1 consensus and execution nodes, as well as testnet nodes, and even external IOT nodes, all communicate records in this large common DHT.