From 7ae1bed4a452e8cf667d2576e09b0f24c25cc343 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 14 Jun 2023 20:23:46 -0400 Subject: [PATCH 1/4] Fix flaky metrics test --- stats/opencensus/e2e_test.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go index 1f68675f0c1d..a5511bab83b6 100644 --- a/stats/opencensus/e2e_test.go +++ b/stats/opencensus/e2e_test.go @@ -237,6 +237,23 @@ func distributionDataLatencyCount(vi *viewInformation, countWant int64, wantTags return nil } +// waitForServerCompletedRPCs waits until both Unary and Streaming metric rows +// appear for server completed RPC's view (by checking for length of rows to be +// 2). Returns an error if both the Unary and Streaming metric not found within +// the passed context's timeout. +func waitForServerCompletedRPCs(ctx context.Context, fe *fakeExporter) error { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + rows, err := view.RetrieveData("grpc.io/server/completed_rpcs") + if err != nil { + continue + } + if len(rows) == 2 { + return nil + } + } + return fmt.Errorf("timeout when waiting for Unary and Streaming rows to be present for \"grpc.io/server/completed_rpcs\"") +} + // TestAllMetricsOneFunction tests emitted metrics from gRPC. It registers all // the metrics provided by this package. It then configures a system with a gRPC // Client and gRPC server with the OpenCensus Dial and Server Option configured, @@ -987,10 +1004,13 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { }, }, } - // Unregister all the views. Unregistering a view causes a synchronous - // upload of any collected data for the view to any registered exporters. - // Thus, after this unregister call, the exporter has the data to make - // assertions on immediately. + // Server Side stats.End call happens asynchronously for both Unary and + // Streaming calls with respect to the RPC returning client side. Thus, add + // a sync point at the global view package level for these two rows to be + // recorded, which will be synchronously uploaded to exporters right after. + if err := waitForServerCompletedRPCs(ctx, fe); err != nil { + t.Fatal(err) + } view.Unregister(allViews...) // Assert the expected emissions for each metric match the expected // emissions. From 9993ba23854e0d926ec9fa100e039bf1d827d41e Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 15 Jun 2023 14:06:51 -0400 Subject: [PATCH 2/4] Review comments --- stats/opencensus/e2e_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go index a5511bab83b6..8b41b790198a 100644 --- a/stats/opencensus/e2e_test.go +++ b/stats/opencensus/e2e_test.go @@ -238,16 +238,21 @@ func distributionDataLatencyCount(vi *viewInformation, countWant int64, wantTags } // waitForServerCompletedRPCs waits until both Unary and Streaming metric rows -// appear for server completed RPC's view (by checking for length of rows to be -// 2). Returns an error if both the Unary and Streaming metric not found within -// the passed context's timeout. -func waitForServerCompletedRPCs(ctx context.Context, fe *fakeExporter) error { +// appear for server completed RPC's view. Returns an error if the Unary and +// Streaming metric are not found within the passed context's timeout. +func waitForServerCompletedRPCs(ctx context.Context) error { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { rows, err := view.RetrieveData("grpc.io/server/completed_rpcs") if err != nil { continue } - if len(rows) == 2 { + m := make(map[string]bool) + for _, row := range rows { + for _, tag := range row.Tags { + m[tag.Value] = true + } + } + if m["grpc.testing.TestService/UnaryCall"] && m["grpc.testing.TestService/FullDuplexCall"] { return nil } } @@ -1008,7 +1013,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { // Streaming calls with respect to the RPC returning client side. Thus, add // a sync point at the global view package level for these two rows to be // recorded, which will be synchronously uploaded to exporters right after. - if err := waitForServerCompletedRPCs(ctx, fe); err != nil { + if err := waitForServerCompletedRPCs(ctx); err != nil { t.Fatal(err) } view.Unregister(allViews...) From 35358c48cbff70b43f2e5b55e3db608930842f9d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 16 Jun 2023 09:14:41 -0400 Subject: [PATCH 3/4] Review comments --- stats/opencensus/e2e_test.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go index 8b41b790198a..ae6d31b5d8ca 100644 --- a/stats/opencensus/e2e_test.go +++ b/stats/opencensus/e2e_test.go @@ -238,22 +238,30 @@ func distributionDataLatencyCount(vi *viewInformation, countWant int64, wantTags } // waitForServerCompletedRPCs waits until both Unary and Streaming metric rows -// appear for server completed RPC's view. Returns an error if the Unary and -// Streaming metric are not found within the passed context's timeout. +// appear, in two seperate rows, for server completed RPC's view. Returns an +// error if the Unary and Streaming metric are not found within the passed +// context's timeout. func waitForServerCompletedRPCs(ctx context.Context) error { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { rows, err := view.RetrieveData("grpc.io/server/completed_rpcs") if err != nil { continue } - m := make(map[string]bool) + unaryFound := false + streamingFound := false for _, row := range rows { for _, tag := range row.Tags { - m[tag.Value] = true + if tag.Value == "grpc.testing.TestService/UnaryCall" { + unaryFound = true + break + } else if tag.Value == "grpc.testing.TestService/FullDuplexCall" { + streamingFound = true + break + } + } + if unaryFound && streamingFound { + return nil } - } - if m["grpc.testing.TestService/UnaryCall"] && m["grpc.testing.TestService/FullDuplexCall"] { - return nil } } return fmt.Errorf("timeout when waiting for Unary and Streaming rows to be present for \"grpc.io/server/completed_rpcs\"") From 526f731538c379e32cb6686fdf15d3b34e897a48 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 16 Jun 2023 19:06:27 -0400 Subject: [PATCH 4/4] Typo --- stats/opencensus/e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go index ae6d31b5d8ca..3b3c2bbbd908 100644 --- a/stats/opencensus/e2e_test.go +++ b/stats/opencensus/e2e_test.go @@ -238,7 +238,7 @@ func distributionDataLatencyCount(vi *viewInformation, countWant int64, wantTags } // waitForServerCompletedRPCs waits until both Unary and Streaming metric rows -// appear, in two seperate rows, for server completed RPC's view. Returns an +// appear, in two separate rows, for server completed RPC's view. Returns an // error if the Unary and Streaming metric are not found within the passed // context's timeout. func waitForServerCompletedRPCs(ctx context.Context) error {