Skip to content

Commit

Permalink
health: shutdown server to set NOT_SERVING and disallow future updates (
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Dec 10, 2018
1 parent e242249 commit aad0ede
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
25 changes: 25 additions & 0 deletions health/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import (
"google.golang.org/grpc/codes"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/status"
)

// Server implements `service Health`.
type Server struct {
// If shutdownEvent has fired, it's expected all serving status is
// NOT_SERVING, and will stay in NOT_SERVING.
shutdownEvent *grpcsync.Event

mu sync.Mutex
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
Expand All @@ -43,6 +48,8 @@ type Server struct {
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
shutdownEvent: grpcsync.NewEvent(),

statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
}
Expand Down Expand Up @@ -110,7 +117,14 @@ func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health
func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.shutdownEvent.HasFired() {
return
}

s.setServingStatusLocked(service, servingStatus)
}

func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.statusMap[service] = servingStatus
for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel.
Expand All @@ -123,3 +137,14 @@ func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthC
update <- servingStatus
}
}

// Shutdown sets all serving status to NOT_SERVING, and configures the server to
// ignore all future status changes.
func (s *Server) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdownEvent.Fire()
for service := range s.statusMap {
s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
}
}
62 changes: 62 additions & 0 deletions health/server_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package health

import (
"sync"
"testing"
"time"

healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func TestShutdown(t *testing.T) {
const testService = "tteesstt"
s := NewServer()
s.SetServingStatus(testService, healthpb.HealthCheckResponse_SERVING)

status := s.statusMap[testService]
if status != healthpb.HealthCheckResponse_SERVING {
t.Fatalf("status for %s is %v, want %v", testService, status, healthpb.HealthCheckResponse_SERVING)
}

var wg sync.WaitGroup
wg.Add(2)
// Run SetServingStatus and Shutdown in parallel.
go func() {
for i := 0; i < 1000; i++ {
s.SetServingStatus(testService, healthpb.HealthCheckResponse_SERVING)
time.Sleep(time.Microsecond)
}
wg.Done()
}()
go func() {
time.Sleep(300 * time.Microsecond)
s.Shutdown()
wg.Done()
}()
wg.Wait()

s.mu.Lock()
status = s.statusMap[testService]
s.mu.Unlock()
if status != healthpb.HealthCheckResponse_NOT_SERVING {
t.Fatalf("status for %s is %v, want %v", testService, status, healthpb.HealthCheckResponse_NOT_SERVING)
}
}

0 comments on commit aad0ede

Please sign in to comment.