Skip to content

Commit

Permalink
Responded to Doug's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Sep 17, 2024
1 parent 69aadc5 commit e8ff69d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 57 deletions.
17 changes: 8 additions & 9 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
)
Expand Down Expand Up @@ -263,19 +262,19 @@ func (s) TestDataCache_Metrics(t *testing.T) {
const cacheEntriesKey = "grpc.lb.rls.cache_entries"
const cacheSizeKey = "grpc.lb.rls.cache_size"
// 5 total entries which add up to 15 size, so should record that.
if got := tmr.Data[estats.Metric(cacheEntriesKey)]; got != 5 {
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
}
if got := tmr.Data[estats.Metric(cacheSizeKey)]; got != 15 {
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
}

// Resize down the cache to 2 entries (deterministic as based of LRU).
dc.resize(9)
if got := tmr.Data[estats.Metric(cacheEntriesKey)]; got != 2 {
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got := tmr.Data[estats.Metric(cacheSizeKey)]; got != 9 {
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
}

Expand All @@ -284,20 +283,20 @@ func (s) TestDataCache_Metrics(t *testing.T) {
// stay same. This write is deterministic and writes to the last one.
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)

if got := tmr.Data[estats.Metric(cacheEntriesKey)]; got != 2 {
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got := tmr.Data[estats.Metric(cacheSizeKey)]; got != 10 {
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
}

// Delete this scaled up cache key. This should scale down the cache to 1
// entries, and remove 6 size so cache size should be 4.
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
if got := tmr.Data[estats.Metric(cacheEntriesKey)]; got != 1 {
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
}
if got := tmr.Data[estats.Metric(cacheSizeKey)]; got != 4 {
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
}
}
19 changes: 9 additions & 10 deletions balancer/rls/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
rlstest "google.golang.org/grpc/internal/testutils/rls"
Expand Down Expand Up @@ -279,13 +278,13 @@ func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)

if got := tmr.Data[estats.Metric("grpc.lb.rls.default_target_picks")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.rls.default_target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.default_target_picks", got, 1)
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.target_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.failed_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}
Expand Down Expand Up @@ -325,13 +324,13 @@ func (s) Test_RLSTargetPicksMetric(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
if got := tmr.Data[estats.Metric("grpc.lb.rls.target_picks")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.rls.target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.target_picks", got, 1)
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.default_target_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.failed_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}
Expand Down Expand Up @@ -365,13 +364,13 @@ func (s) Test_RLSFailedPicksMetric(t *testing.T) {
defer cancel()
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))

if got := tmr.Data[estats.Metric("grpc.lb.rls.failed_picks")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.rls.failed_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.failed_picks", got, 1)
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.target_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Data[estats.Metric("grpc.lb.rls.default_target_picks")]; ok {
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
}
Expand Down
27 changes: 10 additions & 17 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"google.golang.org/grpc"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
Expand Down Expand Up @@ -225,8 +224,8 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

mr := stats.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
tmr := stats.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
srv.callMetrics.SetQPS(float64(1))
Expand All @@ -235,28 +234,22 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
t.Fatalf("Error from EmptyCall: %v", err)
}

mr.Mu.Lock()
defer mr.Mu.Unlock()
if got := mr.Data[estats.Metric("grpc.lb.wrr.rr_fallback")]; got != 1 {
tmr.Mu.Lock()
defer tmr.Mu.Unlock()
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
if got := mr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_stale")]; got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
if got, ok := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); !ok || got != 0 {
t.Fatalf("Unexpected data for metric %v, present: %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", ok, got, 0)
}
if got := mr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, 1)
}
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
if got := mr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_stale")]; got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
if got, ok := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); !ok || got != 0 {
t.Fatalf("Unexpected data for metric %v, present: %v, got: %v, want: %v", ok, "grpc.lb.wrr.endpoint_weight_stale", got, 0)
}
/*mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)*/
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
Expand Down
11 changes: 5 additions & 6 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
Expand Down Expand Up @@ -118,13 +117,13 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

