Skip to content

Commit

Permalink
proxy: auto flush
Browse files Browse the repository at this point in the history
  • Loading branch information
spinlock committed Sep 9, 2016
1 parent 2b5b8b9 commit 33df86e
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Session struct {
total atomic2.Int64
flush struct {
n uint
tick int64
nano int64
}
}
start sync.Once
Expand Down Expand Up @@ -182,25 +182,28 @@ func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
s.flushOpStats(true)
}()

p := s.Conn.FlushEncoder()
p.MaxInterval = time.Millisecond
p.MaxBuffered = 256

for r := range tasks {
resp, err := s.handleResponse(r)
if err != nil {
resp = redis.NewErrorf("ERR handle response, %s", err)
s.Conn.Encode(resp, true)
return s.incrOpFails(err)
}
if err := s.Conn.Encode(resp, false); err != nil {
if err := p.Encode(resp); err != nil {
return s.incrOpFails(err)
} else {
r.Release()
}
if len(tasks) != 0 {
continue
}
if err := s.Conn.Flush(); err != nil {
if err := p.Flush(len(tasks) == 0); err != nil {
return s.incrOpFails(err)
}
s.flushOpStats(false)
if len(tasks) == 0 {
s.flushOpStats(false)
}
}
return nil
}
Expand Down Expand Up @@ -583,13 +586,14 @@ func (s *Session) incrOpStats(r *Request) {
}

func (s *Session) flushOpStats(force bool) {
var tick = time.Now().UnixNano() / (int64(time.Millisecond) * 100)
var nano = time.Now().UnixNano()
if !force {
if s.stats.flush.tick == tick {
const period = int64(time.Millisecond) * 100
if d := nano - s.stats.flush.nano; d < period {
return
}
}
s.stats.flush.tick = tick
s.stats.flush.nano = nano

incrOpTotal(s.stats.total.Swap(0))
for _, e := range s.stats.opmap {
Expand Down

0 comments on commit 33df86e

Please sign in to comment.