Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func markDialError(err error) {
// meteredConn is a wrapper around a network TCP connection that meters both the
// inbound and outbound network traffic.
type meteredConn struct {
*net.TCPConn // Network connection to wrap with metering
net.Conn
}

// newMeteredConn creates a new metered connection, also bumping the ingress or
Expand All @@ -90,21 +90,21 @@ func newMeteredConn(conn net.Conn) net.Conn {
if !metrics.Enabled() {
return conn
}
return &meteredConn{conn.(*net.TCPConn)}
return &meteredConn{Conn: conn}
}

// Read delegates a network read to the underlying connection, bumping the ingress
// traffic meter along the way.
func (c *meteredConn) Read(b []byte) (n int, err error) {
n, err = c.TCPConn.Read(b)
n, err = c.Conn.Read(b)
ingressTrafficMeter.Mark(int64(n))
return
}

// Write delegates a network write to the underlying connection, bumping the
// egress traffic meter along the way.
func (c *meteredConn) Write(b []byte) (n int, err error) {
n, err = c.TCPConn.Write(b)
n, err = c.Conn.Write(b)
egressTrafficMeter.Mark(int64(n))
return
}
6 changes: 5 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
)

var (
ErrShuttingDown = errors.New("shutting down")
)

const (
baseProtocolVersion = 5
baseProtocolLength = uint64(16)
Expand Down Expand Up @@ -408,7 +412,7 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
// as well but we don't want to rely on that.
rw.werr <- err
case <-rw.closed:
err = errors.New("shutting down")
err = ErrShuttingDown
}
return err
}
Expand Down
7 changes: 7 additions & 0 deletions p2p/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ package protocols
import (
"context"
"fmt"
"io"
"reflect"
"sync"
"time"

"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/p2p"
)
Expand Down Expand Up @@ -201,6 +203,11 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
func (p *Peer) Run(handler func(msg interface{}) error) error {
for {
if err := p.handleIncoming(handler); err != nil {
if err != io.EOF {
metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
log.Error("peer.handleIncoming", "err", err)
}

return err
}
}
Expand Down
67 changes: 33 additions & 34 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/simulations/adapters"
)

var dialBanTimeout = 200 * time.Millisecond
var DialBanTimeout = 200 * time.Millisecond

// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
Expand Down Expand Up @@ -79,41 +79,25 @@ func (net *Network) Events() *event.Feed {
return &net.events
}

// NewNode adds a new node to the network with a random ID
func (net *Network) NewNode() (*Node, error) {
conf := adapters.RandomNodeConfig()
conf.Lifecycles = []string{net.DefaultService}
return net.NewNodeWithConfig(conf)
}

// NewNodeWithConfig adds a new node to the network with the given config,
// returning an error if a node with the same ID or name already exists
func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
net.lock.Lock()
defer net.lock.Unlock()

// create a random ID and PrivateKey if not set
if conf.ID == (discover.NodeID{}) {
c := adapters.RandomNodeConfig()
conf.ID = c.ID
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
if conf.Reachable == nil {
conf.Reachable = func(otherID discover.NodeID) bool {
_, err := net.InitConn(conf.ID, otherID)
return err == nil
if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
return false
}
return true
}
}

// assign a name to the node if not set
if conf.Name == "" {
conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1)
}

// check the node doesn't already exist
if node := net.getNode(id); node != nil {
return nil, fmt.Errorf("node with ID %q already exists", id)
if node := net.getNode(conf.ID); node != nil {
return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
}
if node := net.getNodeByName(conf.Name); node != nil {
return nil, fmt.Errorf("node with name %q already exists", conf.Name)
Expand All @@ -133,8 +117,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
Node: adapterNode,
Config: conf,
}
log.Trace(fmt.Sprintf("node %v created", id))
net.nodeMap[id] = len(net.Nodes)
log.Trace(fmt.Sprintf("node %v created", conf.ID))
net.nodeMap[conf.ID] = len(net.Nodes)
net.Nodes = append(net.Nodes, node)

// emit a "control" event
Expand Down Expand Up @@ -182,7 +166,9 @@ func (net *Network) Start(id discover.NodeID) error {
// startWithSnapshots starts the node with the given ID using the give
// snapshots
func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
node := net.GetNode(id)
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
return fmt.Errorf("node %v does not exist", id)
}
Expand Down Expand Up @@ -221,9 +207,13 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve

// assume the node is now down
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
log.Error("Can not find node for id", "id", id)
return
}
node.Up = false
net.lock.Unlock()
net.events.Send(NewEvent(node))
}()
for {
Expand Down Expand Up @@ -258,7 +248,9 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve

// Stop stops the node with the given ID
func (net *Network) Stop(id discover.NodeID) error {
node := net.GetNode(id)
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
return fmt.Errorf("node %v does not exist", id)
}
Expand Down Expand Up @@ -311,7 +303,9 @@ func (net *Network) Disconnect(oneID, otherID discover.NodeID) error {

// DidConnect tracks the fact that the "one" node connected to the "other" node
func (net *Network) DidConnect(one, other discover.NodeID) error {
conn, err := net.GetOrCreateConn(one, other)
net.lock.Lock()
defer net.lock.Unlock()
conn, err := net.getOrCreateConn(one, other)
if err != nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
Expand All @@ -326,15 +320,17 @@ func (net *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (net *Network) DidDisconnect(one, other discover.NodeID) error {
conn := net.GetConn(one, other)
net.lock.Lock()
defer net.lock.Unlock()
conn := net.getConn(one, other)
if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
if !conn.Up {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
conn.initiated = time.Now().Add(-dialBanTimeout)
conn.initiated = time.Now().Add(-DialBanTimeout)
net.events.Send(NewEvent(conn))
return nil
}
Expand Down Expand Up @@ -475,16 +471,19 @@ func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
if err != nil {
return nil, err
}
if time.Since(conn.initiated) < dialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}
if conn.Up {
return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
}
if time.Since(conn.initiated) < DialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}

err = conn.nodesUp()
if err != nil {
log.Trace(fmt.Sprintf("nodes not up: %v", err))
return nil, fmt.Errorf("nodes not up: %v", err)
}
log.Debug("InitConn - connection initiated")
conn.initiated = time.Now()
return conn, nil
}
Expand Down
Loading