2024-10-25 weekly #2740
AlexStocks
started this conversation in
General
Replies: 1 comment
-
grpc-go keepalive

grpc-go keep-alive

client-side

新建一个 http2Client 的时候会启动一个 goroutine 来处理 keepalive()

// internal/transport/http2_client.go #403
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}

keepalive 的实现 (https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L1710)

// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
// keepalive exits when t.ctx is done, when the transport is found in the
// closing state, or when a ping is outstanding, no read activity has been
// observed since it was sent, and its timeout budget has run out. Only the
// last case sets err, which makes the deferred function close the transport.
func (t *http2Client) keepalive() {
var err error
// keepaliveDone is closed on every exit path; the transport itself is closed
// only when the loop exits with a non-nil err.
defer func() {
close(t.keepaliveDone)
if err != nil {
t.Close(err)
}
}()
// A single ping value with an all-zero 8-byte payload is reused for every
// keepalive ping queued below.
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
timeoutLeft := time.Duration(0)
// Records the last value of t.lastRead before we go block on the timer.
// This is required to check for read activity since then.
prevNano := time.Now().UnixNano()
// The first check happens kp.Time after the goroutine starts.
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
// t.lastRead is compared against a UnixNano value, so it is treated here as
// nanoseconds since the Unix epoch.
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano {
// There has been read activity since the last time we were here.
outstandingPing = false
// Next timer should fire at kp.Time seconds from lastRead time.
// That is (lastRead + kp.Time) - now, i.e. whatever is left of kp.Time
// counted from the last read.
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
// A ping is outstanding, nothing has been read since it was sent, and the
// whole kp.Timeout budget has been consumed by sleeping: give up.
if outstandingPing && timeoutLeft <= 0 {
err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
return
}
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
// keepalive goroutine here. If not, we could have a race
// between the call to Signal() from Close() and the call to
// Wait() here, whereby the keepalive goroutine ends up
// blocking on the condition variable which will never be
// signalled again.
t.mu.Unlock()
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If a ping was sent out previously (because there were active
// streams at that point) which wasn't acked and its timeout
// hadn't fired, but we got here and are about to go dormant,
// we should make sure that we unconditionally send a ping once
// we awaken.
outstandingPing = false
t.kpDormant = true
// kpDormancyCond is created with sync.NewCond(&t.mu) where this goroutine
// is started, so Wait releases t.mu while blocked and holds it again when
// it returns.
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
// A freshly sent ping gets the full kp.Timeout budget.
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration := min(t.kp.Time, timeoutLeft)
// Charge this sleep against the remaining timeout budget; the timeout check
// above fires once the budget reaches zero with the ping still outstanding.
timeoutLeft -= sleepDuration
timer.Reset(sleepDuration)
case <-t.ctx.Done():
// Stop the timer and, if it had already fired, drain its channel before
// returning.
if !timer.Stop() {
<-timer.C
}
return
}
}
}

server-side

func (t *http2Server) handlePing(f *http2.PingFrame) {
// ACK frames are never answered: both paths inside this branch return without
// queuing a reply.
if f.IsAck() {
// The ACK carries goAwayPing's payload and a drain event is set: fire it.
if f.Data == goAwayPing.data && t.drainEvent != nil {
t.drainEvent.Fire()
return
}
// Maybe it's a BDP ping.
if t.bdpEst != nil {
t.bdpEst.calculate(f.Data)
}
return
}
// Not an ACK: queue an ACK ping that echoes the received payload.
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
now := time.Now()
// On every return path below, record when this ping arrived so that the next
// ping can be compared against it.
defer func() {
t.lastPingAt = now
}()
// A reset ping strikes means that we don't need to check for policy
// violation for this ping and the pingStrikes counter should be set
// to 0.
// The compare-and-swap also clears the flag (1 -> 0), so one set flag
// exempts a single ping.
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
t.pingStrikes = 0
return
}
// t.mu is held only long enough to read the number of active streams.
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if ns < 1 && !t.kep.PermitWithoutStream {
// Keepalive shouldn't be active thus, this new ping should
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
t.pingStrikes++
}
}
// Strikes are only counted above; the GOAWAY is sent once the count exceeds
// maxPingStrikes.
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
// The frame uses http2.ErrCodeEnhanceYourCalm with debug data "too_many_pings".
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
2. triple与dubbo3/grpc之间的关系
1. config 与 global.config 配置整合:global.config 当初的设计目标是取代 config,目前尚未全部完成
2. config掏空,保留老用户兼容性
3. config包中的启动动作迁移,移到server/client等位置
4. 开放一些协议层配置
config/global
server/client
protocol
registry/metadata
1 当前情况,支持 proto 文件生成 Java 文件;
2 反向实现 Java 转成 proto 文件
想推进新版本的编写,交叉编译
● Members ●
Beta Was this translation helpful? Give feedback.
All reactions