Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge waits and recvs to wrCounter #809

Merged
merged 9 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 60 additions & 36 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ type pipe struct {
_ [10]int32
blcksig int32
state int32
waits int32
recvs int32
wrCounter atomic.Uint64
bgState int32
r2ps bool // identify this pipe is used for resp2 pubsub or not
noNoDelay bool
Expand Down Expand Up @@ -372,10 +371,10 @@ func (p *pipe) _background() {
select {
case <-p.close:
default:
atomic.AddInt32(&p.waits, 1)
p.incrWaits()
go func() {
<-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite()
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
}()
}
}
Expand Down Expand Up @@ -406,7 +405,7 @@ func (p *pipe) _background() {
}

resp := newErrResult(err)
for atomic.LoadInt32(&p.waits) != 0 {
for p.loadWaits() != 0 {
select {
case <-p.close: // p.queue.NextWriteCmd() can only be called after _backgroundWrite
_, _, _ = p.queue.NextWriteCmd()
Expand Down Expand Up @@ -450,7 +449,7 @@ func (p *pipe) _backgroundWrite() (err error) {
}
}
ones[0], multi, ch = p.queue.WaitForWrite()
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
if flushDelay != 0 && p.loadWaits() > 1 { // do not delay for sequential usage
// Blocking commands are executed in dedicated client which is acquired from pool.
// So, there is no sense to wait other commands to be written.
// https://github.com/redis/rueidis/issues/379
Expand Down Expand Up @@ -637,17 +636,17 @@ func (p *pipe) _backgroundRead() (err error) {
func (p *pipe) backgroundPing() {
var prev, recv int32

prev = atomic.LoadInt32(&p.recvs)
prev = p.loadRecvs()
p.pingTimer = time.AfterFunc(p.pinggap, func() {
var err error
recv = atomic.LoadInt32(&p.recvs)
recv = p.loadRecvs()
defer func() {
if err == nil && p.Error() == nil {
prev = atomic.LoadInt32(&p.recvs)
prev = p.loadRecvs()
p.pingTimer.Reset(p.pinggap)
}
}()
if recv != prev || atomic.LoadInt32(&p.blcksig) != 0 || (atomic.LoadInt32(&p.state) == 0 && atomic.LoadInt32(&p.waits) != 0) {
if recv != prev || atomic.LoadInt32(&p.blcksig) != 0 || (atomic.LoadInt32(&p.state) == 0 && p.loadWaits() != 0) {
return
}
ch := make(chan error, 1)
Expand Down Expand Up @@ -840,10 +839,10 @@ func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error {
close(old.close)
}
}
if atomic.AddInt32(&p.waits, 1) == 1 && atomic.LoadInt32(&p.state) == 0 {
if p.incrWaits() == 1 && atomic.LoadInt32(&p.state) == 0 {
p.background()
}
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
return ch
}

Expand Down Expand Up @@ -884,7 +883,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
return p._r2pipe(ctx).Do(ctx, cmd)
}
}
waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue
waits := p.incrWaits() // if this is 1, and background worker is not started, no need to queue
state := atomic.LoadInt32(&p.state)

if state == 1 {
Expand All @@ -908,10 +907,10 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
} else {
resp = newErrResult(p.Error())
}
if left := atomic.AddInt32(&p.waits, -1); state == 0 && left != 0 {

if _, left := p.decrWaitsAndIncrRecvs(); state == 0 && left != 0 {
p.background()
}
atomic.AddInt32(&p.recvs, 1)
return resp

queue:
Expand All @@ -925,14 +924,12 @@ queue:
goto abort
}
}
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
p.decrWaitsAndIncrRecvs()
return resp
abort:
go func(ch chan RedisResult) {
<-ch
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
p.decrWaitsAndIncrRecvs()
}(ch)
return newErrResult(ctx.Err())
}
Expand Down Expand Up @@ -990,7 +987,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
}
}

waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue
waits := p.incrWaits() // if this is 1, and background worker is not started, no need to queue
state := atomic.LoadInt32(&p.state)

if state == 1 {
Expand All @@ -1017,10 +1014,9 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
resp.s[i] = err
}
}
if left := atomic.AddInt32(&p.waits, -1); state == 0 && left != 0 {
if _, left := p.decrWaitsAndIncrRecvs(); state == 0 && left != 0 {
p.background()
}
atomic.AddInt32(&p.recvs, 1)
return resp

queue:
Expand All @@ -1034,15 +1030,13 @@ queue:
goto abort
}
}
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
p.decrWaitsAndIncrRecvs()
return resp
abort:
go func(resp *redisresults, ch chan RedisResult) {
<-ch
resultsp.Put(resp)
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
p.decrWaitsAndIncrRecvs()
}(resp, ch)
resp = resultsp.Get(len(multi), len(multi))
err := newErrResult(ctx.Err())
Expand Down Expand Up @@ -1084,7 +1078,7 @@ func (s *RedisResultStream) WriteTo(w io.Writer) (n int64, err error) {
}
if s.n--; s.n == 0 {
atomic.AddInt32(&s.w.blcksig, -1)
atomic.AddInt32(&s.w.waits, -1)
s.w.decrWaits()
if s.e == nil {
s.e = io.EOF
} else {
Expand All @@ -1111,7 +1105,7 @@ func (p *pipe) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisRes

if state == 0 {
atomic.AddInt32(&p.blcksig, 1)
waits := atomic.AddInt32(&p.waits, 1)
waits := p.incrWaits()
if waits != 1 {
panic("DoStream with racing is a bug")
}
Expand Down Expand Up @@ -1139,7 +1133,7 @@ func (p *pipe) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisRes
}
}
atomic.AddInt32(&p.blcksig, -1)
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
pool.Store(p)
return RedisResultStream{e: p.Error()}
}
Expand All @@ -1161,7 +1155,7 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed

