Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/rls: Add picker and cache unit tests for RLS Metrics #7614

Merged
merged 6 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,61 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
}
}

func (s) TestDataCache_Metrics(t *testing.T) {
cacheEntriesMetricsTests := []*cacheEntry{
{size: 1},
{size: 2},
{size: 3},
{size: 4},
{size: 5},
}
tmr := stats.NewTestMetricsRecorder()
dc := newDataCache(50, nil, tmr, "")

dc.updateRLSServerTarget("rls-server-target")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntriesMetricsTests[i])
}

const cacheEntriesKey = "grpc.lb.rls.cache_entries"
const cacheSizeKey = "grpc.lb.rls.cache_size"
// 5 total entries which add up to 15 size, so should record that.
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
}

// Resize down the cache to 2 entries (deterministic as based of LRU).
dc.resize(9)
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
}

// Update an entry to have size 6. This should reflect in the size metrics,
// which will increase by 1 to 11, while the number of cache entries should
// stay same. This write is deterministic and writes to the last one.
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)

if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
}

// Delete this scaled up cache key. This should scale down the cache to 1
// entries, and remove 6 size so cache size should be 4.
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
}
}
168 changes: 167 additions & 1 deletion balancer/rls/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -246,6 +248,133 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
}
}

// Test_RLSDefaultTargetPicksMetric tests the default target picks metric. It
// configures an RLS Balancer which specifies to route to the default target in
// the RLS Configuration, and makes an RPC on a Channel containing this RLS
// Balancer. This test then asserts a default target picks metric is emitted,
// and target pick or failed pick metric is not emitted.
func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Build RLS service config with a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
defBackendCh, defBackendAddress := startBackend(t)
rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the default target.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)

if got, _ := tmr.Metric("grpc.lb.rls.default_target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.default_target_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}

// Test_RLSTargetPicksMetric tests the target picks metric. It configures an RLS
// Balancer which specifies to route to a target through a RouteLookupResponse,
// and makes an RPC on a Channel containing this RLS Balancer. This test then
// asserts a target picks metric is emitted, and default target pick or failed
// pick metric is not emitted.
func (s) Test_RLSTargetPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build the RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Start a test backend, and setup the fake RLS server to return this as a
// target in the RLS response.
testBackendCh, testBackendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(context.Context, *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the test backend.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
if got, _ := tmr.Metric("grpc.lb.rls.target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.target_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}

// Test_RLSFailedPicksMetric tests the failed picks metric. It configures an RLS
// Balancer to fail a pick with unavailable, and makes an RPC on a Channel
// containing this RLS Balancer. This test then asserts a failed picks metric is
// emitted, and default target pick or target pick metric is not emitted.
func (s) Test_RLSFailedPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build an RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and expect it to fail with deadline exceeded error. We use a
// smaller timeout to ensure that the test doesn't run very long.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))

if got, _ := tmr.Metric("grpc.lb.rls.failed_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.failed_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
}

// Test verifies the scenario where there is a matching entry in the data cache
// which is valid and there is no pending request. The pick is expected to be
// delegated to the child policy.
Expand All @@ -256,7 +385,6 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {

// Build the RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Start a test backend, and setup the fake RLS server to return this as a
// target in the RLS response.
testBackendCh, testBackendAddress := startBackend(t)
Expand Down Expand Up @@ -881,3 +1009,41 @@ func TestIsFullMethodNameValid(t *testing.T) {
})
}
}

// Tests the conversion of the child pickers error to the pick result attribute.
func (s) TestChildPickResultError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{
name: "nil",
err: nil,
want: "complete",
},
{
name: "errNoSubConnAvailable",
err: balancer.ErrNoSubConnAvailable,
want: "queue",
},
{
name: "status error",
err: status.Error(codes.Unimplemented, "unimplemented"),
want: "drop",
},
{
name: "other error",
err: errors.New("some error"),
want: "fail",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if got := errToPickResult(test.err); got != test.want {
t.Fatalf("errToPickResult(%q) = %v, want %v", test.err, got, test.want)
}
})
}
}
22 changes: 15 additions & 7 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var (
OOBReportingPeriod: stringp("0.005s"),
BlackoutPeriod: stringp("0s"),
WeightExpirationPeriod: stringp("60s"),
WeightUpdatePeriod: stringp(".050s"),
WeightUpdatePeriod: stringp("30s"),
dfawley marked this conversation as resolved.
Show resolved Hide resolved
ErrorUtilizationPenalty: float64p(0),
}
)
Expand Down Expand Up @@ -224,8 +224,8 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

mr := stats.NewTestMetricsRecorder(t)
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
tmr := stats.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
srv.callMetrics.SetQPS(float64(1))
Expand All @@ -234,12 +234,20 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
t.Fatalf("Error from EmptyCall: %v", err)
}

mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, 1)
}
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
}
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
Expand Down
24 changes: 17 additions & 7 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
tmr := stats.NewTestMetricsRecorder()
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
Expand All @@ -117,9 +117,15 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != test.endpointWeightNotYetUsableWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, test.endpointWeightNotYetUsableWant)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
})
}

Expand All @@ -130,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
tmr := stats.NewTestMetricsRecorder()
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 0,
Expand All @@ -147,7 +153,9 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
tmr.ClearMetrics()

// With two SubConns, if neither of them have weights, it will also fallback
Expand All @@ -159,5 +167,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
}
Loading