Skip to content
Merged
8 changes: 6 additions & 2 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ func (d *discovery) start() error {
// get all currently connected peers and use them to bootstrap the DHT
peers := d.h.Network().Peers()

t := time.NewTicker(startDHTTimeout)
defer t.Stop()
for {
if len(peers) > 0 {
break
}

select {
case <-time.After(startDHTTimeout):
case <-t.C:
logger.Debug("no peers yet, waiting to start DHT...")
// wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes
// will be empty and we will fail to fill the routing table
Expand Down Expand Up @@ -160,11 +162,13 @@ func (d *discovery) advertise() {
}

func (d *discovery) checkPeerCount() {
t := time.NewTicker(connectToPeersTimeout)
defer t.Stop()
for {
select {
case <-d.ctx.Done():
return
case <-time.After(connectToPeersTimeout):
case <-t.C:
if len(d.h.Network().Peers()) > d.minPeers {
continue
}
Expand Down
12 changes: 9 additions & 3 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@ func (q *syncQueue) syncAtHead() {
q.s.syncer.SetSyncing(true)
q.s.noGossip = true // don't gossip messages until we're at the head

t := time.NewTicker(q.slotDuration * 2)
defer t.Stop()
for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration * 2):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -214,9 +216,11 @@ func (q *syncQueue) syncAtHead() {
}

func (q *syncQueue) handleResponseQueue() {
t := time.NewTicker(time.Second)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make these durations configurable to the type, so syncQueue in this case? It will help with testing and we can adjust the durations in the unit tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, I've added configuration variables.

defer t.Stop()
for {
select {
case <-time.After(time.Second):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -260,9 +264,11 @@ func (q *syncQueue) handleResponseQueue() {

// prune peers with low score and connect to new peers
func (q *syncQueue) prunePeers() {
t := time.NewTicker(time.Second * 30)
defer t.Stop()
for {
select {
case <-time.After(time.Second * 30):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down
4 changes: 3 additions & 1 deletion dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {

// SendMessage sends Message to connected telemetry listeners
func (h *Handler) SendMessage(msg *Message) error {
t := time.NewTicker(time.Second * 1)
defer t.Stop()
select {
case h.msg <- *msg:

case <-time.After(time.Second * 1):
case <-t.C:
return errors.New("timeout sending message")
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (
}

func (s *Service) sendNeighbourMessage() {
t := time.NewTicker(neighbourMessageInterval)
defer t.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-time.After(neighbourMessageInterval):
case <-t.C:
if s.neighbourMessage == nil {
continue
}
Expand Down