Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simulation/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (n *DockerNode) rpcClient() (*rpc.Client, error) {
var client *rpc.Client
var err error
wsAddr := fmt.Sprintf("ws://%s:%d", n.ipAddr, dockerWebsocketPort)
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(50 * time.Millisecond) {
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(200 * time.Millisecond) {
client, err = rpc.Dial(wsAddr)
if err == nil {
break
Expand Down
58 changes: 45 additions & 13 deletions simulation/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ type ExecAdapterConfig struct {

// ExecNode is a node that is executed locally
type ExecNode struct {
adapter *ExecAdapter
config NodeConfig
cmd *exec.Cmd
info NodeInfo
adapter *ExecAdapter
config NodeConfig
cmd *exec.Cmd
info NodeInfo
waitChan chan error
}

// NewExecAdapter creates an ExecAdapter from an ExecAdapterConfig
Expand Down Expand Up @@ -69,9 +70,10 @@ func (a ExecAdapter) NewNode(config NodeConfig) Node {
ID: config.ID,
}
node := &ExecNode{
config: config,
adapter: &a,
info: info,
config: config,
adapter: &a,
info: info,
waitChan: make(chan error),
}
return node
}
Expand Down Expand Up @@ -135,6 +137,11 @@ func (n *ExecNode) Start() error {
return fmt.Errorf("error starting node %s: %s", n.config.ID, err)
}

// Forward the result of cmd.Wait() to the wait channel so we can tell whether the cmd exited before a successful rpc.Dial call
go func(cmd *exec.Cmd, waitCh chan error) {
waitCh <- cmd.Wait()
}(n.cmd, n.waitChan)

// Wait for the node to start
var client *rpc.Client
var err error
Expand All @@ -143,11 +150,40 @@ func (n *ExecNode) Start() error {
n.Stop()
}
}()
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {

for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(200 * time.Millisecond) {
client, err = rpc.Dial(n.ipcPath())
if err == nil {
break
}
// rpc.Dial failed, so check whether the command exited because its TCP/UDP pair of ports was already taken
select {
case <-n.waitChan:
// Command exited

// Restart command, as the process got killed due to tcp/udp pair of ports being taken
n.cmd = &exec.Cmd{
Path: n.adapter.config.ExecutablePath,
Args: args,
Dir: dir,
Env: n.config.Env,
Stdout: n.config.Stdout,
Stderr: n.config.Stderr,
}

if err := n.cmd.Start(); err != nil {
n.cmd = nil
return fmt.Errorf("error starting node %s: %s", n.config.ID, err)
}

// Forward the result of cmd.Wait() to the wait channel so we can tell whether the restarted cmd exited before a successful rpc.Dial call
go func(cmd *exec.Cmd, waitCh chan error) {
waitCh <- cmd.Wait()
}(n.cmd, n.waitChan)

default:
// Wait hasn't returned, so Command is still running...
}
}
if client == nil {
return fmt.Errorf("could not establish rpc connection. node %s: %v", n.config.ID, err)
Expand Down Expand Up @@ -190,12 +226,8 @@ func (n *ExecNode) Stop() error {
}

// Wait for the process to terminate or timeout
waitErr := make(chan error)
go func() {
waitErr <- n.cmd.Wait()
}()
select {
case err := <-waitErr:
case err := <-n.waitChan:
return err
case <-time.After(20 * time.Second):
return n.cmd.Process.Kill()
Expand Down
2 changes: 1 addition & 1 deletion simulation/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (s *Simulation) WaitForHealthyNetwork() error {
return err
}
if !healthy.Healthy() {
return fmt.Errorf("node %s is not healthy", nodes[i].Info().ID)
return fmt.Errorf("node %s is not healthy: known <%v> ; connected <%v>", nodes[i].Info().ID, healthy.CountKnowNN, healthy.CountConnectNN)
}
return nil
})
Expand Down