Skip to content

Commit

Permalink
tcproute: add tests for round-robin load balancing
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Dec 1, 2023
1 parent 31d8a2d commit e790e9d
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 17 deletions.
6 changes: 2 additions & 4 deletions config/samples/tcproute/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ spec:
spec:
containers:
- name: server
image: ghcr.io/shaneutt/malutki
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
env:
- name: LISTEN_PORT
value: "8080"
args: [ "8080", "blixt-tcproute-sample:" ]
ports:
- containerPort: 8080
protocol: TCP
Expand Down
10 changes: 10 additions & 0 deletions config/tests/tcproute-rr/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: ghcr.io/kong/blixt-udp-test-server
newTag: integration-tests
resources:
- ../../samples/tcproute
- server.yaml
patches:
- path: patch.yaml
18 changes: 18 additions & 0 deletions config/tests/tcproute-rr/patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: gateway.networking.k8s.io/v1alpha2
kind: TCPRoute
metadata:
name: blixt-tcproute-sample
spec:
parentRefs:
- name: blixt-tcproute-sample
port: 8080
rules:
- backendRefs:
- name: blixt-tcproute-sample
port: 8080
- backendRefs:
- name: tcproute-rr-v1
port: 8080
- backendRefs:
- name: tcproute-rr-v2
port: 8080
76 changes: 76 additions & 0 deletions config/tests/tcproute-rr/server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v1
labels:
app: tcproute-rr-v1
spec:
selector:
matchLabels:
app: tcproute-rr-v1
template:
metadata:
labels:
app: tcproute-rr-v1
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v1:" ]
ports:
- containerPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v2
labels:
app: tcproute-rr-v2
spec:
selector:
matchLabels:
app: tcproute-rr-v2
template:
metadata:
labels:
app: tcproute-rr-v2
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v2:" ]
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v1
name: tcproute-rr-v1
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v1
type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v2
name: tcproute-rr-v2
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v2
type: ClusterIP
148 changes: 135 additions & 13 deletions test/integration/tcproute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ limitations under the License.
package integration

import (
"bufio"
"context"
"fmt"
"net/http"
"net"
"strings"
"testing"
"time"
Expand All @@ -38,9 +39,12 @@ import (

const (
tcprouteSampleKustomize = "../../config/tests/tcproute"
tcprouteRRKustomize = "../../config/tests/tcproute-rr"
tcprouteSampleName = "blixt-tcproute-sample"
)

var tcpServerNames = []string{"blixt-tcproute-sample", "tcproute-rr-v1", "tcproute-rr-v2"}

func TestTCPRouteBasics(t *testing.T) {
tcpRouteBasicsCleanupKey := "tcproutebasics"
defer func() {
Expand Down Expand Up @@ -69,38 +73,156 @@ func TestTCPRouteBasics(t *testing.T) {
require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type)
gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value)

t.Log("waiting for HTTP server to be available")
t.Log("waiting for TCP server to be available")
require.Eventually(t, func() bool {
server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
require.NoError(t, err)
return server.Status.AvailableReplicas > 0
}, time.Minute, time.Second)

t.Log("verifying HTTP connectivity to the server")
httpc := http.Client{Timeout: time.Second * 30}
t.Log("verifying TCP connectivity to the server")
var conn net.Conn
require.Eventually(t, func() bool {
resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
var err error
conn, err = net.Dial("tcp", gwaddr)
if err != nil {
t.Logf("received error checking HTTP server: [%s], retrying...", err)
t.Logf("received error connecting to TCP server: [%s], retrying...", err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusTeapot
return true
}, time.Minute*5, time.Second)

t.Log("deleting the TCPRoute and verifying that HTTP traffic stops")
writeAndReadTCP(t, conn, tcpServerNames[0])

t.Log("deleting the TCPRoute and verifying that TCP connection is closed")
require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
httpc = http.Client{Timeout: time.Second * 3}
require.Eventually(t, func() bool {
resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
_, err := conn.Write([]byte("blahhh\n"))
require.NoError(t, err)

err = conn.SetReadDeadline(time.Now().Add(time.Second * 3))
require.NoError(t, err)
reader := bufio.NewReader(conn)
_, err = reader.ReadBytes(byte('\n'))
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
if strings.Contains(err.Error(), "i/o timeout") {
return true
}
t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err)
return false
}
defer resp.Body.Close()
return false
}, time.Minute, time.Second)
}

func TestTCPRouteRoundRobin(t *testing.T) {
tcpRouteRRCleanupKey := "tcprouterr"
defer func() {
testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster())
if err := runCleanup(tcpRouteRRCleanupKey); err != nil {
t.Errorf("cleanup failed: %s", err)
}
}()

t.Log("deploying config/samples/tcproute-rr kustomize")
require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteRRKustomize))
addCleanup(tcpRouteRRCleanupKey, func(ctx context.Context) error {
cleanupLog("cleaning up config/samples/tcproute-rr kustomize")
return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteRRKustomize, "--ignore-not-found=true")
})

t.Log("waiting for Gateway to have an address")
var gw *gatewayv1beta1.Gateway
require.Eventually(t, func() bool {
var err error
gw, err = gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
require.NoError(t, err)
return len(gw.Status.Addresses) > 0
}, time.Minute, time.Second)
require.NotNil(t, gw.Status.Addresses[0].Type)
require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type)
gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value)

t.Log("waiting for TCP servers to be available")
labelSelector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: tcpServerNames,
},
},
}
require.Eventually(t, func() bool {
servers, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&labelSelector),
})
require.NoError(t, err)
for _, server := range servers.Items {
if server.Status.AvailableReplicas <= 0 {
return false
}
}
return true
}, time.Minute, time.Second)

t.Log("verifying TCP connectivity to the servers")
var conn1 net.Conn
require.Eventually(t, func() bool {
var err error
conn1, err = net.Dial("tcp", gwaddr)
if err != nil {
t.Logf("received error connecting to TCP server: [%s], retrying...", err)
return false
}
return true
}, time.Minute*5, time.Second)
conn2, err := net.Dial("tcp", gwaddr)
require.NoError(t, err)
conn3, err := net.Dial("tcp", gwaddr)
require.NoError(t, err)
conns := []net.Conn{conn1, conn2, conn3}

for c := 0; c < 2; c++ {
for i, conn := range conns {
writeAndReadTCP(t, conn, tcpServerNames[i])
}
}

t.Log("deleting the TCPRoute and verifying that all TCP connections are closed")
require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
require.Eventually(t, func() bool {
for _, conn := range conns {
_, err := conn.Write([]byte("blahhh\n"))
require.NoError(t, err)
err = conn.SetReadDeadline(time.Now().Add(time.Second * 3))
require.NoError(t, err)

reader := bufio.NewReader(conn)
_, err = reader.ReadBytes(byte('\n'))
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
continue
}
t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err)
}
return false
}
return true
}, time.Minute, time.Second)
}

func writeAndReadTCP(t *testing.T, conn net.Conn, prefix string) {
t.Helper()

t.Logf("writing data to TCP connection with server %s", prefix)
request := "wazzzaaaa"
_, err := conn.Write([]byte(request + "\n"))
require.NoError(t, err)

t.Logf("reading data from TCP connection with server %s", prefix)
reader := bufio.NewReader(conn)
response, err := reader.ReadBytes(byte('\n'))
require.NoError(t, err)
require.Contains(t, string(response), fmt.Sprintf("%s: %s", prefix, string(request)))
}

0 comments on commit e790e9d

Please sign in to comment.