Skip to content

Commit

Permalink
feat: check if udf is running in liveness probe (#156)
Browse files Browse the repository at this point in the history
* feat: health check to check if udf is running

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Sep 4, 2022
1 parent 81e76d8 commit 8dfedd8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
30 changes: 28 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type timestampedPending struct {
timestamp int64
}

// metricsServer runs an HTTP server to:
// 1. Expose metrics;
// 2. Serve an endpoint to execute health checks
type metricsServer struct {
vertex *dfv1.Vertex
rater isb.Ratable
Expand All @@ -62,34 +65,47 @@ type metricsServer struct {
refreshInterval time.Duration
// pendingInfo stores a list of pending/timestamp(seconds) information
pendingInfo *sharedqueue.OverflowQueue[timestampedPending]
// Functions that health check executes
healthCheckExecutors []func() error
}

type Option func(*metricsServer)

// WithRater sets the rater
func WithRater(r isb.Ratable) Option {
return func(m *metricsServer) {
m.rater = r
}
}

// WithLagReader sets the lag reader
func WithLagReader(r isb.LagReader) Option {
return func(m *metricsServer) {
m.lagReader = r
}
}

// WithRefreshInterval sets how often to refresh the rate and pending
func WithRefreshInterval(d time.Duration) Option {
return func(m *metricsServer) {
m.refreshInterval = d
}
}

// WithLookbackSeconds sets lookback seconds for avg rate and pending calculation
func WithLookbackSeconds(seconds int64) Option {
return func(m *metricsServer) {
m.lookbackSeconds = seconds
}
}

// WithHealthCheckExecutor appends a health check executor
func WithHealthCheckExecutor(f func() error) Option {
return func(m *metricsServer) {
m.healthCheckExecutors = append(m.healthCheckExecutors, f)
}
}

// NewMetricsServer returns a Prometheus metrics server instance, which can be used to start an HTTPS service to expose Prometheus metrics.
func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer {
m := new(metricsServer)
Expand Down Expand Up @@ -204,10 +220,20 @@ func (ms *metricsServer) Start(ctx context.Context) (func(ctx context.Context) e
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
w.WriteHeader(http.StatusNoContent)
})
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
if len(ms.healthCheckExecutors) > 0 {
for _, ex := range ms.healthCheckExecutors {
if err := ex(); err != nil {
log.Errorw("Failed execute health check", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
}
}
w.WriteHeader(http.StatusNoContent)
})
debugEnabled := os.Getenv(dfv1.EnvDebug)
if debugEnabled == "true" {
Expand Down
6 changes: 2 additions & 4 deletions pkg/udf/applier/uds_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ func (u *UDSGRPCBasedUDF) CloseConn(ctx context.Context) error {

// WaitUntilReady waits until the client is connected.
func (u *UDSGRPCBasedUDF) WaitUntilReady(ctx context.Context) error {
var err error
for {
select {
case <-ctx.Done():
// using %v for ctx.Err() because only one %w can exist in the fmt.Errorf
return fmt.Errorf("failed to wait for ready: %v, %w", ctx.Err(), err)
return fmt.Errorf("failed on readiness check: %w", ctx.Err())
default:
if _, err = u.client.IsReady(ctx, &emptypb.Empty{}); err == nil {
if _, err := u.client.IsReady(ctx, &emptypb.Empty{}); err == nil {
return nil
}
time.Sleep(1 * time.Second)
Expand Down
10 changes: 9 additions & 1 deletion pkg/udf/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/generic/jetstream"
"go.uber.org/zap"
Expand Down Expand Up @@ -164,7 +165,14 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
}
}()

metricsOpts := []metrics.Option{metrics.WithLookbackSeconds(int64(u.VertexInstance.Vertex.Spec.Scale.GetLookbackSeconds()))}
metricsOpts := []metrics.Option{
metrics.WithLookbackSeconds(int64(u.VertexInstance.Vertex.Spec.Scale.GetLookbackSeconds())),
metrics.WithHealthCheckExecutor(func() error {
cctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
return udfHandler.WaitUntilReady(cctx)
}),
}
if x, ok := reader.(isb.LagReader); ok {
metricsOpts = append(metricsOpts, metrics.WithLagReader(x))
}
Expand Down

0 comments on commit 8dfedd8

Please sign in to comment.