if state == 0 {
atomic.AddInt32(&p.blcksig, 1)
waits := atomic.AddInt32(&p.waits, 1)
waits := p.incrWaits()
if waits != 1 {
panic("DoMultiStream with racing is a bug")
}
Expand Down Expand Up @@ -1204,7 +1198,7 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed
}
}
atomic.AddInt32(&p.blcksig, -1)
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
pool.Store(p)
return RedisResultStream{e: p.Error()}
}
Expand Down Expand Up @@ -1559,6 +1553,36 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
return results
}

// incrWaits increments the lower 32 bits (waits).
func (p *pipe) incrWaits() uint32 {
// Increment the lower 32 bits (waits)
return uint32(p.wrCounter.Add(1))
}

// decrWaits decrements the lower 32 bits (waits).
func (p *pipe) decrWaits() uint32 {
// Decrement the lower 32 bits (waits)
return uint32(p.wrCounter.Add(^uint64(0)) & 0xFFFFFFFF)
}

// decrWaitsAndIncrRecvs decrements the lower 32 bits (waits) and increments the upper 32 bits (recvs).
func (p *pipe) decrWaitsAndIncrRecvs() (uint32, uint32) {
newValue := p.wrCounter.Add((1 << 32) - 1)
return uint32(newValue >> 32), uint32(newValue & 0xFFFFFFFF)
}

// loadRecvs loads the upper 32 bits (recvs).
func (p *pipe) loadRecvs() int32 {
// Load the upper 32 bits (recvs)
return int32(p.wrCounter.Load() >> 32)
}

// loadWaits loads the lower 32 bits (waits).
func (p *pipe) loadWaits() uint32 {
// Load the lower 32 bits (waits)
return uint32(p.wrCounter.Load() & 0xFFFFFFFF)
}

func (p *pipe) Error() error {
if err := p.error.Load(); err != nil {
return err.error
Expand All @@ -1569,28 +1593,28 @@ func (p *pipe) Error() error {
func (p *pipe) Close() {
p.error.CompareAndSwap(nil, errClosing)
block := atomic.AddInt32(&p.blcksig, 1)
waits := atomic.AddInt32(&p.waits, 1)
waits := p.incrWaits()
stopping1 := atomic.CompareAndSwapInt32(&p.state, 0, 2)
stopping2 := atomic.CompareAndSwapInt32(&p.state, 1, 2)
if p.queue != nil {
if stopping1 && waits == 1 { // make sure there is no sync read
p.background()
}
if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd
atomic.AddInt32(&p.waits, 1)
p.incrWaits()
ch := p.queue.PutOne(cmds.PingCmd)
select {
case <-ch:
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
case <-time.After(time.Second):
go func(ch chan RedisResult) {
<-ch
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
}(ch)
}
}
}
atomic.AddInt32(&p.waits, -1)
p.decrWaits()
atomic.AddInt32(&p.blcksig, -1)
if p.pingTimer != nil {
p.pingTimer.Stop()
Expand Down
23 changes: 12 additions & 11 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4951,8 +4951,8 @@ func TestOngoingCancelContextInPipelineMode_Do(t *testing.T) {
}()
}

for atomic.LoadInt32(&p.waits) != 5 {
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
for p.loadWaits() != 5 {
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
time.Sleep(time.Millisecond * 100)
}

Expand Down Expand Up @@ -4991,8 +4991,8 @@ func TestOngoingWriteTimeoutInPipelineMode_Do(t *testing.T) {
}
}()
}
for atomic.LoadInt32(&p.waits) != 5 {
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
for p.loadWaits() != 5 {
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
time.Sleep(time.Millisecond * 100)
}
for atomic.LoadInt32(&timeout) != 5 {
Expand Down Expand Up @@ -5025,8 +5025,8 @@ func TestOngoingCancelContextInPipelineMode_DoMulti(t *testing.T) {
}()
}

for atomic.LoadInt32(&p.waits) != 5 {
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
for p.loadWaits() != 5 {
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
time.Sleep(time.Millisecond * 100)
}

Expand Down Expand Up @@ -5065,8 +5065,8 @@ func TestOngoingWriteTimeoutInPipelineMode_DoMulti(t *testing.T) {
}
}()
}
for atomic.LoadInt32(&p.waits) != 5 {
t.Logf("wait p.waits to be 5 %v", atomic.LoadInt32(&p.waits))
for p.loadWaits() != 5 {
t.Logf("wait p.waits to be 5 %v", p.loadWaits())
time.Sleep(time.Millisecond * 100)
}
for atomic.LoadInt32(&timeout) != 5 {
Expand Down Expand Up @@ -5325,13 +5325,13 @@ func TestBackgroundPing(t *testing.T) {
p, mock, cancel, _ := setup(t, opt)
defer cancel()
time.Sleep(50 * time.Millisecond)
prev := atomic.LoadInt32(&p.recvs)
prev := p.loadRecvs()

for i := range 10 {
atomic.AddInt32(&p.blcksig, 1) // block
time.Sleep(timeout)
atomic.AddInt32(&p.blcksig, -1) // unblock
recv := atomic.LoadInt32(&p.recvs)
recv := p.loadRecvs()
if prev != recv {
t.Fatalf("round %d unexpect recv %v, need be equal to prev %v", i, recv, prev)
}
Expand All @@ -5344,7 +5344,8 @@ func TestBackgroundPing(t *testing.T) {
}()
for i := range 10 {
time.Sleep(timeout)
recv := atomic.LoadInt32(&p.recvs)
recv := p.loadRecvs()

if prev == recv {
t.Fatalf("round %d unexpect recv %v, need be different from prev %v", i, recv, prev)
}
Expand Down
Loading