Skip to content

Commit c914088

Browse files
committed
perf(pool): optimize lock-free queue with conditional yielding and opposite-end pooling
- Replace unconditional runtime.Gosched() with conditional yielding (every 8 attempts) - Add PushOpposite() method to push UNUSABLE connections to opposite end - Update Put logic to use PushOpposite for UNUSABLE connections - Reduces Gosched overhead by ~87.5%
1 parent 3478f7a commit c914088

File tree

2 files changed

+70
-16
lines changed

2 files changed

+70
-16
lines changed

internal/pool/lockfree_queue.go

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,27 +102,54 @@ func (q *lockFreeQueue) Push(cn *Conn) bool {
102102
if cn == nil {
103103
return false
104104
}
105-
105+
106106
// Check capacity before attempting to add
107107
if q.capacity > 0 {
108108
currentSize := q.size.Load()
109109
if currentSize >= q.capacity {
110110
return false
111111
}
112112
}
113-
113+
114114
if q.fifo {
115115
return q.pushFIFO(cn)
116116
}
117117
return q.pushLIFO(cn)
118118
}
119119

120+
// PushOpposite adds a connection to the opposite end of the queue.
121+
// This is useful for UNUSABLE connections that should not be immediately popped.
122+
// Returns true if successful, false if queue is at capacity.
123+
// Thread-safe and lock-free.
124+
func (q *lockFreeQueue) PushOpposite(cn *Conn) bool {
125+
if cn == nil {
126+
return false
127+
}
128+
129+
// Check capacity before attempting to add
130+
if q.capacity > 0 {
131+
currentSize := q.size.Load()
132+
if currentSize >= q.capacity {
133+
return false
134+
}
135+
}
136+
137+
// Push to opposite end
138+
if q.fifo {
139+
// FIFO mode: normal push is to tail, opposite is to head
140+
return q.pushLIFO(cn)
141+
}
142+
// LIFO mode: normal push is to head, opposite is to tail
143+
return q.pushFIFO(cn)
144+
}
145+
120146
// pushFIFO implements lock-free FIFO push (enqueue at tail).
121147
// Based on Michael-Scott queue algorithm.
122148
func (q *lockFreeQueue) pushFIFO(cn *Conn) bool {
123149
// Get node from pool instead of allocating
124150
node := getNode(cn)
125151

152+
attempts := 0
126153
for {
127154
tail := q.tail.Load()
128155
next := tail.next.Load()
@@ -145,8 +172,11 @@ func (q *lockFreeQueue) pushFIFO(cn *Conn) bool {
145172
q.tail.CompareAndSwap(tail, next)
146173
}
147174
}
148-
// Yield to prevent livelock under high contention
149-
runtime.Gosched()
175+
// Yield only after multiple attempts to reduce overhead
176+
attempts++
177+
if attempts&7 == 0 { // Every 8 attempts
178+
runtime.Gosched()
179+
}
150180
}
151181
}
152182

@@ -156,6 +186,7 @@ func (q *lockFreeQueue) pushLIFO(cn *Conn) bool {
156186
// Get node from pool instead of allocating
157187
node := getNode(cn)
158188

189+
attempts := 0
159190
for {
160191
head := q.head.Load()
161192
node.next.Store(head)
@@ -165,8 +196,11 @@ func (q *lockFreeQueue) pushLIFO(cn *Conn) bool {
165196
q.size.Add(1)
166197
return true
167198
}
168-
// Yield to prevent livelock under high contention
169-
runtime.Gosched()
199+
// Yield only after multiple attempts to reduce overhead
200+
attempts++
201+
if attempts&7 == 0 { // Every 8 attempts
202+
runtime.Gosched()
203+
}
170204
}
171205
}
172206