if got := tmr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_stale")]; got != test.endpointWeightStaleWant {
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
if got := tmr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable")]; got != test.endpointWeightNotYetUsableWant {
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != test.endpointWeightNotYetUsableWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, test.endpointWeightNotYetUsableWant)
}
if got := tmr.Data[estats.Metric("grpc.lb.wrr.endpoint_weight_stale")]; got != test.endpointWeightStaleWant {
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
})
Expand Down Expand Up @@ -154,7 +153,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
if got := tmr.Data[estats.Metric("grpc.lb.wrr.rr_fallback")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
tmr.ClearMetrics()
Expand All @@ -168,7 +167,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
if got := tmr.Data[estats.Metric("grpc.lb.wrr.rr_fallback")]; got != 1 {
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
}
33 changes: 18 additions & 15 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,28 @@ package stats
import (
"context"
"fmt"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/stats"
"sync"
)

// TestMetricsRecorder is a MetricsRecorder to be used in tests. It sends
// recording events on channels and provides helpers to check if certain events
// have taken place. It also persists metrics data keyed on the metrics
// descriptor.
type TestMetricsRecorder struct {
t *testing.T

intCountCh *testutils.Channel
floatCountCh *testutils.Channel
intHistoCh *testutils.Channel
floatHistoCh *testutils.Channel
intGaugeCh *testutils.Channel

// Mu protects Data.
// Mu protects data.
Mu sync.Mutex
// Data is the most recent update for each metric name.
Data map[estats.Metric]float64
// data is the most recent update for each metric name.
data map[estats.Metric]float64
}

// NewTestMetricsRecorder returns a new TestMetricsRecorder.
Expand All @@ -59,15 +55,22 @@ func NewTestMetricsRecorder() *TestMetricsRecorder {
floatHistoCh: testutils.NewChannelWithSize(10),
intGaugeCh: testutils.NewChannelWithSize(10),

Data: make(map[estats.Metric]float64),
data: make(map[estats.Metric]float64),
}
}

// Metric returns the most recent data for a metric, and
// whether this recorder has received data for a metric.
func (r *TestMetricsRecorder) Metric(name string) (float64, bool) {
data, ok := r.data[estats.Metric(name)]
return data, ok
}

// ClearMetrics clears the metrics data store of the test metrics recorder.
func (r *TestMetricsRecorder) ClearMetrics() {
r.Mu.Lock()
defer r.Mu.Unlock()
r.Data = make(map[estats.Metric]float64)
r.data = make(map[estats.Metric]float64)
}

// MetricsData represents data associated with a metric.
Expand Down Expand Up @@ -112,7 +115,7 @@ func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle,

r.Mu.Lock()
defer r.Mu.Unlock()
r.Data[handle.Name] = float64(incr)
r.data[handle.Name] = float64(incr)
}

// WaitForFloat64Count waits for a float count metric to be recorded and
Expand Down Expand Up @@ -144,7 +147,7 @@ func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHand

r.Mu.Lock()
defer r.Mu.Unlock()
r.Data[handle.Name] = incr
r.data[handle.Name] = incr
}

// WaitForInt64Histo waits for an int histo metric to be recorded and verifies
Expand Down Expand Up @@ -175,7 +178,7 @@ func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle,

r.Mu.Lock()
defer r.Mu.Unlock()
r.Data[handle.Name] = float64(incr)
r.data[handle.Name] = float64(incr)
}

// WaitForFloat64Histo waits for a float histo metric to be recorded and
Expand Down Expand Up @@ -206,7 +209,7 @@ func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHand

r.Mu.Lock()
defer r.Mu.Unlock()
r.Data[handle.Name] = incr
r.data[handle.Name] = incr
}

// WaitForInt64Gauge waits for a int gauge metric to be recorded and
Expand Down Expand Up @@ -237,7 +240,7 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,

r.Mu.Lock()
defer r.Mu.Unlock()
r.Data[handle.Name] = float64(incr)
r.data[handle.Name] = float64(incr)
}

// To implement a stats.Handler, which allows it to be set as a dial option:
Expand Down

0 comments on commit e8ff69d

Please sign in to comment.