Skip to content

Commit

Permalink
use update chan in watch
Browse files Browse the repository at this point in the history
  • Loading branch information
luluz66 committed Feb 27, 2025
1 parent 5288dfb commit 4eb6bec
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions server/util/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type HealthChecker struct {
shutdownFuncs []interfaces.CheckerFunc
readyToServe bool
shuttingDown bool

update chan struct{}
}

func NewHealthChecker(serverType string) *HealthChecker {
Expand All @@ -66,6 +68,7 @@ func NewHealthChecker(serverType string) *HealthChecker {
checkersMu: sync.Mutex{},
checkers: make(map[string]interfaces.Checker, 0),
lastStatus: make([]*serviceStatus, 0),
update: make(chan struct{}, 1),
}
sigTerm := make(chan os.Signal, 1)
go func() {
Expand Down Expand Up @@ -112,6 +115,7 @@ func (h *HealthChecker) handleShutdownFuncs() {
h.readyToServe = false
h.shuttingDown = true
h.mu.Unlock()
h.updateServingStatus()

// We use fmt here and below because this code is called from the
// signal handler and log.Printf can be a little wonky.
Expand Down Expand Up @@ -160,6 +164,7 @@ func (h *HealthChecker) AddHealthCheck(name string, f interfaces.Checker) {
h.mu.Lock()
h.readyToServe = false
h.mu.Unlock()
h.updateServingStatus()
}

func (h *HealthChecker) WaitForGracefulShutdown() {
Expand Down Expand Up @@ -227,6 +232,7 @@ func (h *HealthChecker) runHealthChecks(ctx context.Context) {
h.lastStatus = statusData
}
h.mu.Unlock()
h.updateServingStatus()

if newReadinessState != previousReadinessState {
log.Infof("HealthChecker transitioning from ready: %t => ready: %t", previousReadinessState, newReadinessState)
Expand Down Expand Up @@ -277,6 +283,34 @@ func (h *HealthChecker) LivenessHandler() http.Handler {
})
}

func (h *HealthChecker) servingStatus() hlpb.HealthCheckResponse_ServingStatus {
h.mu.RLock()
ready := h.readyToServe
shuttingDown := h.shuttingDown
h.mu.RUnlock()

if shuttingDown {
return hlpb.HealthCheckResponse_UNKNOWN
}

if ready {
return hlpb.HealthCheckResponse_SERVING
}
return hlpb.HealthCheckResponse_NOT_SERVING
}

func (h *HealthChecker) updateServingStatus() {
h.mu.Lock()

select {
case <-h.update:
default:
}
h.update <- struct{}{}

h.mu.Unlock()
}

func (h *HealthChecker) Check(ctx context.Context, req *hlpb.HealthCheckRequest) (*hlpb.HealthCheckResponse, error) {
// GRPC does not have indepenent health and readiness checks like HTTP does.
// An additional wrinkle is that AWS ALB's do not support sending a service
Expand All @@ -285,36 +319,21 @@ func (h *HealthChecker) Check(ctx context.Context, req *hlpb.HealthCheckRequest)
// - SERVING when the service is ready
// - NOT_SERVING when the service is not ready
// - UNKNOWN when the service is shutting down.
h.mu.RLock()
ready := h.readyToServe
shuttingDown := h.shuttingDown
h.mu.RUnlock()
rsp := &hlpb.HealthCheckResponse{}
if ready {
rsp.Status = hlpb.HealthCheckResponse_SERVING
} else {
rsp.Status = hlpb.HealthCheckResponse_NOT_SERVING
}

if shuttingDown {
rsp.Status = hlpb.HealthCheckResponse_UNKNOWN
rsp := &hlpb.HealthCheckResponse{
Status: h.servingStatus(),
}
return rsp, nil
}

func (h *HealthChecker) Watch(req *hlpb.HealthCheckRequest, stream hlpb.Health_WatchServer) error {
currentStatus := hlpb.HealthCheckResponse_SERVICE_UNKNOWN
ticker := time.NewTicker(healthCheckWatchInterval)

for {
select {
case <-stream.Context().Done():
return nil
case <-ticker.C:
rsp, err := h.Check(stream.Context(), req)
if err != nil {
return err
}
newStatus := rsp.GetStatus()
case <-h.update:
newStatus := h.servingStatus()
if newStatus != currentStatus {
currentStatus = newStatus
if err := stream.Send(&hlpb.HealthCheckResponse{Status: currentStatus}); err != nil {
Expand Down

0 comments on commit 4eb6bec

Please sign in to comment.