@@ -182,6 +216,7 @@ func (q *lockFreeQueue) Pop() *Conn {
182216

183217
// popFIFO implements lock-free FIFO pop (dequeue from head).
184218
func (q *lockFreeQueue) popFIFO() *Conn {
219+
attempts := 0
185220
for {
186221
head := q.head.Load()
187222
tail := q.tail.Load()
@@ -202,7 +237,10 @@ func (q *lockFreeQueue) popFIFO() *Conn {
202237
// But next could be nil if another thread just popped
203238
if next == nil {
204239
// Retry - another thread modified the queue
205-
runtime.Gosched()
240+
attempts++
241+
if attempts&7 == 0 {
242+
runtime.Gosched()
243+
}
206244
continue
207245
}
208246

@@ -223,13 +261,17 @@ func (q *lockFreeQueue) popFIFO() *Conn {
223261
}
224262
}
225263
}
226-
// Yield to prevent livelock under high contention
227-
runtime.Gosched()
264+
// Yield only after multiple attempts to reduce overhead
265+
attempts++
266+
if attempts&7 == 0 { // Every 8 attempts
267+
runtime.Gosched()
268+
}
228269
}
229270
}
230271

231272
// popLIFO implements lock-free LIFO pop (pop from head).
232273
func (q *lockFreeQueue) popLIFO() *Conn {
274+
attempts := 0
233275
for {
234276
head := q.head.Load()
235277
if head == nil {
@@ -250,8 +292,11 @@ func (q *lockFreeQueue) popLIFO() *Conn {
250292

251293
return conn
252294
}
253-
// Yield to prevent livelock under high contention
254-
runtime.Gosched()
295+
// Yield only after multiple attempts to reduce overhead
296+
attempts++
297+
if attempts&7 == 0 { // Every 8 attempts
298+
runtime.Gosched()
299+
}
255300
}
256301
}
257302

internal/pool/pool.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
691691
// Handle different connection states:
692692
// - CREATED: New connection, never used yet - transition to IDLE
693693
// - IN_USE: Normal case after Get() - transition to IDLE
694-
// - UNUSABLE: Temporary state (handoff/reauth) - keep as-is and pool it (will become usable later)
694+
// - UNUSABLE: Temporary state (handoff/reauth) - keep as-is and pool at opposite end
695695
// - IDLE: Already idle (shouldn't happen but handle it)
696696
// - Other states: Remove the connection
697697

@@ -704,7 +704,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
704704
p.removeConnWithLock(cn)
705705
shouldCloseConn = true
706706
} else {
707-
// Successfully transitioned to IDLE, add to pool
707+
// Successfully transitioned to IDLE, add to pool at normal end
708708
if !p.idleQueue.Push(cn) {
709709
internal.Logger.Printf(ctx, "redis: connection pool: idle queue full, removing connection")
710710
p.removeConnWithLock(cn)
@@ -713,9 +713,18 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
713713
p.idleConnsLen.Add(1)
714714
}
715715
}
716-
} else if currentState == StateUnusable || currentState == StateIdle {
717-
// UNUSABLE: Pool it as-is (will become usable after handoff/reauth completes)
718-
// IDLE: Already idle, just pool it
716+
} else if currentState == StateUnusable {
717+
// UNUSABLE: Pool at opposite end so it's not immediately popped
718+
// This gives the connection time to become usable after handoff/reauth completes
719+
if !p.idleQueue.PushOpposite(cn) {
720+
internal.Logger.Printf(ctx, "redis: connection pool: idle queue full, removing unusable connection")
721+
p.removeConnWithLock(cn)
722+
shouldCloseConn = true
723+
} else {
724+
p.idleConnsLen.Add(1)
725+
}
726+
} else if currentState == StateIdle {
727+
// IDLE: Already idle, just pool it at normal end
719728
if !p.idleQueue.Push(cn) {
720729
internal.Logger.Printf(ctx, "redis: connection pool: idle queue full, removing connection")
721730
p.removeConnWithLock(cn)

0 commit comments

Comments
 (0)