@@ -18,12 +18,16 @@ package nfdtopologyupdater
18
18
19
19
import (
20
20
"fmt"
21
+ "net"
21
22
"net/url"
22
23
"os"
23
24
"path/filepath"
24
25
25
26
"golang.org/x/net/context"
26
27
28
+ "google.golang.org/grpc"
29
+ "google.golang.org/grpc/health"
30
+ "google.golang.org/grpc/health/grpc_health_v1"
27
31
"k8s.io/apimachinery/pkg/api/errors"
28
32
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
33
"k8s.io/apimachinery/pkg/types"
@@ -58,6 +62,7 @@ type Args struct {
58
62
KubeConfigFile string
59
63
ConfigFile string
60
64
KubeletStateDir string
65
+ GrpcHealthPort int
61
66
62
67
Klog map [string ]* utils.KlogFlagVal
63
68
}
@@ -85,6 +90,7 @@ type nfdTopologyUpdater struct {
85
90
ownerRefs []metav1.OwnerReference
86
91
k8sClient k8sclient.Interface
87
92
kubeletConfigFunc func () (* kubeletconfigv1beta1.KubeletConfiguration , error )
93
+ healthServer * grpc.Server
88
94
}
89
95
90
96
// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
@@ -128,6 +134,29 @@ func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, err
128
134
return klConfig .TopologyManagerPolicy , klConfig .TopologyManagerScope , nil
129
135
}
130
136
137
+ func (w * nfdTopologyUpdater ) startGrpcHealthServer (errChan chan <- error ) error {
138
+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , w .args .GrpcHealthPort ))
139
+ if err != nil {
140
+ return fmt .Errorf ("failed to listen: %w" , err )
141
+ }
142
+
143
+ s := grpc .NewServer ()
144
+ grpc_health_v1 .RegisterHealthServer (s , health .NewServer ())
145
+ klog .InfoS ("gRPC health server serving" , "port" , w .args .GrpcHealthPort )
146
+
147
+ go func () {
148
+ defer func () {
149
+ lis .Close ()
150
+ }()
151
+ if err := s .Serve (lis ); err != nil {
152
+ errChan <- fmt .Errorf ("gRPC health server exited with an error: %w" , err )
153
+ }
154
+ klog .InfoS ("gRPC health server stopped" )
155
+ }()
156
+ w .healthServer = s
157
+ return nil
158
+ }
159
+
131
160
// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
132
161
// one request if OneShot is set to 'true' in the updater args.
133
162
func (w * nfdTopologyUpdater ) Run () error {
@@ -187,8 +216,20 @@ func (w *nfdTopologyUpdater) Run() error {
187
216
return fmt .Errorf ("failed to obtain node resource information: %w" , err )
188
217
}
189
218
219
+ grpcErr := make (chan error , 1 )
220
+
221
+ // Start gRPC server for liveness probe (at this point we're "live")
222
+ if w .args .GrpcHealthPort != 0 {
223
+ if err := w .startGrpcHealthServer (grpcErr ); err != nil {
224
+ return fmt .Errorf ("failed to start gRPC health server: %w" , err )
225
+ }
226
+ }
227
+
190
228
for {
191
229
select {
230
+ case err := <- grpcErr :
231
+ return fmt .Errorf ("error in serving gRPC: %w" , err )
232
+
192
233
case info := <- w .eventSource :
193
234
klog .V (4 ).InfoS ("event received, scanning..." , "event" , info .Event )
194
235
scanResponse , err := resScan .Scan ()
@@ -217,6 +258,9 @@ func (w *nfdTopologyUpdater) Run() error {
217
258
218
259
case <- w .stop :
219
260
klog .InfoS ("shutting down nfd-topology-updater" )
261
+ if w .healthServer != nil {
262
+ w .healthServer .GracefulStop ()
263
+ }
220
264
return nil
221
265
}
222
266
}
0 commit comments