diff --git a/pkg/autoscaler/statserver/server_test.go b/pkg/autoscaler/statserver/server_test.go index 774ea07358e1..1156fb941c43 100644 --- a/pkg/autoscaler/statserver/server_test.go +++ b/pkg/autoscaler/statserver/server_test.go @@ -38,6 +38,25 @@ import ( "k8s.io/apimachinery/pkg/types" ) +var ( + msg1 = metrics.StatMessage{ + Key: types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, + Stat: metrics.Stat{ + PodName: "activator1", + AverageConcurrentRequests: 2.1, + RequestCount: 51, + }, + } + msg2 = metrics.StatMessage{ + Key: types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, + Stat: metrics.Stat{ + PodName: "activator2", + AverageConcurrentRequests: 2.2, + RequestCount: 30, + }, + } +) + func TestServerLifecycle(t *testing.T) { statsCh := make(chan metrics.StatMessage) server := newTestServer(statsCh) @@ -82,17 +101,17 @@ func TestStatsReceived(t *testing.T) { defer server.Shutdown(0) go server.listenAndServe() - statSink := dialOK(server.listenAddr(), t) + statSink := dialOK(t, server.listenAddr()) // gob encoding - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false) - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), statSink, statsCh, false) + assertReceivedOK(t, msg1, statSink, statsCh, false) + assertReceivedOK(t, msg2, statSink, statsCh, false) // json encoding - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, true) - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), statSink, statsCh, true) + assertReceivedOK(t, msg1, statSink, statsCh, true) + assertReceivedOK(t, msg2, statSink, statsCh, true) - closeSink(statSink, t) + closeSink(t, statSink) } func TestServerShutdown(t *testing.T) { @@ -102,16 +121,16 @@ func TestServerShutdown(t *testing.T) { go server.listenAndServe() listenAddr := server.listenAddr() - statSink := dialOK(listenAddr, t) + statSink := dialOK(t, listenAddr) - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false) + assertReceivedOK(t, msg1, statSink, statsCh, false) server.Shutdown(time.Second) // We own the channel. close(statsCh) // Send a statistic to the server - if err := send(statSink, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), false); err != nil { + if err := send(statSink, msg2, false); err != nil { t.Fatal("Expected send to succeed, got:", err) } @@ -139,23 +158,24 @@ func TestServerShutdown(t *testing.T) { t.Fatal("Connection not refused") } - closeSink(statSink, t) + closeSink(t, statSink) } func TestServerDoesNotLeakGoroutines(t *testing.T) { statsCh := make(chan metrics.StatMessage) server := newTestServer(statsCh) + defer server.Shutdown(0) go server.listenAndServe() originalGoroutines := runtime.NumGoroutine() listenAddr := server.listenAddr() - statSink := dialOK(listenAddr, t) + statSink := dialOK(t, listenAddr) - assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false) + assertReceivedOK(t, msg1, statSink, statsCh, false) - closeSink(statSink, t) + closeSink(t, statSink) // Check the number of goroutines eventually reduces to the number there were before the connection was created for i := 1000; i >= 0; i-- { @@ -168,12 +188,10 @@ func TestServerDoesNotLeakGoroutines(t *testing.T) { t.Fatalf("Current number of goroutines %d is not equal to the original number %d", currentGoRoutines, originalGoroutines) } } - - server.Shutdown(time.Second) } func BenchmarkStatServer(b *testing.B) { - statsCh := make(chan metrics.StatMessage, 1) + statsCh := make(chan metrics.StatMessage, 100) server := newTestServer(statsCh) go server.listenAndServe() defer server.Shutdown(time.Second) @@ -183,47 +201,44 @@ func BenchmarkStatServer(b *testing.B) { b.Fatal("Dial failed:", err) } - msg := newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51) + // The activator sends a bunch of metrics at once usually. This simulates cases with + // the respective number of active revisions, sending via the activator. + for _, size := range []int{1, 2, 5, 10, 20, 50, 100} { + msgs := make([]metrics.StatMessage, 0, size) + for i := 0; i < size; i++ { + msgs = append(msgs, msg1) + } - for encoding, jsonEncoding := range map[string]bool{"json": true, "gob": false} { - b.Run(fmt.Sprintf("%s-encoding-sequential", encoding), func(b *testing.B) { - for i := 0; i < b.N; i++ { - if err := send(statSink, msg, jsonEncoding); err != nil { - b.Fatal("Expected send to succeed, but got:", err) + for encoding, jsonEncoding := range map[string]bool{"json": true, "gob": false} { + b.Run(fmt.Sprintf("%s-encoding-%d-msgs", encoding, len(msgs)), func(b *testing.B) { + for i := 0; i < b.N; i++ { + for _, msg := range msgs { + if err := send(statSink, msg, jsonEncoding); err != nil { + b.Fatal("Expected send to succeed, but got:", err) + } + } + + for range msgs { + <-statsCh + } } - <-statsCh - } - }) - } -} - -func newStatMessage(revKey types.NamespacedName, podName string, averageConcurrentRequests float64, requestCount float64) metrics.StatMessage { - return metrics.StatMessage{ - Key: revKey, - Stat: metrics.Stat{ - PodName: podName, - AverageConcurrentRequests: averageConcurrentRequests, - RequestCount: requestCount, - }, + }) + } } } -func assertReceivedOK(t *testing.T, sm metrics.StatMessage, statSink *websocket.Conn, statsCh <-chan metrics.StatMessage, jsonEncoding bool) bool { +func assertReceivedOK(t *testing.T, sm metrics.StatMessage, statSink *websocket.Conn, statsCh <-chan metrics.StatMessage, jsonEncoding bool) { if err := send(statSink, sm, jsonEncoding); err != nil { t.Fatal("Expected send to succeed, got:", err) } - recv, ok := <-statsCh - if !ok { - t.Fatal("Statistic not received") - } + recv := <-statsCh if !cmp.Equal(sm, recv) { t.Fatalf("StatMessage mismatch: diff (-got, +want) %s", cmp.Diff(recv, sm)) } - return true } -func dialOK(serverURL string, t *testing.T) *websocket.Conn { +func dialOK(t *testing.T, serverURL string) *websocket.Conn { statSink, err := dial(serverURL) if err != nil { t.Fatal("Dial failed:", err) @@ -267,7 +282,7 @@ func send(statSink *websocket.Conn, sm metrics.StatMessage, jsonEncoding bool) e return nil } -func closeSink(statSink *websocket.Conn, t *testing.T) { +func closeSink(t *testing.T, statSink *websocket.Conn) { if err := statSink.Close(); err != nil { t.Fatal("Failed to close", err) }