Skip to content

Commit

Permalink
etcdserver: add linearizable_read check to readyz.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Dec 6, 2023
1 parent 293fc21 commit ebb7e79
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 25 deletions.
21 changes: 11 additions & 10 deletions server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"path"
"strings"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -276,14 +275,19 @@ type CheckRegistry struct {

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", readCheck(server, true /* serializable */))
reg.InstallHttpEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
// serializable_read checks if local read is ok.
// linearizable_read checks if there is consensus in the cluster.
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
reg.InstallHttpEndpoints(lg, mux)
}

Expand Down Expand Up @@ -447,13 +451,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
return err
}
}
76 changes: 61 additions & 15 deletions server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ import (

type fakeHealthServer struct {
fakeServer
apiError error
missingLeader bool
authStore auth.AuthStore
serializableReadError error
linearizableReadError error
missingLeader bool
authStore auth.AuthStore
}

func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, s.apiError
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
if req.Serializable {
return nil, s.serializableReadError
}
return nil, s.linearizableReadError
}

func (s *fakeHealthServer) Config() config.ServerConfig {
Expand Down Expand Up @@ -132,10 +136,11 @@ func TestHealthHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
apiError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, be, nil, 0),
fakeServer: fakeServer{alarms: tt.alarms},
serializableReadError: tt.apiError,
linearizableReadError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, be, nil, 0),
})
ts := httptest.NewServer(mux)
defer ts.Close()
Expand Down Expand Up @@ -171,8 +176,8 @@ func TestHttpSubPath(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
Expand Down Expand Up @@ -255,23 +260,23 @@ func TestSerializableReadCheck(t *testing.T) {
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
Expand All @@ -282,6 +287,47 @@ func TestSerializableReadCheck(t *testing.T) {
}
}

func TestLinearizableReadCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Alive normal",
healthCheckURL: "/livez?verbose",
expectStatusCode: http.StatusOK,
inResult: []string{"[+]serializable_read ok"},
},
{
name: "Alive if lineariable range api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
linearizableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}

func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

Expand Down

0 comments on commit ebb7e79

Please sign in to comment.