Skip to content

Commit bb64438

Browse files
committed
tcproute: add tests for round-robin load balancing
Signed-off-by: Sanskar Jaiswal <[email protected]>
1 parent 31d8a2d commit bb64438

File tree

5 files changed

+269
-17
lines changed

5 files changed

+269
-17
lines changed

config/samples/tcproute/server.yaml

+2-4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ spec:
1616
spec:
1717
containers:
1818
- name: server
19-
image: ghcr.io/shaneutt/malutki
19+
image: istio/tcp-echo-server:1.1
2020
imagePullPolicy: IfNotPresent
21-
env:
22-
- name: LISTEN_PORT
23-
value: "8080"
21+
args: [ "8080", "blixt-tcproute-sample:" ]
2422
ports:
2523
- containerPort: 8080
2624
protocol: TCP
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
apiVersion: kustomize.config.k8s.io/v1beta1
2+
kind: Kustomization
3+
images:
4+
- name: ghcr.io/kong/blixt-udp-test-server
5+
newTag: integration-tests
6+
resources:
7+
- ../../samples/tcproute
8+
- server.yaml
9+
patches:
10+
- path: patch.yaml

config/tests/tcproute-rr/patch.yaml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
apiVersion: gateway.networking.k8s.io/v1alpha2
2+
kind: TCPRoute
3+
metadata:
4+
name: blixt-tcproute-sample
5+
spec:
6+
parentRefs:
7+
- name: blixt-tcproute-sample
8+
port: 8080
9+
rules:
10+
- backendRefs:
11+
- name: blixt-tcproute-sample
12+
port: 8080
13+
- backendRefs:
14+
- name: tcproute-rr-v1
15+
port: 8080
16+
- backendRefs:
17+
- name: tcproute-rr-v2
18+
port: 8080

config/tests/tcproute-rr/server.yaml

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
---
2+
apiVersion: apps/v1
3+
kind: Deployment
4+
metadata:
5+
name: tcproute-rr-v1
6+
labels:
7+
app: tcproute-rr-v1
8+
spec:
9+
selector:
10+
matchLabels:
11+
app: tcproute-rr-v1
12+
template:
13+
metadata:
14+
labels:
15+
app: tcproute-rr-v1
16+
spec:
17+
containers:
18+
- name: tcp-echo
19+
image: istio/tcp-echo-server:1.1
20+
imagePullPolicy: IfNotPresent
21+
args: [ "8080", "tcproute-rr-v1:" ]
22+
ports:
23+
- containerPort: 8080
24+
---
25+
apiVersion: apps/v1
26+
kind: Deployment
27+
metadata:
28+
name: tcproute-rr-v2
29+
labels:
30+
app: tcproute-rr-v2
31+
spec:
32+
selector:
33+
matchLabels:
34+
app: tcproute-rr-v2
35+
template:
36+
metadata:
37+
labels:
38+
app: tcproute-rr-v2
39+
spec:
40+
containers:
41+
- name: tcp-echo
42+
image: istio/tcp-echo-server:1.1
43+
imagePullPolicy: IfNotPresent
44+
args: [ "8080", "tcproute-rr-v2:" ]
45+
ports:
46+
- containerPort: 8080
47+
---
48+
apiVersion: v1
49+
kind: Service
50+
metadata:
51+
labels:
52+
app: tcproute-rr-v1
53+
name: tcproute-rr-v1
54+
spec:
55+
ports:
56+
- name: tcp
57+
port: 8080
58+
protocol: TCP
59+
selector:
60+
app: tcproute-rr-v1
61+
type: ClusterIP
62+
---
63+
apiVersion: v1
64+
kind: Service
65+
metadata:
66+
labels:
67+
app: tcproute-rr-v2
68+
name: tcproute-rr-v2
69+
spec:
70+
ports:
71+
- name: tcp
72+
port: 8080
73+
protocol: TCP
74+
selector:
75+
app: tcproute-rr-v2
76+
type: ClusterIP

test/integration/tcproute_test.go

+163-13
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ limitations under the License.
2020
package integration
2121

