Skip to content

Commit

Permalink
nfd-worker: Add liveness probe
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
  • Loading branch information
ozhuraki committed Mar 11, 2024
1 parent 890a029 commit 90b1edb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cmd/nfd-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (

const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-worker"
ProgramName = "nfd-worker"
// GrpcHealthPort is the port number that main assigns to
// args.GrpcHealthPort before creating the NfdWorker instance; the worker
// listens on it for its gRPC health server. The value matches the grpc
// probe port (8082) used by the liveness/readiness probes in the worker
// daemonset manifest, so the two need to be kept in sync.
GrpcHealthPort = 8082
)

func main() {
Expand Down Expand Up @@ -73,6 +74,7 @@ func main() {
utils.ConfigureGrpcKlog()

// Get new NfdWorker instance
args.GrpcHealthPort = GrpcHealthPort

Check warning on line 77 in cmd/nfd-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/nfd-worker/main.go#L77

Added line #L77 was not covered by tests
instance, err := worker.NewNfdWorker(args)
if err != nil {
klog.ErrorS(err, "failed to initialize NfdWorker instance")
Expand Down
11 changes: 11 additions & 0 deletions deployment/base/worker-daemonset/worker-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ spec:
- name: nfd-worker
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
command:
- "nfd-worker"
args:
Expand Down
43 changes: 43 additions & 0 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"regexp"
Expand All @@ -33,6 +34,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -104,6 +107,7 @@ type Args struct {
Server string
ServerNameOverride string
MetricsPort int
GrpcHealthPort int

Overrides ConfigOverrideArgs
}
Expand All @@ -124,6 +128,7 @@ type nfdWorker struct {
config *NFDConfig
kubernetesNamespace string
grpcClient pb.LabelerClient
healthServer *grpc.Server
nfdClient *nfdclient.Clientset
stop chan struct{} // channel for signaling stop
featureSources []source.FeatureSource
Expand Down Expand Up @@ -187,6 +192,29 @@ func (i *infiniteTicker) Reset(d time.Duration) {
}
}

// startGrpcHealthServer starts a gRPC server that exposes the standard gRPC
// health service on TCP port w.args.GrpcHealthPort (all interfaces).
//
// The server is served from a background goroutine. If Serve returns a
// non-nil error, that error is wrapped and sent on errChan. On success the
// server is stored in w.healthServer so that the caller can stop it later,
// and nil is returned. A non-nil error is returned only when the listener
// cannot be opened; in that case no goroutine is started and w.healthServer
// is left untouched.
func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
// Bind the listener synchronously so that a port conflict is reported to
// the caller as a return value rather than through errChan.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", w.args.GrpcHealthPort))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

Check warning on line 199 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L195-L199

Added lines #L195 - L199 were not covered by tests

// Only the health service is registered on this server; no per-service
// status is set explicitly here.
s := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)

go func() {
// Close the listener when the serving goroutine exits.
defer func() {
lis.Close()
}()
// The send below blocks until errChan can accept a value; the caller in
// Run passes a channel with a buffer of one.
if err := s.Serve(lis); err != nil {
errChan <- fmt.Errorf("gRPC health server exited with an error: %w", err)
}
klog.InfoS("gRPC health server stopped")

Check warning on line 212 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L201-L212

Added lines #L201 - L212 were not covered by tests
}()
// Keep a handle to the server; Run calls GracefulStop on it at shutdown.
w.healthServer = s
return nil

Check warning on line 215 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L214-L215

Added lines #L214 - L215 were not covered by tests
}

// Run feature discovery.
func (w *nfdWorker) runFeatureDiscovery() error {
discoveryStart := time.Now()
Expand Down Expand Up @@ -262,8 +290,20 @@ func (w *nfdWorker) Run() error {
return nil
}

grpcErr := make(chan error, 1)

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
if err := w.startGrpcHealthServer(grpcErr); err != nil {
return fmt.Errorf("failed to start gRPC health server: %w", err)
}

Check warning on line 299 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L297-L299

Added lines #L297 - L299 were not covered by tests
}

for {
select {
case err := <-grpcErr:
return fmt.Errorf("error in serving gRPC: %w", err)

Check warning on line 305 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L304-L305

Added lines #L304 - L305 were not covered by tests

case <-labelTrigger.C:
err = w.runFeatureDiscovery()
if err != nil {
Expand Down Expand Up @@ -294,6 +334,9 @@ func (w *nfdWorker) Run() error {

case <-w.stop:
klog.InfoS("shutting down nfd-worker")
if w.healthServer != nil {
w.healthServer.GracefulStop()
}

Check warning on line 339 in pkg/nfd-worker/nfd-worker.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-worker/nfd-worker.go#L338-L339

Added lines #L338 - L339 were not covered by tests
configWatch.Close()
w.certWatch.Close()
return nil
Expand Down

0 comments on commit 90b1edb

Please sign in to comment.