Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using atomic instead of mutex and delete scratch slice #1833

Merged
merged 13 commits into from
Aug 25, 2024
154 changes: 69 additions & 85 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -27,23 +28,48 @@ type workerPool struct {

connState func(net.Conn, ConnState)

ready []*workerChan
ready workerChanStack

MaxWorkersCount int

MaxIdleWorkerDuration time.Duration

workersCount int

lock sync.Mutex
workersCount int32

LogAllErrors bool
mustStop bool
mustStop atomic.Bool
}

type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
next *workerChan

ch chan net.Conn

lastUseTime int64
}

type workerChanStack struct {
head, tail *workerChan
}

func (s *workerChanStack) push(ch *workerChan) {
ch.next = s.head
s.head = ch
if s.tail == nil {
s.tail = ch
}
}

func (s *workerChanStack) pop() *workerChan {
head := s.head
if head == nil {
return nil
}
s.head = head.next
if s.head == nil {
s.tail = nil
}
return head
}
NikoMalik marked this conversation as resolved.
Show resolved Hide resolved

func (wp *workerPool) Start() {
Expand All @@ -58,9 +84,9 @@ func (wp *workerPool) Start() {
}
}
go func() {
var scratch []*workerChan

NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
for {
wp.clean(&scratch)
wp.clean()
select {
case <-stopCh:
return
Expand All @@ -81,15 +107,16 @@ func (wp *workerPool) Stop() {
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil

for {
ch := wp.ready.pop()
if ch == nil {
break
}
ch.ch <- nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
wp.mustStop.Store(true)

NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
Expand All @@ -99,50 +126,23 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
func (wp *workerPool) clean() {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)

// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r := 0, n-1
for l <= r {
mid := (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()

current := wp.ready.head
for current != nil {
next := current.next
if current.lastUseTime < criticalTime {
current.ch <- nil
wp.workerChanPool.Put(current)
} else {
r = mid - 1
wp.ready.head = current
break
}
current = next
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
wp.ready.tail = wp.ready.head
}

func (wp *workerPool) Serve(c net.Conn) bool {
Expand Down Expand Up @@ -170,27 +170,15 @@ var workerChanCap = func() int {

func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
var createWorker bool

ch = wp.ready.pop()
if ch == nil && atomic.LoadInt32(&wp.workersCount) < int32(wp.MaxWorkersCount) {
NikoMalik marked this conversation as resolved.
Show resolved Hide resolved
atomic.AddInt32(&wp.workersCount, 1)
createWorker = true
}
wp.lock.Unlock()

if ch == nil {
if !createWorker {
return nil
}
if ch == nil && createWorker {
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
Expand All @@ -202,14 +190,12 @@ func (wp *workerPool) getCh() *workerChan {
}

func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
ch.lastUseTime = time.Now().UnixNano()
if wp.mustStop.Load() {
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()

wp.ready.push(ch)
return true
}

Expand Down Expand Up @@ -245,7 +231,5 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
}
}

wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
atomic.AddInt32(&wp.workersCount, -1)
}