Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 59 additions & 44 deletions pkg/autoscaler/statserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ import (
"k8s.io/apimachinery/pkg/types"
)

var (
msg1 = metrics.StatMessage{
Key: types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"},
Stat: metrics.Stat{
PodName: "activator1",
AverageConcurrentRequests: 2.1,
RequestCount: 51,
},
}
msg2 = metrics.StatMessage{
Key: types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"},
Stat: metrics.Stat{
PodName: "activator2",
AverageConcurrentRequests: 2.2,
RequestCount: 30,
},
}
)

func TestServerLifecycle(t *testing.T) {
statsCh := make(chan metrics.StatMessage)
server := newTestServer(statsCh)
Expand Down Expand Up @@ -82,17 +101,17 @@ func TestStatsReceived(t *testing.T) {
defer server.Shutdown(0)
go server.listenAndServe()

statSink := dialOK(server.listenAddr(), t)
statSink := dialOK(t, server.listenAddr())

// gob encoding
assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false)
assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), statSink, statsCh, false)
assertReceivedOK(t, msg1, statSink, statsCh, false)
assertReceivedOK(t, msg2, statSink, statsCh, false)

// json encoding
assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, true)
assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), statSink, statsCh, true)
assertReceivedOK(t, msg1, statSink, statsCh, true)
assertReceivedOK(t, msg2, statSink, statsCh, true)

closeSink(statSink, t)
closeSink(t, statSink)
}

func TestServerShutdown(t *testing.T) {
Expand All @@ -102,16 +121,16 @@ func TestServerShutdown(t *testing.T) {
go server.listenAndServe()

listenAddr := server.listenAddr()
statSink := dialOK(listenAddr, t)
statSink := dialOK(t, listenAddr)

assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false)
assertReceivedOK(t, msg1, statSink, statsCh, false)

server.Shutdown(time.Second)
// We own the channel.
close(statsCh)

// Send a statistic to the server
if err := send(statSink, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision2"}, "activator2", 2.2, 30), false); err != nil {
if err := send(statSink, msg2, false); err != nil {
t.Fatal("Expected send to succeed, got:", err)
}

Expand Down Expand Up @@ -139,23 +158,24 @@ func TestServerShutdown(t *testing.T) {
t.Fatal("Connection not refused")
}

closeSink(statSink, t)
closeSink(t, statSink)
}

func TestServerDoesNotLeakGoroutines(t *testing.T) {
statsCh := make(chan metrics.StatMessage)
server := newTestServer(statsCh)
defer server.Shutdown(0)

go server.listenAndServe()

originalGoroutines := runtime.NumGoroutine()

listenAddr := server.listenAddr()
statSink := dialOK(listenAddr, t)
statSink := dialOK(t, listenAddr)

assertReceivedOK(t, newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51), statSink, statsCh, false)
assertReceivedOK(t, msg1, statSink, statsCh, false)

closeSink(statSink, t)
closeSink(t, statSink)

// Check the number of goroutines eventually reduces to the number there were before the connection was created
for i := 1000; i >= 0; i-- {
Expand All @@ -168,12 +188,10 @@ func TestServerDoesNotLeakGoroutines(t *testing.T) {
t.Fatalf("Current number of goroutines %d is not equal to the original number %d", currentGoRoutines, originalGoroutines)
}
}

server.Shutdown(time.Second)
}

func BenchmarkStatServer(b *testing.B) {
statsCh := make(chan metrics.StatMessage, 1)
statsCh := make(chan metrics.StatMessage, 100)
server := newTestServer(statsCh)
go server.listenAndServe()
defer server.Shutdown(time.Second)
Expand All @@ -183,47 +201,44 @@ func BenchmarkStatServer(b *testing.B) {
b.Fatal("Dial failed:", err)
}

msg := newStatMessage(types.NamespacedName{Namespace: "test-namespace", Name: "test-revision"}, "activator1", 2.1, 51)
// The activator sends a bunch of metrics at once usually. This simulates cases with
// the respective number of active revisions, sending via the activator.
for _, size := range []int{1, 2, 5, 10, 20, 50, 100} {
msgs := make([]metrics.StatMessage, 0, size)
for i := 0; i < size; i++ {
msgs = append(msgs, msg1)
}

for encoding, jsonEncoding := range map[string]bool{"json": true, "gob": false} {
b.Run(fmt.Sprintf("%s-encoding-sequential", encoding), func(b *testing.B) {
for i := 0; i < b.N; i++ {
if err := send(statSink, msg, jsonEncoding); err != nil {
b.Fatal("Expected send to succeed, but got:", err)
for encoding, jsonEncoding := range map[string]bool{"json": true, "gob": false} {
b.Run(fmt.Sprintf("%s-encoding-%d-msgs", encoding, len(msgs)), func(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, msg := range msgs {
if err := send(statSink, msg, jsonEncoding); err != nil {
b.Fatal("Expected send to succeed, but got:", err)
}
}

for range msgs {
<-statsCh
}
}
<-statsCh
}
})
}
}

func newStatMessage(revKey types.NamespacedName, podName string, averageConcurrentRequests float64, requestCount float64) metrics.StatMessage {
return metrics.StatMessage{
Key: revKey,
Stat: metrics.Stat{
PodName: podName,
AverageConcurrentRequests: averageConcurrentRequests,
RequestCount: requestCount,
},
})
}
}
}

func assertReceivedOK(t *testing.T, sm metrics.StatMessage, statSink *websocket.Conn, statsCh <-chan metrics.StatMessage, jsonEncoding bool) bool {
func assertReceivedOK(t *testing.T, sm metrics.StatMessage, statSink *websocket.Conn, statsCh <-chan metrics.StatMessage, jsonEncoding bool) {
if err := send(statSink, sm, jsonEncoding); err != nil {
t.Fatal("Expected send to succeed, got:", err)
}

recv, ok := <-statsCh
if !ok {
t.Fatal("Statistic not received")
}
recv := <-statsCh
if !cmp.Equal(sm, recv) {
t.Fatalf("StatMessage mismatch: diff (-got, +want) %s", cmp.Diff(recv, sm))
}
return true
}

func dialOK(serverURL string, t *testing.T) *websocket.Conn {
func dialOK(t *testing.T, serverURL string) *websocket.Conn {
statSink, err := dial(serverURL)
if err != nil {
t.Fatal("Dial failed:", err)
Expand Down Expand Up @@ -267,7 +282,7 @@ func send(statSink *websocket.Conn, sm metrics.StatMessage, jsonEncoding bool) e
return nil
}

func closeSink(statSink *websocket.Conn, t *testing.T) {
func closeSink(t *testing.T, statSink *websocket.Conn) {
if err := statSink.Close(); err != nil {
t.Fatal("Failed to close", err)
}
Expand Down