Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ web3._extend({
call: 'admin_removePeer',
params: 1
}),
new web3._extend.Method({
name: 'addTrustedPeer',
call: 'admin_addTrustedPeer',
params: 1
}),
new web3._extend.Method({
name: 'removeTrustedPeer',
call: 'admin_removeTrustedPeer',
params: 1
}),
new web3._extend.Method({
name: 'exportChain',
call: 'admin_exportChain',
Expand Down
33 changes: 32 additions & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) {
return true, nil
}

// RemovePeer disconnects from a a remote node if the connection exists
// RemovePeer disconnects from a remote node if the connection exists
func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
Expand All @@ -76,6 +76,37 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}

// AddTrustedPeer allows a remote node to always connect, even if slots are full
func (api *PrivateAdminAPI) AddTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.AddTrustedPeer(node)
return true, nil
}

// RemoveTrustedPeer removes a remote node from the trusted peer set, but it
// does not disconnect it automatically.
func (api *PrivateAdminAPI) RemoveTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.RemoveTrustedPeer(node)
return true, nil
}

// PeerEvents creates an RPC subscription which receives peer events from the
// node's p2p.Server
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *Peer) String() string {

// Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool {
return p.rw.flags&inboundConn != 0
return p.rw.is(inboundConn)
}

func newPeer(conn *conn, protocols []Protocol) *Peer {
Expand Down
62 changes: 57 additions & 5 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"net"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -168,6 +169,8 @@ type Server struct {
quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
addtrusted chan *discover.Node
removetrusted chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
Expand All @@ -184,7 +187,7 @@ type peerDrop struct {
requested bool // true if signaled by the peer
}

type connFlag int
type connFlag int32

const (
dynDialedConn connFlag = 1 << iota
Expand Down Expand Up @@ -249,7 +252,18 @@ func (f connFlag) String() string {
}

func (c *conn) is(f connFlag) bool {
return c.flags&f != 0
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
return flags&f != 0
}

func (c *conn) set(f connFlag, val bool) {
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
if val {
flags |= f
} else {
flags &= ^f
}
atomic.StoreInt32((*int32)(&c.flags), int32(flags))
}

// Peers returns all connected peers.
Expand Down Expand Up @@ -300,6 +314,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}

// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *discover.Node) {
select {
case srv.addtrusted <- node:
case <-srv.quit:
}
}

// RemoveTrustedPeer removes the given node from the trusted peer set.
func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
select {
case srv.removetrusted <- node:
case <-srv.quit:
}
}

// SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
Expand Down Expand Up @@ -410,6 +441,8 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.addtrusted = make(chan *discover.Node)
srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})

Expand Down Expand Up @@ -546,8 +579,7 @@ func (srv *Server) run(dialstate dialer) {
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
// modified while the server is running.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}
Expand Down Expand Up @@ -599,12 +631,32 @@ running:
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
// stop keeping the node connected.
srv.log.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID] = true
// Mark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID]; ok {
delete(trusted, n.ID)
}
// Unmark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
Expand Down
110 changes: 108 additions & 2 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {

// tell the server to connect
tcpAddr := listener.Addr().(*net.TCPAddr)
srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
srv.AddPeer(node)

select {
case conn := <-accepted:
Expand All @@ -169,6 +170,29 @@ func TestServerDial(t *testing.T) {
if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
}

// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
}
srv.RemoveTrustedPeer(node)
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
done <- true
}()
// Trigger potential race conditions
peer = srv.Peers()[0]
_ = peer.Inbound()
_ = peer.Info()
<-done
case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second")
}
Expand Down Expand Up @@ -365,7 +389,8 @@ func TestServerAtCap(t *testing.T) {
}
}
// Try inserting a non-trusted connection.
c := newconn(randomID())
anotherID := randomID()
c := newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}
Expand All @@ -378,6 +403,87 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}

// Remove from trusted set and try again
srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}

// Add anotherID to trusted set and try again
srv.AddTrustedPeer(&discover.Node{ID: anotherID})
c = newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
}
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
}

func TestServerPeerLimits(t *testing.T) {
srvkey := newkey()

clientid := randomID()
clientnode := &discover.Node{ID: clientid}

var tp *setupTransport = &setupTransport{
id: clientid,
phs: &protoHandshake{
ID: clientid,
// Force "DiscUselessPeer" due to unmatching caps
// Caps: []Cap{discard.cap()},
},
}
var flags connFlag = dynDialedConn
var dialDest *discover.Node = &discover.Node{ID: clientid}

srv := &Server{
Config: Config{
PrivateKey: srvkey,
MaxPeers: 0,
NoDial: true,
Protocols: []Protocol{discard},
},
newTransport: func(fd net.Conn) transport { return tp },
log: log.New(),
}
if err := srv.Start(); err != nil {
t.Fatalf("couldn't start server: %v", err)
}
defer srv.Stop()

// Check that server is full (MaxPeers=0)
conn, _ := net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()

srv.AddTrustedPeer(clientnode)

// Check that server allows a trusted peer despite being full.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr == DiscTooManyPeers {
t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
}

if tp.closeErr != DiscUselessPeer {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()

srv.RemoveTrustedPeer(clientnode)

// Check that server is full again.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()
}

func TestServerSetupConn(t *testing.T) {
Expand Down