Skip to content

Commit

Permalink
park waiters in slice; revise closure logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Jan 30, 2019
1 parent bf4b91c commit 74d22f3
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 68 deletions.
82 changes: 44 additions & 38 deletions dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type waitingCh struct {
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error) *dialQueue {
sq := &dialQueue{
ctx: ctx,
dialFn: dialFn,
Expand All @@ -71,9 +71,9 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF
in: in,
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

growCh: make(chan struct{}, nConsumers),
growCh: make(chan struct{}, 1),
shrinkCh: make(chan struct{}, 1),
waitingCh: make(chan waitingCh, nConsumers),
waitingCh: make(chan waitingCh),
dieCh: make(chan struct{}, DialQueueMaxParallelism),
}
for i := 0; i < DialQueueMinParallelism; i++ {
Expand All @@ -85,22 +85,16 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF

func (dq *dialQueue) control() {
var (
p peer.ID
dialled = dq.out.DeqChan
resp waitingCh
waiting <-chan waitingCh
dialled <-chan peer.ID
waiting []waitingCh
lastScalingEvt = time.Now()
)

defer func() {
// close channels.
if resp.ch != nil {
close(resp.ch)
}
close(dq.waitingCh)
for w := range dq.waitingCh {
for _, w := range waiting {
close(w.ch)
}
waiting = nil
}()

for {
Expand All @@ -109,16 +103,23 @@ func (dq *dialQueue) control() {
select {
case <-dq.ctx.Done():
return
case p = <-dialled:
dialled, waiting = nil, dq.waitingCh
case w := <-dq.waitingCh:
waiting = append(waiting, w)
dialled = dq.out.DeqChan
continue // onto the top.
case resp = <-waiting:
// got a channel that's waiting for a peer.
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
resp.ch = nil
case p, ok := <-dialled:
if !ok {
return // we're done if the ChanQueue is closed, which happens when the context is closed.
}
w := waiting[0]
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
w.ch <- p
close(w.ch)
waiting = waiting[1:]
if len(waiting) == 0 {
// no more waiters, so stop consuming dialled jobs.
dialled = nil
}
continue // onto the top.
default:
// there's nothing to process, so proceed onto the main select block.
Expand All @@ -127,23 +128,30 @@ func (dq *dialQueue) control() {
select {
case <-dq.ctx.Done():
return
case p = <-dialled:
dialled, waiting = nil, dq.waitingCh
case resp = <-waiting:
// got a channel that's waiting for a peer.
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
resp.ch <- p
close(resp.ch)
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
resp.ch = nil
case w := <-dq.waitingCh:
waiting = append(waiting, w)
dialled = dq.out.DeqChan
case p, ok := <-dialled:
if !ok {
return // we're done if the ChanQueue is closed, which happens when the context is closed.
}
w := waiting[0]
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
w.ch <- p
close(w.ch)
waiting = waiting[1:]
if len(waiting) == 0 {
// no more waiters, so stop consuming dialled jobs.
dialled = nil
}
case <-dq.growCh:
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
continue
}
dq.grow()
lastScalingEvt = time.Now()
case <-dq.shrinkCh:
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
continue
}
dq.shrink()
Expand Down Expand Up @@ -176,13 +184,11 @@ func (dq *dialQueue) Consume() <-chan peer.ID {

// park the channel until a dialled peer becomes available.
select {
case dq.waitingCh <- waitingCh{ch, time.Now()}:
// all good
case <-dq.ctx.Done():
// return a closed channel with no value if we're done.
close(ch)
return ch
case dq.waitingCh <- waitingCh{ch, time.Now()}:
default:
panic("detected more consuming goroutines than declared upfront")
}
return ch
}
Expand Down Expand Up @@ -268,7 +274,7 @@ func (dq *dialQueue) worker() {
log.Debugf("discarding dialled peer because of error: %v", err)
continue
}
log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
waiting := len(dq.waitingCh)
dq.out.EnqChan <- p
if waiting > 0 {
Expand Down
33 changes: 4 additions & 29 deletions dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,6 @@ func init() {
DialQueueScalingMutePeriod = 0
}

func TestDialQueueErrorsWithTooManyConsumers(t *testing.T) {
var calls int
defer func() {
if e := recover(); e == nil {
t.Error("expected a panic, got none")
} else if calls != 4 {
t.Errorf("expected a panic on the 4th call to Consume(); got it on call number %d", calls)
}
}()

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
dialFn := func(ctx context.Context, p peer.ID) error {
<-hang
return nil
}

dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
for ; calls < 3; calls++ {
dq.Consume()
}
calls++
dq.Consume()
}

func TestDialQueueGrowsOnSlowDials(t *testing.T) {
DialQueueMaxIdle = 10 * time.Minute

Expand All @@ -60,7 +35,7 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
}

// remove the mute period to grow faster.
dq := newDialQueue(context.Background(), "test", in, dialFn, 4)
dq := newDialQueue(context.Background(), "test", in, dialFn)

for i := 0; i < 4; i++ {
_ = dq.Consume()
Expand Down Expand Up @@ -93,7 +68,7 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
return nil
}

dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
dq := newDialQueue(context.Background(), "test", in, dialFn)

defer func() {
recover()
Expand Down Expand Up @@ -160,7 +135,7 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
dq := newDialQueue(context.Background(), "test", in, dialFn)

// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ {
Expand Down Expand Up @@ -203,7 +178,7 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
dq := newDialQueue(context.Background(), "test", in, dialFn)

// pick up three consumers.
for i := 0; i < 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: peersToQuery,
proc: proc,
}
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, AlphaValue)
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer)
return r
}

Expand Down

0 comments on commit 74d22f3

Please sign in to comment.