2222
import (
23+
"bufio"
2324
"context"
2425
"fmt"
25-
"net/http"
26+
"net"
2627
"strings"
2728
"testing"
2829
"time"
@@ -38,9 +39,12 @@ import (
3839

3940
const (
4041
tcprouteSampleKustomize = "../../config/tests/tcproute"
42+
tcprouteRRKustomize = "../../config/tests/tcproute-rr"
4143
tcprouteSampleName = "blixt-tcproute-sample"
4244
)
4345

46+
var tcpServerNames = []string{"blixt-tcproute-sample", "tcproute-rr-v1", "tcproute-rr-v2"}
47+
4448
func TestTCPRouteBasics(t *testing.T) {
4549
tcpRouteBasicsCleanupKey := "tcproutebasics"
4650
defer func() {
@@ -69,38 +73,184 @@ func TestTCPRouteBasics(t *testing.T) {
6973
require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type)
7074
gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value)
7175

72-
t.Log("waiting for HTTP server to be available")
76+
t.Log("waiting for TCP server to be available")
7377
require.Eventually(t, func() bool {
7478
server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
7579
require.NoError(t, err)
7680
return server.Status.AvailableReplicas > 0
7781
}, time.Minute, time.Second)
7882

79-
t.Log("verifying HTTP connectivity to the server")
80-
httpc := http.Client{Timeout: time.Second * 30}
83+
t.Log("verifying TCP connectivity to the server")
84+
var conn net.Conn
8185
require.Eventually(t, func() bool {
82-
resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
86+
var err error
87+
conn, err = net.Dial("tcp", gwaddr)
8388
if err != nil {
84-
t.Logf("received error checking HTTP server: [%s], retrying...", err)
89+
t.Logf("received error connecting to TCP server: [%s], retrying...", err)
8590
return false
8691
}
87-
defer resp.Body.Close()
88-
return resp.StatusCode == http.StatusTeapot
92+
return true
8993
}, time.Minute*5, time.Second)
9094

91-
t.Log("deleting the TCPRoute and verifying that HTTP traffic stops")
95+
response := writeAndReadTCP(t, conn)
96+
require.Contains(t, response, tcpServerNames[0])
97+
98+
t.Log("deleting the TCPRoute and verifying that TCP connection is closed")
9299
require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
93-
httpc = http.Client{Timeout: time.Second * 3}
94100
require.Eventually(t, func() bool {
95-
resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
101+
_, err := conn.Write([]byte("blahhh\n"))
102+
require.NoError(t, err)
103+
104+
err = conn.SetReadDeadline(time.Now().Add(time.Second * 3))
105+
require.NoError(t, err)
106+
reader := bufio.NewReader(conn)
107+
_, err = reader.ReadBytes(byte('\n'))
96108
if err != nil {
97-
if strings.Contains(err.Error(), "context deadline exceeded") {
109+
if strings.Contains(err.Error(), "i/o timeout") {
98110
return true
99111
}
100112
t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err)
101113
return false
102114
}
103-
defer resp.Body.Close()
104115
return false
105116
}, time.Minute, time.Second)
106117
}
118+
119+
func TestTCPRouteRoundRobin(t *testing.T) {
120+
tcpRouteRRCleanupKey := "tcprouterr"
121+
defer func() {
122+
testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster())
123+
if err := runCleanup(tcpRouteRRCleanupKey); err != nil {
124+
t.Errorf("cleanup failed: %s", err)
125+
}
126+
}()
127+
128+
t.Log("deploying config/samples/tcproute-rr kustomize")
129+
require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteRRKustomize))
130+
addCleanup(tcpRouteRRCleanupKey, func(ctx context.Context) error {
131+
cleanupLog("cleaning up config/samples/tcproute-rr kustomize")
132+
return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteRRKustomize, "--ignore-not-found=true")
133+
})
134+
135+
t.Log("waiting for Gateway to have an address")
136+
var gw *gatewayv1beta1.Gateway
137+
require.Eventually(t, func() bool {
138+
var err error
139+
gw, err = gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
140+
require.NoError(t, err)
141+
return len(gw.Status.Addresses) > 0
142+
}, time.Minute, time.Second)
143+
require.NotNil(t, gw.Status.Addresses[0].Type)
144+
require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type)
145+
gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value)
146+
147+
t.Log("waiting for TCP servers to be available")
148+
labelSelector := metav1.LabelSelector{
149+
MatchExpressions: []metav1.LabelSelectorRequirement{
150+
{
151+
Key: "app",
152+
Operator: metav1.LabelSelectorOpIn,
153+
Values: tcpServerNames,
154+
},
155+
},
156+
}
157+
require.Eventually(t, func() bool {
158+
servers, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{
159+
LabelSelector: metav1.FormatLabelSelector(&labelSelector),
160+
})
161+
require.NoError(t, err)
162+
for _, server := range servers.Items {
163+
if server.Status.AvailableReplicas <= 0 {
164+
return false
165+
}
166+
}
167+
return true
168+
}, time.Minute, time.Second)
169+
170+
t.Log("verifying TCP connectivity to the servers")
171+
// We create three TCP connections, one for each backend.
172+
var conn1 net.Conn
173+
require.Eventually(t, func() bool {
174+
var err error
175+
conn1, err = net.Dial("tcp", gwaddr)
176+
if err != nil {
177+
t.Logf("received error connecting to TCP server: [%s], retrying...", err)
178+
return false
179+
}
180+
return true
181+
}, time.Minute*5, time.Second)
182+
conn2, err := net.Dial("tcp", gwaddr)
183+
require.NoError(t, err)
184+
conn3, err := net.Dial("tcp", gwaddr)
185+
require.NoError(t, err)
186+
conns := []net.Conn{conn1, conn2, conn3}
187+
188+
// Run it twice to verify that we load balance in a round-robin fashion.
189+
for c := 0; c < 2; c++ {
190+
// We can't do names := tcpServerNames because we overwrite this in the loop later.
191+
var names []string
192+
names = append(names, tcpServerNames...)
193+
194+
for _, conn := range conns {
195+
response := writeAndReadTCP(t, conn)
196+
split := strings.Split(response, ":")
197+
require.Len(t, split, 2)
198+
name := split[0]
199+
var removed bool
200+
names, removed = removeName(names, name)
201+
// If no name was removed from the list, it means that the response
202+
// does not contain the name of a known server.
203+
if !removed {
204+
t.Fatalf("received unexpected response from backend: %s", name)
205+
}
206+
}
207+
require.Len(t, names, 0)
208+
}
209+
210+
t.Log("deleting the TCPRoute and verifying that all TCP connections are closed")
211+
require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
212+
require.Eventually(t, func() bool {
213+
for _, conn := range conns {
214+
_, err := conn.Write([]byte("blahhh\n"))
215+
require.NoError(t, err)
216+
err = conn.SetReadDeadline(time.Now().Add(time.Second * 3))
217+
require.NoError(t, err)
218+
219+
reader := bufio.NewReader(conn)
220+
_, err = reader.ReadBytes(byte('\n'))
221+
if err != nil {
222+
if strings.Contains(err.Error(), "i/o timeout") {
223+
continue
224+
}
225+
t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err)
226+
}
227+
return false
228+
}
229+
return true
230+
}, time.Minute, time.Second)
231+
}
232+
233+
func removeName(names []string, name string) ([]string, bool) {
234+
for i, v := range names {
235+
if v == name {
236+
names = append(names[:i], names[i+1:]...)
237+
return names, true
238+
}
239+
}
240+
return nil, false
241+
}
242+
243+
func writeAndReadTCP(t *testing.T, conn net.Conn) string {
244+
t.Helper()
245+
246+
t.Logf("writing data to TCP connection with backend %s", conn.RemoteAddr().String())
247+
request := "wazzzaaaa"
248+
_, err := conn.Write([]byte(request + "\n"))
249+
require.NoError(t, err)
250+
251+
t.Logf("reading data from TCP connection with backend %s", conn.RemoteAddr().String())
252+
reader := bufio.NewReader(conn)
253+
response, err := reader.ReadBytes(byte('\n'))
254+
require.NoError(t, err)
255+
return string(response)
256+
}

0 commit comments

Comments
 (0)