@@ -25,18 +25,20 @@ import (
25
25
"golang.org/x/net/context"
26
26
"google.golang.org/grpc"
27
27
"google.golang.org/grpc/grpclog"
28
+ "google.golang.org/grpc/health/grpc_health_v1"
28
29
"google.golang.org/grpc/transport"
29
30
)
30
31
31
32
const (
32
- maxConnectionRetryCount = 3
33
- connectionRetryDelay = 3 * time .Second
34
- containerdShutdownTimeout = 15 * time .Second
35
- containerdBinary = "docker-containerd"
36
- containerdPidFilename = "docker-containerd.pid"
37
- containerdSockFilename = "docker-containerd.sock"
38
- containerdStateDir = "containerd"
39
- eventTimestampFilename = "event.ts"
33
+ maxConnectionRetryCount = 3
34
+ connectionRetryDelay = 3 * time .Second
35
+ containerdHealthCheckTimeout = 3 * time .Second
36
+ containerdShutdownTimeout = 15 * time .Second
37
+ containerdBinary = "docker-containerd"
38
+ containerdPidFilename = "docker-containerd.pid"
39
+ containerdSockFilename = "docker-containerd.sock"
40
+ containerdStateDir = "containerd"
41
+ eventTimestampFilename = "event.ts"
40
42
)
41
43
42
44
type remote struct {
@@ -134,37 +136,41 @@ func (r *remote) UpdateOptions(options ...RemoteOption) error {
134
136
135
137
func (r * remote ) handleConnectionChange () {
136
138
var transientFailureCount = 0
137
- state := grpc .Idle
139
+
140
+ ticker := time .NewTicker (500 * time .Millisecond )
141
+ defer ticker .Stop ()
142
+ healthClient := grpc_health_v1 .NewHealthClient (r .rpcConn )
143
+
138
144
for {
139
- s , err := r .rpcConn .WaitForStateChange (context .Background (), state )
140
- if err != nil {
141
- break
145
+ <- ticker .C
146
+ ctx , cancel := context .WithTimeout (context .Background (), containerdHealthCheckTimeout )
147
+ _ , err := healthClient .Check (ctx , & grpc_health_v1.HealthCheckRequest {})
148
+ cancel ()
149
+ if err == nil {
150
+ continue
142
151
}
143
- state = s
144
- logrus .Debugf ("libcontainerd: containerd connection state change : %v" , s )
152
+
153
+ logrus .Debugf ("libcontainerd: containerd health check returned error : %v" , err )
145
154
146
155
if r .daemonPid != - 1 {
147
- switch state {
148
- case grpc .TransientFailure :
149
- // Reset state to be notified of next failure
150
- transientFailureCount ++
151
- if transientFailureCount >= maxConnectionRetryCount {
152
- transientFailureCount = 0
153
- if utils .IsProcessAlive (r .daemonPid ) {
154
- utils .KillProcess (r .daemonPid )
155
- }
156
- <- r .daemonWaitCh
157
- if err := r .runContainerdDaemon (); err != nil { //FIXME: Handle error
158
- logrus .Errorf ("libcontainerd: error restarting containerd: %v" , err )
159
- }
160
- } else {
161
- state = grpc .Idle
162
- time .Sleep (connectionRetryDelay )
163
- }
164
- case grpc .Shutdown :
156
+ if strings .Contains (err .Error (), "is closing" ) {
165
157
// Well, we asked for it to stop, just return
166
158
return
167
159
}
160
+ // all other errors are transient
161
+ // Reset state to be notified of next failure
162
+ transientFailureCount ++
163
+ if transientFailureCount >= maxConnectionRetryCount {
164
+ transientFailureCount = 0
165
+ if utils .IsProcessAlive (r .daemonPid ) {
166
+ utils .KillProcess (r .daemonPid )
167
+ }
168
+ <- r .daemonWaitCh
169
+ if err := r .runContainerdDaemon (); err != nil { //FIXME: Handle error
170
+ logrus .Errorf ("libcontainerd: error restarting containerd: %v" , err )
171
+ }
172
+ continue
173
+ }
168
174
}
169
175
}
170
176
}
0 commit comments