Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/simulations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event {
func (e *Event) String() string {
switch e.Type {
case EventTypeNode:
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up)
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up())
case EventTypeConn:
return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up)
case EventTypeMsg:
Expand Down
18 changes: 10 additions & 8 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ type expectEvents struct {
}

func (t *expectEvents) nodeEvent(id string, up bool) *Event {
node := Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
up: up,
}
return &Event{
Type: EventTypeNode,
Node: &Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
Up: up,
},
Node: &node,
}
}

Expand Down Expand Up @@ -480,6 +481,7 @@ loop:
}

func (t *expectEvents) expect(events ...*Event) {
t.Helper()
timeout := time.After(10 * time.Second)
i := 0
for {
Expand All @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) {
if event.Node.ID() != expected.Node.ID() {
t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
}
if event.Node.Up != expected.Node.Up {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
if event.Node.Up() != expected.Node.Up() {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up())
}

case EventTypeConn:
Expand Down
9 changes: 5 additions & 4 deletions p2p/simulations/mocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) {
for {
select {
case event := <-events:
//if the event is a node Up event only
if event.Node != nil && event.Node.Up {
if isNodeUp(event) {
//add the correspondent node ID to the map
nodemap[event.Node.Config.ID] = true
//this means all nodes got a nodeUp event, so we can continue the test
if len(nodemap) == nodeCount {
nodesComplete = true
//wait for 3s as the mocker will need time to connect the nodes
//time.Sleep( 3 *time.Second)
}
} else if event.Conn != nil && nodesComplete {
connCount += 1
Expand Down Expand Up @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) {
t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo))
}
}

func isNodeUp(event *Event) bool {
return event.Node != nil && event.Node.Up()
}
84 changes: 53 additions & 31 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig {
// StartAll starts all nodes in the network
func (net *Network) StartAll() error {
for _, node := range net.Nodes {
if node.Up {
if node.Up() {
continue
}
if err := net.Start(node.ID()); err != nil {
Expand All @@ -149,7 +149,7 @@ func (net *Network) StartAll() error {
// StopAll stops all nodes in the network
func (net *Network) StopAll() error {
for _, node := range net.Nodes {
if !node.Up {
if !node.Up() {
continue
}
if err := net.Stop(node.ID()); err != nil {
Expand All @@ -168,27 +168,23 @@ func (net *Network) Start(id enode.ID) error {
// snapshots
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
net.lock.Lock()
defer net.lock.Unlock()

node := net.getNode(id)
if node == nil {
net.lock.Unlock()
return fmt.Errorf("node %v does not exist", id)
}
if node.Up {
net.lock.Unlock()
if node.Up() {
return fmt.Errorf("node %v already up", id)
}
log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
if err := node.Start(snapshots); err != nil {
net.lock.Unlock()
log.Warn("Node startup failed", "id", id, "err", err)
return err
}
node.Up = true
node.SetUp(true)
log.Info("Started node", "id", id)
ev := NewEvent(node)
net.lock.Unlock()

net.events.Send(ev)

// subscribe to peer events
Expand Down Expand Up @@ -219,7 +215,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
if node == nil {
return
}
node.Up = false
node.SetUp(false)
ev := NewEvent(node)
net.events.Send(ev)
}()
Expand Down Expand Up @@ -258,29 +254,23 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
// Stop stops the node with the given ID
func (net *Network) Stop(id enode.ID) error {
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
net.lock.Unlock()
return fmt.Errorf("node %v does not exist", id)
}
if !node.Up {
net.lock.Unlock()
if !node.Up() {
return fmt.Errorf("node %v already down", id)
}
node.Up = false
net.lock.Unlock()
node.SetUp(false)

err := node.Stop()
if err != nil {
net.lock.Lock()
node.Up = true
net.lock.Unlock()
node.SetUp(true)
return err
}
log.Info("Stopped node", "id", id, "err", err)
net.lock.Lock()
ev := ControlEvent(node)
net.lock.Unlock()
net.events.Send(ev)
return nil
}
Expand Down Expand Up @@ -430,7 +420,7 @@ func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {

func (net *Network) getUpNodeIDs() (ids []enode.ID) {
for _, node := range net.Nodes {
if node.Up {
if node.Up() {
ids = append(ids, node.ID())
}
}
Expand All @@ -446,7 +436,7 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {

func (net *Network) getDownNodeIDs() (ids []enode.ID) {
for _, node := range net.GetNodes() {
if !node.Up {
if !node.Up() {
ids = append(ids, node.ID())
}
}
Expand Down Expand Up @@ -595,8 +585,21 @@ type Node struct {
// Config if the config used to created the node
Config *adapters.NodeConfig `json:"config"`

// Up tracks whether or not the node is running
Up bool `json:"up"`
// up tracks whether or not the node is running
up bool
upMu sync.RWMutex
}

func (n *Node) Up() bool {
n.upMu.RLock()
defer n.upMu.RUnlock()
return n.up
}

func (n *Node) SetUp(up bool) {
n.upMu.Lock()
defer n.upMu.Unlock()
n.up = up
}

// ID returns the ID of the node
Expand Down Expand Up @@ -630,10 +633,29 @@ func (n *Node) MarshalJSON() ([]byte, error) {
}{
Info: n.NodeInfo(),
Config: n.Config,
Up: n.Up,
Up: n.Up(),
})
}

// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose
// Node.up status. IMPORTANT: The implementation is incomplete; we lose
// p2p.NodeInfo.
func (n *Node) UnmarshalJSON(raw []byte) error {
// TODO: How should we turn back NodeInfo into n.Node?
// Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
node := struct {
Config *adapters.NodeConfig `json:"config,omitempty"`
Up bool `json:"up"`
}{}
if err := json.Unmarshal(raw, &node); err != nil {
return err
}

n.SetUp(node.Up)
n.Config = node.Config
return nil
}

// Conn represents a connection between two nodes in the network
type Conn struct {
// One is the node which initiated the connection
Expand All @@ -653,10 +675,10 @@ type Conn struct {

// nodesUp returns whether both nodes are currently up
func (c *Conn) nodesUp() error {
if !c.one.Up {
if !c.one.Up() {
return fmt.Errorf("one %v is not up", c.One)
}
if !c.other.Up {
if !c.other.Up() {
return fmt.Errorf("other %v is not up", c.Other)
}
return nil
Expand Down Expand Up @@ -728,7 +750,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
}
for i, node := range net.Nodes {
snap.Nodes[i] = NodeSnapshot{Node: *node}
if !node.Up {
if !node.Up() {
continue
}
snapshots, err := node.Snapshots()
Expand Down Expand Up @@ -783,7 +805,7 @@ func (net *Network) Load(snap *Snapshot) error {
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
return err
}
if !n.Node.Up {
if !n.Node.Up() {
continue
}
if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
Expand Down Expand Up @@ -855,7 +877,7 @@ func (net *Network) Load(snap *Snapshot) error {
// Start connecting.
for _, conn := range snap.Conns {

if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
//in this case, at least one of the nodes of a connection is not up,
//so it would result in the snapshot `Load` to fail
continue
Expand Down Expand Up @@ -909,7 +931,7 @@ func (net *Network) executeControlEvent(event *Event) {
}

func (net *Network) executeNodeEvent(e *Event) error {
if !e.Node.Up {
if !e.Node.Up() {
return net.Stop(e.Node.ID())
}

Expand Down
Loading