@@ -21,6 +21,7 @@ import (
21
21
"crypto/x509"
22
22
"encoding/json"
23
23
"fmt"
24
+ "net"
24
25
"os"
25
26
"path/filepath"
26
27
"regexp"
@@ -33,6 +34,8 @@ import (
33
34
"google.golang.org/grpc"
34
35
"google.golang.org/grpc/credentials"
35
36
"google.golang.org/grpc/credentials/insecure"
37
+ "google.golang.org/grpc/health"
38
+ "google.golang.org/grpc/health/grpc_health_v1"
36
39
"k8s.io/apimachinery/pkg/api/errors"
37
40
"k8s.io/apimachinery/pkg/types"
38
41
"k8s.io/apimachinery/pkg/util/validation"
@@ -93,17 +96,18 @@ type Labels map[string]string
93
96
94
97
// Args are the command line arguments of NfdWorker.
95
98
type Args struct {
96
- CaFile string
97
- CertFile string
98
- ConfigFile string
99
- KeyFile string
100
- Klog map [string ]* utils.KlogFlagVal
101
- Kubeconfig string
102
- Oneshot bool
103
- Options string
104
- Server string
105
- ServerNameOverride string
106
- MetricsPort int
99
+ CaFile string
100
+ CertFile string
101
+ ConfigFile string
102
+ KeyFile string
103
+ Klog map [string ]* utils.KlogFlagVal
104
+ Kubeconfig string
105
+ Oneshot bool
106
+ Options string
107
+ Server string
108
+ ServerNameOverride string
109
+ MetricsPort int
110
+ GrpcHealthPort int
107
111
108
112
Overrides ConfigOverrideArgs
109
113
}
@@ -124,6 +128,7 @@ type nfdWorker struct {
124
128
config * NFDConfig
125
129
kubernetesNamespace string
126
130
grpcClient pb.LabelerClient
131
+ healthServer * grpc.Server
127
132
nfdClient * nfdclient.Clientset
128
133
stop chan struct {} // channel for signaling stop
129
134
featureSources []source.FeatureSource
@@ -187,6 +192,29 @@ func (i *infiniteTicker) Reset(d time.Duration) {
187
192
}
188
193
}
189
194
195
+ func (w * nfdWorker ) startGrpcHealthServer (errChan chan <- error ) error {
196
+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , w .args .GrpcHealthPort ))
197
+ if err != nil {
198
+ return fmt .Errorf ("failed to listen: %w" , err )
199
+ }
200
+
201
+ s := grpc .NewServer ()
202
+ grpc_health_v1 .RegisterHealthServer (s , health .NewServer ())
203
+ klog .InfoS ("gRPC health server serving" , "port" , w .args .GrpcHealthPort )
204
+
205
+ go func () {
206
+ defer func () {
207
+ lis .Close ()
208
+ }()
209
+ if err := s .Serve (lis ); err != nil {
210
+ errChan <- fmt .Errorf ("gRPC health server exited with an error: %w" , err )
211
+ }
212
+ klog .InfoS ("gRPC health server stopped" )
213
+ }()
214
+ w .healthServer = s
215
+ return nil
216
+ }
217
+
190
218
// Run feature discovery.
191
219
func (w * nfdWorker ) runFeatureDiscovery () error {
192
220
discoveryStart := time .Now ()
@@ -262,8 +290,20 @@ func (w *nfdWorker) Run() error {
262
290
return nil
263
291
}
264
292
293
+ grpcErr := make (chan error , 1 )
294
+
295
+ // Start gRPC server for liveness probe (at this point we're "live")
296
+ if w .args .GrpcHealthPort != 0 {
297
+ if err := w .startGrpcHealthServer (grpcErr ); err != nil {
298
+ return fmt .Errorf ("failed to start gRPC health server: %w" , err )
299
+ }
300
+ }
301
+
265
302
for {
266
303
select {
304
+ case err := <- grpcErr :
305
+ return fmt .Errorf ("error in serving gRPC: %w" , err )
306
+
267
307
case <- labelTrigger .C :
268
308
err = w .runFeatureDiscovery ()
269
309
if err != nil {
@@ -294,6 +334,9 @@ func (w *nfdWorker) Run() error {
294
334
295
335
case <- w .stop :
296
336
klog .InfoS ("shutting down nfd-worker" )
337
+ if w .healthServer != nil {
338
+ w .healthServer .GracefulStop ()
339
+ }
297
340
configWatch .Close ()
298
341
w .certWatch .Close ()
299
342
return nil
0 commit comments