From 382508869706ed469aeb0fe2ed14553b70f06d38 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 28 Jan 2026 13:06:49 +0800 Subject: [PATCH] feat(p2p): swarm POC3 #17041 --- p2p/metrics.go | 8 ++--- p2p/peer.go | 6 +++- p2p/protocols/protocol.go | 7 ++++ p2p/simulations/network.go | 67 +++++++++++++++++++------------------- 4 files changed, 49 insertions(+), 39 deletions(-) diff --git a/p2p/metrics.go b/p2p/metrics.go index 8105e265671e..c02c90207af8 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -79,7 +79,7 @@ func markDialError(err error) { // meteredConn is a wrapper around a network TCP connection that meters both the // inbound and outbound network traffic. type meteredConn struct { - *net.TCPConn // Network connection to wrap with metering + net.Conn } // newMeteredConn creates a new metered connection, also bumping the ingress or @@ -90,13 +90,13 @@ func newMeteredConn(conn net.Conn) net.Conn { if !metrics.Enabled() { return conn } - return &meteredConn{conn.(*net.TCPConn)} + return &meteredConn{Conn: conn} } // Read delegates a network read to the underlying connection, bumping the ingress // traffic meter along the way. func (c *meteredConn) Read(b []byte) (n int, err error) { - n, err = c.TCPConn.Read(b) + n, err = c.Conn.Read(b) ingressTrafficMeter.Mark(int64(n)) return } @@ -104,7 +104,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) { // Write delegates a network write to the underlying connection, bumping the // egress traffic meter along the way. func (c *meteredConn) Write(b []byte) (n int, err error) { - n, err = c.TCPConn.Write(b) + n, err = c.Conn.Write(b) egressTrafficMeter.Mark(int64(n)) return } diff --git a/p2p/peer.go b/p2p/peer.go index 20bbafb909bf..79eb400c387c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -33,6 +33,10 @@ import ( "github.com/XinFinOrg/XDPoSChain/rlp" ) +var ( + ErrShuttingDown = errors.New("shutting down") +) + const ( baseProtocolVersion = 5 baseProtocolLength = uint64(16) @@ -408,7 +412,7 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { // as well but we don't want to rely on that. rw.werr <- err case <-rw.closed: - err = errors.New("shutting down") + err = ErrShuttingDown } return err } diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index de6089a46e22..2d6be56883b4 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -30,10 +30,12 @@ package protocols import ( "context" "fmt" + "io" "reflect" "sync" "time" + "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/p2p" ) @@ -201,6 +203,11 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer { func (p *Peer) Run(handler func(msg interface{}) error) error { for { if err := p.handleIncoming(handler); err != nil { + if err != io.EOF { + metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1) + log.Error("peer.handleIncoming", "err", err) + } + return err } } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 1a707c9e8861..70c7bc0123e9 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -32,7 +32,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/p2p/simulations/adapters" ) -var dialBanTimeout = 200 * time.Millisecond +var DialBanTimeout = 200 * time.Millisecond // NetworkConfig defines configuration options for starting a Network type NetworkConfig struct { @@ -79,41 +79,25 @@ func (net *Network) Events() *event.Feed { return &net.events } -// NewNode adds a new node to the network with a random ID -func (net *Network) NewNode() (*Node, error) { - conf := adapters.RandomNodeConfig() - conf.Lifecycles = []string{net.DefaultService} - return net.NewNodeWithConfig(conf) -} - // NewNodeWithConfig adds a new node to the network with the given config, // returning an error if a node with the same ID or name already exists func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { net.lock.Lock() defer net.lock.Unlock() - // create a random ID and PrivateKey if not set - if conf.ID == (discover.NodeID{}) { - c := adapters.RandomNodeConfig() - conf.ID = c.ID - conf.PrivateKey = c.PrivateKey - } - id := conf.ID if conf.Reachable == nil { conf.Reachable = func(otherID discover.NodeID) bool { _, err := net.InitConn(conf.ID, otherID) - return err == nil + if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 { + return false + } + return true } } - // assign a name to the node if not set - if conf.Name == "" { - conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1) - } - // check the node doesn't already exist - if node := net.getNode(id); node != nil { - return nil, fmt.Errorf("node with ID %q already exists", id) + if node := net.getNode(conf.ID); node != nil { + return nil, fmt.Errorf("node with ID %q already exists", conf.ID) } if node := net.getNodeByName(conf.Name); node != nil { return nil, fmt.Errorf("node with name %q already exists", conf.Name) @@ -133,8 +117,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) Node: adapterNode, Config: conf, } - log.Trace(fmt.Sprintf("node %v created", id)) - net.nodeMap[id] = len(net.Nodes) + log.Trace(fmt.Sprintf("node %v created", conf.ID)) + net.nodeMap[conf.ID] = len(net.Nodes) net.Nodes = append(net.Nodes, node) // emit a "control" event @@ -182,7 +166,9 @@ func (net *Network) Start(id discover.NodeID) error { // startWithSnapshots starts the node with the given ID using the give // snapshots func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { - node := net.GetNode(id) + net.lock.Lock() + defer net.lock.Unlock() + node := net.getNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } @@ -221,9 +207,13 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve // assume the node is now down net.lock.Lock() + defer net.lock.Unlock() node := net.getNode(id) + if node == nil { + log.Error("Can not find node for id", "id", id) + return + } node.Up = false - net.lock.Unlock() net.events.Send(NewEvent(node)) }() for { @@ -258,7 +248,9 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve // Stop stops the node with the given ID func (net *Network) Stop(id discover.NodeID) error { - node := net.GetNode(id) + net.lock.Lock() + defer net.lock.Unlock() + node := net.getNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } @@ -311,7 +303,9 @@ func (net *Network) Disconnect(oneID, otherID discover.NodeID) error { // DidConnect tracks the fact that the "one" node connected to the "other" node func (net *Network) DidConnect(one, other discover.NodeID) error { - conn, err := net.GetOrCreateConn(one, other) + net.lock.Lock() + defer net.lock.Unlock() + conn, err := net.getOrCreateConn(one, other) if err != nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -326,7 +320,9 @@ func (net *Network) DidConnect(one, other discover.NodeID) error { // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node func (net *Network) DidDisconnect(one, other discover.NodeID) error { - conn := net.GetConn(one, other) + net.lock.Lock() + defer net.lock.Unlock() + conn := net.getConn(one, other) if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -334,7 +330,7 @@ func (net *Network) DidDisconnect(one, other discover.NodeID) error { return fmt.Errorf("%v and %v already disconnected", one, other) } conn.Up = false - conn.initiated = time.Now().Add(-dialBanTimeout) + conn.initiated = time.Now().Add(-DialBanTimeout) net.events.Send(NewEvent(conn)) return nil } @@ -475,16 +471,19 @@ func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { if err != nil { return nil, err } - if time.Since(conn.initiated) < dialBanTimeout { - return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) - } if conn.Up { return nil, fmt.Errorf("%v and %v already connected", oneID, otherID) } + if time.Since(conn.initiated) < DialBanTimeout { + return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) + } + err = conn.nodesUp() if err != nil { + log.Trace(fmt.Sprintf("nodes not up: %v", err)) return nil, fmt.Errorf("nodes not up: %v", err) } + log.Debug("InitConn - connection initiated") conn.initiated = time.Now() return conn, nil }