diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 96a01d95150c..de6089a46e22 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -32,7 +32,9 @@ import ( "fmt" "reflect" "sync" + "time" + "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/p2p" ) @@ -216,6 +218,8 @@ func (p *Peer) Drop(err error) { // this low level call will be wrapped by libraries providing routed or broadcast sends // but often just used to forward and push messages to directly connected peers func (p *Peer) Send(msg interface{}) error { + defer metrics.GetOrRegisterResettingTimer("peer.send_t", nil).UpdateSince(time.Now()) + metrics.GetOrRegisterCounter("peer.send", nil).Inc(1) code, found := p.spec.GetCode(msg) if !found { return errorf(ErrInvalidMsgType, "%v", code) diff --git a/p2p/rlpx_test.go b/p2p/rlpx_test.go index dcfa18b20531..de5afa976587 100644 --- a/p2p/rlpx_test.go +++ b/p2p/rlpx_test.go @@ -32,6 +32,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/crypto/ecies" "github.com/XinFinOrg/XDPoSChain/p2p/discover" + "github.com/XinFinOrg/XDPoSChain/p2p/simulations/pipes" "github.com/XinFinOrg/XDPoSChain/rlp" "github.com/davecgh/go-spew/spew" "golang.org/x/crypto/sha3" @@ -158,7 +159,7 @@ func TestProtocolHandshake(t *testing.T) { wg sync.WaitGroup ) - fd0, fd1, err := tcpPipe() + fd0, fd1, err := pipes.TCPPipe() if err != nil { t.Fatal(err) } @@ -597,31 +598,3 @@ func TestHandshakeForwardCompatibility(t *testing.T) { t.Errorf("ingress-mac('foo') mismatch:\ngot %x\nwant %x", fooIngressHash, wantFooIngressHash) } } - -// tcpPipe creates an in process full duplex pipe based on a localhost TCP socket -func tcpPipe() (net.Conn, net.Conn, error) { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, nil, err - } - defer l.Close() - - var aconn net.Conn - aerr := make(chan error, 1) - go func() { - var err error - aconn, err = l.Accept() - aerr <- err - }() - - dconn, err := net.Dial("tcp", l.Addr().String()) - if err != nil { - <-aerr - return nil, nil, err - } - if err := <-aerr; err != nil { - dconn.Close() - return nil, nil, err - } - return aconn, dconn, nil -} diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go index ee1f211bc168..dc67f1f8a279 100644 --- a/p2p/simulations/adapters/docker.go +++ b/p2p/simulations/adapters/docker.go @@ -32,6 +32,10 @@ import ( "github.com/docker/docker/pkg/reexec" ) +var ( + ErrLinuxOnly = errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") +) + // DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker // containers. // @@ -51,7 +55,7 @@ func NewDockerAdapter() (*DockerAdapter, error) { // It is reasonable to require this because the caller can just // compile the current binary in a Docker container. if runtime.GOOS != "linux" { - return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") + return nil, ErrLinuxOnly } if err := buildDockerImage(); err != nil { @@ -95,6 +99,10 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NAT = nil conf.Stack.Logger = log.New("node.id", config.ID.String()) + // listen on all interfaces on a given port, which we set when we + // initialise NodeConfig (usually a random port) + conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port) + node := &DockerNode{ ExecNode: ExecNode{ ID: config.ID, diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index ee83939c6887..8b342e725603 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -17,6 +17,7 @@ package adapters import ( + "bufio" "context" "crypto/ecdsa" "encoding/json" @@ -103,9 +104,9 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil - // listen on a random localhost port (we'll get the actual port after - // starting the node through the RPC admin.nodeInfo method) - conf.Stack.P2P.ListenAddr = "127.0.0.1:0" + // listen on a localhost port, which we set when we + // initialise NodeConfig (usually a random port) + conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port) node := &ExecNode{ ID: config.ID, @@ -189,9 +190,23 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { n.Cmd = cmd // read the WebSocket address from the stderr logs - wsAddr, err := findWSAddr(stderrR, 10*time.Second) - if err != nil { - return fmt.Errorf("error getting WebSocket address: %s", err) + var wsAddr string + wsAddrC := make(chan string) + go func() { + s := bufio.NewScanner(stderrR) + for s.Scan() { + if strings.Contains(s.Text(), "WebSocket endpoint opened") { + wsAddrC <- wsAddrPattern.FindString(s.Text()) + } + } + }() + select { + case wsAddr = <-wsAddrC: + if wsAddr == "" { + return errors.New("failed to read WebSocket address from stderr") + } + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for WebSocket address on stderr") } // create the RPC client and load the node info @@ -321,6 +336,21 @@ type execNodeConfig struct { PeerAddrs map[string]string `json:"peer_addrs,omitempty"` } +// ExternalIP gets an external IP address so that Enode URL is usable +func ExternalIP() net.IP { + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Crit("error getting IP address", "err", err) + } + for _, addr := range addrs { + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() { + return ip.IP + } + } + log.Warn("unable to determine explicit IP address, falling back to loopback") + return net.IP{127, 0, 0, 1} +} + func initLogging() { // Initialize the logging by default first. var innerHandler slog.Handler @@ -377,25 +407,11 @@ func execP2PNode() { conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) - // use explicit IP address in ListenAddr so that Enode URL is usable - externalIP := func() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - log.Crit("error getting IP address", "err", err) - } - for _, addr := range addrs { - if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { - return ip.IP.String() - } - } - log.Crit("unable to determine explicit IP address") - return "" - } if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { - conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr + conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr } if conf.Stack.WSHost == "0.0.0.0" { - conf.Stack.WSHost = externalIP() + conf.Stack.WSHost = ExternalIP().String() } // initialize the devp2p stack diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index be7124dae4e1..22afb3a7894c 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -28,13 +28,15 @@ import ( "github.com/XinFinOrg/XDPoSChain/node" "github.com/XinFinOrg/XDPoSChain/p2p" "github.com/XinFinOrg/XDPoSChain/p2p/discover" + "github.com/XinFinOrg/XDPoSChain/p2p/simulations/pipes" "github.com/XinFinOrg/XDPoSChain/rpc" "github.com/gorilla/websocket" ) // SimAdapter is a NodeAdapter which creates in-memory simulation nodes and -// connects them using in-memory net.Pipe connections +// connects them using net.Pipe type SimAdapter struct { + pipe func() (net.Conn, net.Conn, error) mtx sync.RWMutex nodes map[discover.NodeID]*SimNode lifecycles LifecycleConstructors @@ -43,10 +45,18 @@ type SimAdapter struct { // NewSimAdapter creates a SimAdapter which is capable of running in-memory // simulation nodes running any of the given services (the services to run on a // particular node are passed to the NewNode function in the NodeConfig) +// the adapter uses a net.Pipe for in-memory simulated network connections func NewSimAdapter(services LifecycleConstructors) *SimAdapter { return &SimAdapter{ - // nodes: make(map[discover.NodeID]*SimNode), - // lifecycles: lifecycles, + pipe: pipes.NetPipe, + nodes: make(map[discover.NodeID]*SimNode), + lifecycles: services, + } +} + +func NewTCPAdapter(services LifecycleConstructors) *SimAdapter { + return &SimAdapter{ + pipe: pipes.TCPPipe, nodes: make(map[discover.NodeID]*SimNode), lifecycles: services, } @@ -84,7 +94,7 @@ func (sa *SimAdapter) NewNode(config *NodeConfig) (Node, error) { MaxPeers: math.MaxInt32, NoDiscovery: true, Dialer: sa, - EnableMsgEvents: true, + EnableMsgEvents: config.EnableMsgEvents, }, Logger: log.New("node.id", id), }) @@ -105,7 +115,7 @@ func (sa *SimAdapter) NewNode(config *NodeConfig) (Node, error) { } // Dial implements the p2p.NodeDialer interface by connecting to the node using -// an in-memory net.Pipe connection +// an in-memory net.Pipe func (sa *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { node, ok := sa.GetNode(dest.ID) if !ok { @@ -118,7 +128,14 @@ func (sa *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { if srv == nil { return nil, fmt.Errorf("node not running: %s", dest.ID) } - pipe1, pipe2 := net.Pipe() + // SimAdapter.pipe is net.Pipe (NewSimAdapter) + pipe1, pipe2, err := sa.pipe() + if err != nil { + return nil, err + } + // this is simulated 'listening' + // asynchronously call the dialed destintion node's p2p server + // to set up connection on the 'listening' side go srv.SetupConn(pipe1, 0, nil) node.connected[dest.ID] = true return pipe2, nil @@ -143,8 +160,8 @@ func (sa *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { } // SimNode is an in-memory simulation node which connects to other nodes using -// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p -// protocols directly over that pipe +// net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that +// pipe type SimNode struct { lock sync.RWMutex ID discover.NodeID @@ -241,7 +258,7 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { service, err := serviceFunc(ctx, sn.node) if err != nil { regErr = err - return + break } // if the service has already been registered, don't register it again. if _, ok := sn.running[name]; ok { @@ -317,3 +334,18 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +func setSocketBuffer(conn net.Conn, socketReadBuffer int, socketWriteBuffer int) error { + switch v := conn.(type) { + case *net.UnixConn: + err := v.SetReadBuffer(socketReadBuffer) + if err != nil { + return err + } + err = v.SetWriteBuffer(socketWriteBuffer) + if err != nil { + return err + } + } + return nil +} diff --git a/p2p/simulations/adapters/inproc_test.go b/p2p/simulations/adapters/inproc_test.go new file mode 100644 index 000000000000..bc0dc32b8f19 --- /dev/null +++ b/p2p/simulations/adapters/inproc_test.go @@ -0,0 +1,259 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package adapters + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/p2p/simulations/pipes" +) + +func TestTCPPipe(t *testing.T) { + c1, c2, err := pipes.TCPPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 1024 + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) + } + } + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestTCPPipeBidirections(t *testing.T) { + c1, c2, err := pipes.TCPPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 7 + for i := 0; i < msgs; i++ { + msg := []byte(fmt.Sprintf("ping %02d", i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("ping %02d", i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) + } else { + msg := []byte(fmt.Sprintf("pong %02d", i)) + _, err := c2.Write(msg) + if err != nil { + t.Fatal(err) + } + } + } + + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("pong %02d", i)) + + out := make([]byte, size) + _, err := c1.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) + } + } + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestNetPipe(t *testing.T) { + c1, c2, err := pipes.NetPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 1024 + // netPipe is blocking, so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) + } + } + + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestNetPipeBidirections(t *testing.T) { + c1, c2, err := pipes.NetPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 1000 + size := 8 + pingTemplate := "ping %03d" + pongTemplate := "pong %03d" + + // netPipe is blocking, so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := []byte(fmt.Sprintf(pingTemplate, i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + // netPipe is blocking, so reads for pong are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pongTemplate, i)) + + out := make([]byte, size) + _, err := c1.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", expected, out) + } + } + + done <- struct{}{} + }() + + // expect to read pings, and respond with pongs to the alternate connection + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pingTemplate, i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", expected, out) + } else { + msg := []byte(fmt.Sprintf(pongTemplate, i)) + + _, err := c2.Write(msg) + if err != nil { + t.Fatal(err) + } + } + } + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index ea5e1a0458ee..37c7c51b8b47 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -22,7 +22,9 @@ import ( "encoding/json" "fmt" "log/slog" + "net" "os" + "strconv" "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/node" @@ -98,6 +100,8 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id discover.NodeID) bool + Port uint16 + // LogFile is the log file name of the p2p node at runtime. // // The default value is empty so that the default log writer @@ -113,23 +117,27 @@ type NodeConfig struct { // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Services []string `json:"services"` - LogFile string `json:"logfile"` - LogVerbosity int `json:"log_verbosity"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Services []string `json:"services"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` + LogFile string `json:"logfile"` + LogVerbosity int `json:"log_verbosity"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Services: n.Lifecycles, - LogFile: n.LogFile, - LogVerbosity: int(n.LogVerbosity), + ID: n.ID.String(), + Name: n.Name, + Services: n.Lifecycles, + Port: n.Port, + EnableMsgEvents: n.EnableMsgEvents, + LogFile: n.LogFile, + LogVerbosity: int(n.LogVerbosity), } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -167,6 +175,8 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Name = confJSON.Name n.Lifecycles = confJSON.Services + n.Port = confJSON.Port + n.EnableMsgEvents = confJSON.EnableMsgEvents n.LogFile = confJSON.LogFile n.LogVerbosity = slog.Level(confJSON.LogVerbosity) @@ -180,13 +190,36 @@ func RandomNodeConfig() *NodeConfig { if err != nil { panic("unable to generate key") } - var id discover.NodeID - pubkey := crypto.FromECDSAPub(&key.PublicKey) - copy(id[:], pubkey[1:]) + + id := discover.PubkeyID(&key.PublicKey) + port, err := assignTCPPort() + if err != nil { + panic("unable to assign tcp port") + } return &NodeConfig{ - ID: id, - PrivateKey: key, + ID: id, + Name: fmt.Sprintf("node_%s", id.String()), + PrivateKey: key, + Port: port, + EnableMsgEvents: true, + } +} + +func assignTCPPort() (uint16, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + l.Close() + _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return 0, err + } + p, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, err } + return uint16(p), nil } // ServiceContext is a collection of options and methods which can be utilised diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 501737cde453..0817866ad569 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -559,7 +559,8 @@ func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { // CreateNode creates a node in the network using the given configuration func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { - config := adapters.RandomNodeConfig() + config := &adapters.NodeConfig{} + err := json.NewDecoder(req.Body).Decode(config) if err != nil && err != io.EOF { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index 72b08345f454..689d496316d2 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -350,7 +350,8 @@ func startTestNetwork(t *testing.T, client *Client) []string { nodeCount := 2 nodeIDs := make([]string, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := client.CreateNode(nil) + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } @@ -527,7 +528,9 @@ func TestHTTPNodeRPC(t *testing.T) { // start a node in the network client := NewClient(s.URL) - node, err := client.CreateNode(nil) + + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } @@ -589,7 +592,8 @@ func TestHTTPSnapshot(t *testing.T) { nodeCount := 2 nodes := make([]*p2p.NodeInfo, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := client.CreateNode(nil) + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 5d5548273b17..91cf123dae09 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -26,6 +26,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/p2p/discover" + "github.com/XinFinOrg/XDPoSChain/p2p/simulations/adapters" ) // a map of mocker names to its function @@ -102,7 +103,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { func probabilistic(net *Network, quit chan struct{}, nodeCount int) { nodes, err := connectNodesInRing(net, nodeCount) if err != nil { - panic("Could not startup node network for mocker") + select { + case <-quit: + //error may be due to abortion of mocking; so the quit channel is closed + return + default: + panic("Could not startup node network for mocker") + } } for { select { @@ -141,7 +148,7 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { log.Debug(fmt.Sprintf("node %v shutting down", nodes[i])) err := net.Stop(nodes[i]) if err != nil { - log.Error(fmt.Sprintf("Error stopping node %s", nodes[i])) + log.Error("Error stopping node", "node", nodes[i], "err", err) continue } wg.Go(func() { @@ -149,7 +156,7 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { time.Sleep(randWait) err := net.Start(id) if err != nil { - log.Error(fmt.Sprintf("Error starting node %s", id)) + log.Error("Error starting node", "node", id, "err", err) } }) } @@ -161,9 +168,10 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) { ids := make([]discover.NodeID, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := net.NewNode() + conf := adapters.RandomNodeConfig() + node, err := net.NewNodeWithConfig(conf) if err != nil { - log.Error("Error creating a node! %s", err) + log.Error("Error creating a node!", "err", err) return nil, err } ids[i] = node.ID() @@ -171,7 +179,7 @@ func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) for _, id := range ids { if err := net.Start(id); err != nil { - log.Error("Error starting a node! %s", err) + log.Error("Error starting a node!", "err", err) return nil, err } log.Debug(fmt.Sprintf("node %v starting up", id)) @@ -179,7 +187,7 @@ func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) for i, id := range ids { peerID := ids[(i+1)%len(ids)] if err := net.Connect(id, peerID); err != nil { - log.Error("Error connecting a node to a peer! %s", err) + log.Error("Error connecting a node to a peer!", "err", err) return nil, err } } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 3eef431fe722..1a707c9e8861 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -381,6 +381,15 @@ func (net *Network) GetNodeByName(name string) *Node { return net.getNodeByName(name) } +// GetNodes returns the existing nodes +func (net *Network) GetNodes() (nodes []*Node) { + net.lock.Lock() + defer net.lock.Unlock() + + nodes = append(nodes, net.Nodes...) + return nodes +} + func (net *Network) getNode(id discover.NodeID) *Node { i, found := net.nodeMap[id] if !found { @@ -398,15 +407,6 @@ func (net *Network) getNodeByName(name string) *Node { return nil } -// GetNodes returns the existing nodes -func (net *Network) GetNodes() (nodes []*Node) { - net.lock.Lock() - defer net.lock.Unlock() - - nodes = append(nodes, net.Nodes...) - return nodes -} - // GetConn returns the connection which exists between "one" and "other" // regardless of which node initiated the connection func (net *Network) GetConn(oneID, otherID discover.NodeID) *Conn { diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index a308442701ef..63a761d8f15d 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -41,7 +41,8 @@ func TestNetworkSimulation(t *testing.T) { nodeCount := 20 ids := make([]discover.NodeID, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := network.NewNode() + conf := adapters.RandomNodeConfig() + node, err := network.NewNodeWithConfig(conf) if err != nil { t.Fatalf("error creating node: %s", err) } diff --git a/p2p/simulations/pipes/pipes.go b/p2p/simulations/pipes/pipes.go new file mode 100644 index 000000000000..8532c1bcf0e9 --- /dev/null +++ b/p2p/simulations/pipes/pipes.go @@ -0,0 +1,55 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pipes + +import ( + "net" +) + +// NetPipe wraps net.Pipe in a signature returning an error +func NetPipe() (net.Conn, net.Conn, error) { + p1, p2 := net.Pipe() + return p1, p2, nil +} + +// TCPPipe creates an in process full duplex pipe based on a localhost TCP socket +func TCPPipe() (net.Conn, net.Conn, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, nil, err + } + defer l.Close() + + var aconn net.Conn + aerr := make(chan error, 1) + go func() { + var err error + aconn, err = l.Accept() + aerr <- err + }() + + dconn, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + <-aerr + return nil, nil, err + } + if err := <-aerr; err != nil { + dconn.Close() + return nil, nil, err + } + return aconn, dconn, nil +}