
Commit cbfe424

nsqd: drop per-channel queue workers
This moves to a single goroutine to process the in-flight and deferred priority queues. It manages a pool of workers (configurable max) that process channels concurrently. The strategy is a copy of Redis's probabilistic expiration algorithm: wake up every 100ms and select 20 random channels from a locally cached list (refreshed every 5s). If either of a channel's queues had work to do, that channel is considered "dirty". If more than 25% of the selected channels were dirty, the loop continues immediately, without sleeping. For 1000 topics and channels with no clients connected, idle CPU usage dropped to ~8%.
1 parent a63ec33 commit cbfe424
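To make the feedback behavior concrete, here is a standalone simulation of the strategy (not part of this commit; the channel count and dirtiness are invented). It applies the same rule the loop below uses: sample 20 channels, drain what's expired, and rescan immediately whenever more than 25% of the sample was dirty:

package main

import (
	"fmt"
	"math/rand"
)

// uniqRands returns l distinct ints in [0, n); mirrors internal/util.UniqRands.
func uniqRands(l int, n int) []int {
	set := make(map[int]struct{})
	nums := make([]int, 0, l)
	for len(nums) < l {
		num := rand.Intn(n)
		if _, ok := set[num]; !ok {
			set[num] = struct{}{}
			nums = append(nums, num)
		}
	}
	return nums
}

func main() {
	const (
		numChannels    = 1000 // invented workload
		selectionCount = 20   // QueueScanSelectionCount
		dirtyPercent   = 0.25 // QueueScanDirtyPercent
	)

	// dirty[i] reports whether channel i has expired work waiting
	dirty := make([]bool, numChannels)
	for i := range dirty {
		dirty[i] = rand.Float64() < 0.6
	}

	ticks, scans, remaining := 0, 0, numChannels
	for remaining > 0 && ticks < 1000 {
		ticks++
		// one 100ms wakeup: keep rescanning while >25% of the sample is dirty
		for {
			scans++
			numDirty := 0
			for _, i := range uniqRands(selectionCount, numChannels) {
				if dirty[i] {
					dirty[i] = false // a pool worker drains the channel's queues
					numDirty++
				}
			}
			if float64(numDirty)/float64(selectionCount) <= dirtyPercent {
				break // back to sleeping on the ticker
			}
		}
		remaining = 0
		for _, d := range dirty {
			if d {
				remaining++
			}
		}
	}
	fmt.Printf("%d channels still dirty after %d ticks (%d scans)\n", remaining, ticks, scans)
}

With a large backlog the inner loop bursts through many scans per tick; once the dirty fraction falls under the threshold, it settles back to one cheap sample per 100ms, which is where the idle-CPU win comes from.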

File tree

6 files changed: +221 −68 lines changed


internal/util/rand.go

+22
@@ -0,0 +1,22 @@
+package util
+
+import (
+	"math/rand"
+)
+
+func UniqRands(l int, n int) []int {
+	set := make(map[int]struct{})
+	nums := make([]int, 0, l)
+	for {
+		num := rand.Intn(n)
+		if _, ok := set[num]; !ok {
+			set[num] = struct{}{}
+			nums = append(nums, num)
+		}
+		if len(nums) == l {
+			goto exit
+		}
+	}
+exit:
+	return nums
+}
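A quick usage sketch for the new helper (hypothetical values). Note that UniqRands(l, n) loops forever if l > n, since it can never collect l distinct values; the only caller, queueScanLoop below, guards against this by clamping l to len(channels):

package main

import (
	"fmt"

	"github.com/bitly/nsq/internal/util"
)

func main() {
	// three distinct indices into a ten-element slice, e.g. [7 2 9]
	fmt.Println(util.UniqRands(3, 10))
}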

nsqd/channel.go

+51 −68
@@ -12,12 +12,8 @@ import (
 
 	"github.com/bitly/nsq/internal/pqueue"
 	"github.com/bitly/nsq/internal/quantile"
-	"github.com/bitly/nsq/internal/util"
 )
 
-// the amount of time a worker will wait when idle
-const defaultWorkerWait = 100 * time.Millisecond
-
 type Consumer interface {
 	UnPause()
 	Pause()
@@ -52,8 +48,8 @@ type Channel struct {
 	memoryMsgChan chan *Message
 	clientMsgChan chan *Message
 	exitChan      chan int
-	waitGroup     util.WaitGroupWrapper
 	exitFlag      int32
+	exitMutex     sync.RWMutex
 
 	// state tracking
 	clients map[int64]Consumer
@@ -116,9 +112,6 @@ func NewChannel(topicName string, channelName string, ctx *context,
 
 	go c.messagePump()
 
-	c.waitGroup.Wrap(func() { c.deferredWorker() })
-	c.waitGroup.Wrap(func() { c.inFlightWorker() })
-
 	c.ctx.nsqd.Notify(c)
 
 	return c
@@ -155,6 +148,9 @@ func (c *Channel) Close() error {
 }
 
 func (c *Channel) exit(deleted bool) error {
+	c.exitMutex.Lock()
+	defer c.exitMutex.Unlock()
+
 	if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
 		return errors.New("exiting")
 	}
@@ -178,9 +174,6 @@ func (c *Channel) exit(deleted bool) error {
 
 	close(c.exitChan)
 
-	// synchronize the close of router() and pqWorkers (2)
-	c.waitGroup.Wait()
-
 	if deleted {
 		// empty the queue (deletes the backend files, too)
 		c.Empty()
@@ -587,80 +580,70 @@ exit:
 	close(c.clientMsgChan)
 }
 
-func (c *Channel) deferredWorker() {
-	c.pqWorker(&c.deferredPQ, &c.deferredMutex, func(item *pqueue.Item) {
-		msg := item.Value.(*Message)
-		_, err := c.popDeferredMessage(msg.ID)
-		if err != nil {
-			return
-		}
-		c.doRequeue(msg)
-	})
-}
+func (c *Channel) processDeferredQueue(t int64) bool {
+	c.exitMutex.RLock()
+	defer c.exitMutex.RUnlock()
+
+	if c.Exiting() {
+		return false
+	}
 
-func (c *Channel) inFlightWorker() {
-	ticker := time.NewTicker(defaultWorkerWait)
+	dirty := false
 	for {
-		select {
-		case <-ticker.C:
-		case <-c.exitChan:
+		c.deferredMutex.Lock()
+		item, _ := c.deferredPQ.PeekAndShift(t)
+		c.deferredMutex.Unlock()
+
+		if item == nil {
 			goto exit
 		}
-		now := time.Now().UnixNano()
-		for {
-			c.inFlightMutex.Lock()
-			msg, _ := c.inFlightPQ.PeekAndShift(now)
-			c.inFlightMutex.Unlock()
-
-			if msg == nil {
-				break
-			}
+		dirty = true
 
-			_, err := c.popInFlightMessage(msg.clientID, msg.ID)
-			if err != nil {
-				break
-			}
-			atomic.AddUint64(&c.timeoutCount, 1)
-			c.RLock()
-			client, ok := c.clients[msg.clientID]
-			c.RUnlock()
-			if ok {
-				client.TimedOutMessage()
-			}
-			c.doRequeue(msg)
+		msg := item.Value.(*Message)
+		_, err := c.popDeferredMessage(msg.ID)
+		if err != nil {
+			goto exit
 		}
+		c.doRequeue(msg)
 	}
 
 exit:
-	c.ctx.nsqd.logf("CHANNEL(%s): closing ... inFlightWorker", c.name)
-	ticker.Stop()
+	return dirty
 }
 
-// generic loop (executed in a goroutine) that periodically wakes up to walk
-// the priority queue and call the callback
-func (c *Channel) pqWorker(pq *pqueue.PriorityQueue, mutex *sync.Mutex, callback func(item *pqueue.Item)) {
-	ticker := time.NewTicker(defaultWorkerWait)
+func (c *Channel) processInFlightQueue(t int64) bool {
+	c.exitMutex.RLock()
+	defer c.exitMutex.RUnlock()
+
+	if c.Exiting() {
+		return false
+	}
+
+	dirty := false
 	for {
-		select {
-		case <-ticker.C:
-		case <-c.exitChan:
+		c.inFlightMutex.Lock()
+		msg, _ := c.inFlightPQ.PeekAndShift(t)
+		c.inFlightMutex.Unlock()
+
+		if msg == nil {
 			goto exit
 		}
-		now := time.Now().UnixNano()
-		for {
-			mutex.Lock()
-			item, _ := pq.PeekAndShift(now)
-			mutex.Unlock()
-
-			if item == nil {
-				break
-			}
+		dirty = true
 
-			callback(item)
+		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
+		if err != nil {
+			goto exit
 		}
+		atomic.AddUint64(&c.timeoutCount, 1)
+		c.RLock()
+		client, ok := c.clients[msg.clientID]
+		c.RUnlock()
+		if ok {
+			client.TimedOutMessage()
+		}
+		c.doRequeue(msg)
 	}
 
 exit:
-	c.ctx.nsqd.logf("CHANNEL(%s): closing ... pqueue worker", c.name)
-	ticker.Stop()
+	return dirty
 }
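Both helpers lean on PeekAndShift: pop the queue's root only if its priority (an expiration timestamp in nanoseconds) is at or before the cutoff t, so each call removes at most one expired item and the loops above can stop at the first nil. A self-contained sketch modeled on internal/pqueue (not copied verbatim; the in-flight queue uses the same pattern specialized to *Message):

package pqueue

import "container/heap"

type Item struct {
	Value    interface{}
	Priority int64 // here: the message's expiration time, as UnixNano
	Index    int
}

// PriorityQueue is a min-heap ordered by Priority.
type PriorityQueue []*Item

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool { return pq[i].Priority < pq[j].Priority }

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*Item)
	item.Index = len(*pq)
	*pq = append(*pq, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[:n-1]
	return item
}

// PeekAndShift pops the root item only if its Priority is <= max; otherwise
// it leaves the heap intact and reports how far in the future the root is.
// Everything still queued expires later than max, which is why the scan
// functions treat the first nil as "no more work right now".
func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
	if pq.Len() == 0 {
		return nil, 0
	}
	item := (*pq)[0]
	if item.Priority > max {
		return nil, item.Priority - max
	}
	heap.Remove(pq, 0)
	return item, 0
}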

nsqd/channel_test.go

+1
@@ -60,6 +60,7 @@ func TestInFlightWorker(t *testing.T) {
 	opts := NewNSQDOptions()
 	opts.Logger = newTestLogger(t)
 	opts.MsgTimeout = 100 * time.Millisecond
+	opts.QueueScanRefreshInterval = 100 * time.Millisecond
 	_, _, nsqd := mustStartNSQD(opts)
 	defer os.RemoveAll(opts.DataPath)
 	defer nsqd.Exit()

nsqd/nsqd.go

+134
@@ -59,6 +59,8 @@ type NSQD struct {
 	httpsListener net.Listener
 	tlsConfig     *tls.Config
 
+	poolSize int
+
 	idChan     chan MessageID
 	notifyChan chan interface{}
 	exitChan   chan int
@@ -245,6 +247,7 @@ func (n *NSQD) Main() {
 		http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP")
 	})
 
+	n.waitGroup.Wrap(func() { n.queueScanLoop() })
 	n.waitGroup.Wrap(func() { n.idPump() })
 	n.waitGroup.Wrap(func() { n.lookupLoop() })
 	if n.opts.StatsdAddress != "" {
@@ -551,6 +554,137 @@ func (n *NSQD) Notify(v interface{}) {
 	})
 }
 
+// channels returns a flat slice of all channels in all topics
+func (n *NSQD) channels() []*Channel {
+	var channels []*Channel
+	n.RLock()
+	for _, t := range n.topicMap {
+		t.RLock()
+		for _, c := range t.channelMap {
+			channels = append(channels, c)
+		}
+		t.RUnlock()
+	}
+	n.RUnlock()
+	return channels
+}
+
+// resizePool adjusts the size of the pool of queueScanWorker goroutines
+//
+//	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
+//
+func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
+	idealPoolSize := int(float64(num) * 0.25)
+	if idealPoolSize < 1 {
+		idealPoolSize = 1
+	} else if idealPoolSize > n.opts.QueueScanWorkerPoolMax {
+		idealPoolSize = n.opts.QueueScanWorkerPoolMax
+	}
+	for {
+		if idealPoolSize == n.poolSize {
+			break
+		} else if idealPoolSize < n.poolSize {
+			// contract
+			closeCh <- 1
+			n.poolSize--
+		} else {
+			// expand
+			n.waitGroup.Wrap(func() {
+				n.queueScanWorker(workCh, responseCh, closeCh)
+			})
+			n.poolSize++
+		}
+	}
+}
+
+// queueScanWorker receives work (in the form of a channel) from queueScanLoop
+// and processes the deferred and in-flight queues
+func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
+	for {
+		select {
+		case c := <-workCh:
+			now := time.Now().UnixNano()
+			dirty := false
+			if c.processInFlightQueue(now) {
+				dirty = true
+			}
+			if c.processDeferredQueue(now) {
+				dirty = true
+			}
+			responseCh <- dirty
+		case <-closeCh:
+			return
+		}
+	}
+}
+
+// queueScanLoop runs in a single goroutine to process in-flight and deferred
+// priority queues. It manages a pool of queueScanWorker (configurable max of
+// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
+//
+// It copies Redis's probabilistic expiration algorithm: it wakes up every
+// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
+// (default: 20) channels from a locally cached list (refreshed every
+// QueueScanRefreshInterval (default: 5s)).
+//
+// If either of the queues had work to do the channel is considered "dirty".
+//
+// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
+// the loop continues without sleep.
+func (n *NSQD) queueScanLoop() {
+	workCh := make(chan *Channel, n.opts.QueueScanSelectionCount)
+	responseCh := make(chan bool, n.opts.QueueScanSelectionCount)
+	closeCh := make(chan int)
+
+	workTicker := time.NewTicker(n.opts.QueueScanInterval)
+	refreshTicker := time.NewTicker(n.opts.QueueScanRefreshInterval)
+
+	channels := n.channels()
+	n.resizePool(len(channels), workCh, responseCh, closeCh)
+
+	for {
+		select {
+		case <-workTicker.C:
+			if len(channels) == 0 {
+				continue
+			}
+		case <-refreshTicker.C:
+			channels = n.channels()
+			n.resizePool(len(channels), workCh, responseCh, closeCh)
+			continue
+		case <-n.exitChan:
+			goto exit
+		}
+
+		num := n.opts.QueueScanSelectionCount
+		if num > len(channels) {
+			num = len(channels)
+		}
+
+	loop:
+		for _, i := range util.UniqRands(num, len(channels)) {
+			workCh <- channels[i]
+		}
+
+		numDirty := 0
+		for i := 0; i < num; i++ {
+			if <-responseCh {
+				numDirty++
+			}
+		}
+
+		if float64(numDirty)/float64(num) > n.opts.QueueScanDirtyPercent {
+			goto loop
+		}
+	}
+
+exit:
+	n.logf("QUEUESCAN: closing")
+	close(closeCh)
+	workTicker.Stop()
+	refreshTicker.Stop()
+}
+
 func buildTLSConfig(opts *nsqdOptions) (*tls.Config, error) {
 	var tlsConfig *tls.Config
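With the defaults, resizePool targets one worker per four channels, clamped to [1, QueueScanWorkerPoolMax]: 2 channels get 1 worker, 12 get 3, and 1000 are capped at 4. A small standalone check of that arithmetic (illustrative helper, not nsqd code):

package main

import "fmt"

// idealPoolSize mirrors the clamping at the top of resizePool above.
func idealPoolSize(numChannels, poolMax int) int {
	size := int(float64(numChannels) * 0.25)
	if size < 1 {
		size = 1
	} else if size > poolMax {
		size = poolMax
	}
	return size
}

func main() {
	for _, n := range []int{0, 2, 8, 12, 1000} {
		// with QueueScanWorkerPoolMax = 4 (the default)
		fmt.Printf("%4d channels -> pool size %d\n", n, idealPoolSize(n, 4))
	}
	// Output:
	//    0 channels -> pool size 1
	//    2 channels -> pool size 1
	//    8 channels -> pool size 2
	//   12 channels -> pool size 3
	// 1000 channels -> pool size 4
}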

nsqd/options.go

+12
@@ -28,6 +28,12 @@ type nsqdOptions struct {
 	SyncEvery   int64         `flag:"sync-every"`
 	SyncTimeout time.Duration `flag:"sync-timeout"`
 
+	QueueScanInterval        time.Duration
+	QueueScanRefreshInterval time.Duration
+	QueueScanSelectionCount  int
+	QueueScanWorkerPoolMax   int
+	QueueScanDirtyPercent    float64
+
 	// msg and command options
 	MsgTimeout    time.Duration `flag:"msg-timeout" arg:"1ms"`
 	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
@@ -85,6 +91,12 @@ func NewNSQDOptions() *nsqdOptions {
 		SyncEvery:   2500,
 		SyncTimeout: 2 * time.Second,
 
+		QueueScanInterval:        100 * time.Millisecond,
+		QueueScanRefreshInterval: 5 * time.Second,
+		QueueScanSelectionCount:  20,
+		QueueScanWorkerPoolMax:   4,
+		QueueScanDirtyPercent:    0.25,
+
 		MsgTimeout:    60 * time.Second,
 		MaxMsgTimeout: 15 * time.Minute,
 		MaxMsgSize:    1024768,
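None of the new options are wired up as command-line flags in this commit, so they can only be tuned programmatically, as the channel_test.go change above does. A hypothetical test-style sketch (inside package nsqd, reusing the existing newTestLogger and mustStartNSQD helpers):

package nsqd

import (
	"os"
	"testing"
	"time"
)

// Hypothetical tuning sketch in the style of channel_test.go above;
// TestQueueScanTuning is not part of this commit.
func TestQueueScanTuning(t *testing.T) {
	opts := NewNSQDOptions()
	opts.Logger = newTestLogger(t)
	opts.QueueScanInterval = 50 * time.Millisecond         // default: 100ms
	opts.QueueScanRefreshInterval = 100 * time.Millisecond // default: 5s
	opts.QueueScanWorkerPoolMax = 8                        // default: 4
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()
}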
