Skip to content
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type Config struct {
// *WARNING* Only set this if the node is running in a trusted network, exposing
// private APIs to untrusted users is a major security risk.
WSExposeAll bool `toml:",omitempty"`

// Logger is a custom logger to use with the p2p.Server.
Logger log.Logger
}

// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
Expand Down
33 changes: 20 additions & 13 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Node struct {

stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex

log log.Logger
}

// New creates a new P2P node, ready for protocol registration.
Expand Down Expand Up @@ -101,6 +103,9 @@ func New(conf *Config) (*Node, error) {
if err != nil {
return nil, err
}
if conf.Logger == nil {
conf.Logger = log.New()
}
// Note: any interaction with Config that would create/touch files
// in the data directory or instance directory is delayed until Start.
return &Node{
Expand All @@ -112,6 +117,7 @@ func New(conf *Config) (*Node, error) {
httpEndpoint: conf.HTTPEndpoint(),
wsEndpoint: conf.WSEndpoint(),
eventmux: new(event.TypeMux),
log: conf.Logger,
}, nil
}

Expand Down Expand Up @@ -146,6 +152,7 @@ func (n *Node) Start() error {
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
Expand All @@ -156,7 +163,7 @@ func (n *Node) Start() error {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
Expand Down Expand Up @@ -280,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
n.log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
}
n.inprocHandler = handler
return nil
Expand All @@ -306,7 +313,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
n.log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
}
// All APIs registered, start the IPC listener
var (
Expand All @@ -317,7 +324,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return err
}
go func() {
log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
n.log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))

for {
conn, err := listener.Accept()
Expand All @@ -330,7 +337,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
return
}
// Not closed, just some error; report and continue
log.Error(fmt.Sprintf("IPC accept failed: %v", err))
n.log.Error(fmt.Sprintf("IPC accept failed: %v", err))
continue
}
go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
Expand All @@ -349,7 +356,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil

log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
n.log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
Expand All @@ -375,7 +382,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
n.log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
Expand All @@ -387,7 +394,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
return err
}
go rpc.NewHTTPServer(cors, handler).Serve(listener)
log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
n.log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))

// All listeners booted successfully
n.httpEndpoint = endpoint
Expand All @@ -403,7 +410,7 @@ func (n *Node) stopHTTP() {
n.httpListener.Close()
n.httpListener = nil

log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
n.log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
}
if n.httpHandler != nil {
n.httpHandler.Stop()
Expand All @@ -429,7 +436,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
n.log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
}
}
// All APIs registered, start the HTTP listener
Expand All @@ -441,7 +448,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
n.log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))

// All listeners booted successfully
n.wsEndpoint = endpoint
Expand All @@ -457,7 +464,7 @@ func (n *Node) stopWS() {
n.wsListener.Close()
n.wsListener = nil

log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
n.log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
}
if n.wsHandler != nil {
n.wsHandler.Stop()
Expand Down Expand Up @@ -496,7 +503,7 @@ func (n *Node) Stop() error {
// Release instance directory lock.
if n.instanceDirLock != nil {
if err := n.instanceDirLock.Release(); err != nil {
log.Error("Can't release datadir lock", "err", err)
n.log.Error("Can't release datadir lock", "err", err)
}
n.instanceDirLock = nil
}
Expand Down
25 changes: 15 additions & 10 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) {
return
}
}
success := t.dial(srv, t.dest)
// Try resolving the ID of static nodes if dialing failed.
if !success && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
err := t.dial(srv, t.dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
}
}
}
}
Expand Down Expand Up @@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool {
return true
}

type dialError struct {
error
}

// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
fd, err := srv.Dialer.Dial(dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
return false
return &dialError{err}
}
mfd := newMeteredConn(fd, false)
srv.SetupConn(mfd, t.flags, dest)
return true
return srv.SetupConn(mfd, t.flags, dest)
}

func (t *dialTask) String() string {
Expand Down
5 changes: 5 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
}

// Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool {
return p.rw.flags&inboundConn != 0
}

func newPeer(conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
Expand Down
Loading