Skip to content

Commit cd4dded

Browse files
committed
Fix issue with loadbalancer failover to default server
The loadbalancer should only fail over to the default server if all other server have failed, and it should force fail-back to a preferred server as soon as one passes health checks. The loadbalancer tests have been improved to ensure that this occurs. Signed-off-by: Brad Davidson <[email protected]>
1 parent b93fd98 commit cd4dded

File tree

5 files changed

+200
-41
lines changed

5 files changed

+200
-41
lines changed

Diff for: pkg/agent/loadbalancer/loadbalancer.go

+2
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net
179179
if !allChecksFailed {
180180
defer server.closeAll()
181181
}
182+
} else {
183+
logrus.Debugf("Dial health check failed for %s", targetServer)
182184
}
183185

184186
newServer, err := lb.nextServer(targetServer)

Diff for: pkg/agent/loadbalancer/loadbalancer_test.go

+157-29
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"testing"
1111
"time"
1212

13-
"github.com/k3s-io/k3s/pkg/cli/cmds"
1413
"github.com/sirupsen/logrus"
1514
)
1615

@@ -24,7 +23,7 @@ type testServer struct {
2423
prefix string
2524
}
2625

27-
func createServer(prefix string) (*testServer, error) {
26+
func createServer(ctx context.Context, prefix string) (*testServer, error) {
2827
listener, err := net.Listen("tcp", "127.0.0.1:0")
2928
if err != nil {
3029
return nil, err
@@ -34,6 +33,10 @@ func createServer(prefix string) (*testServer, error) {
3433
listener: listener,
3534
}
3635
go s.serve()
36+
go func() {
37+
<-ctx.Done()
38+
s.close()
39+
}()
3740
return s, nil
3841
}
3942

@@ -49,6 +52,7 @@ func (s *testServer) serve() {
4952
}
5053

5154
func (s *testServer) close() {
55+
logrus.Printf("testServer %s closing", s.prefix)
5256
s.listener.Close()
5357
for _, conn := range s.conns {
5458
conn.Close()
@@ -65,6 +69,10 @@ func (s *testServer) echo(conn net.Conn) {
6569
}
6670
}
6771

72+
func (s *testServer) address() string {
73+
return s.listener.Addr().String()
74+
}
75+
6876
func ping(conn net.Conn) (string, error) {
6977
fmt.Fprintf(conn, "ping\n")
7078
result, err := bufio.NewReader(conn).ReadString('\n')
@@ -74,25 +82,31 @@ func ping(conn net.Conn) (string, error) {
7482
return strings.TrimSpace(result), nil
7583
}
7684

85+
// Test_UnitFailOver creates a LB using a default server (ie fixed registration endpoint)
86+
// and then adds a new server (a node). The node server is then closed, and it is confirmed
87+
// that new connections use the default server.
7788
func Test_UnitFailOver(t *testing.T) {
7889
tmpDir := t.TempDir()
90+
ctx, cancel := context.WithCancel(context.Background())
91+
defer cancel()
7992

80-
ogServe, err := createServer("og")
93+
defaultServer, err := createServer(ctx, "default")
8194
if err != nil {
82-
t.Fatalf("createServer(og) failed: %v", err)
95+
t.Fatalf("createServer(default) failed: %v", err)
8396
}
8497

85-
lbServe, err := createServer("lb")
98+
node1Server, err := createServer(ctx, "node1")
8699
if err != nil {
87-
t.Fatalf("createServer(lb) failed: %v", err)
100+
t.Fatalf("createServer(node1) failed: %v", err)
88101
}
89102

90-
cfg := cmds.Agent{
91-
ServerURL: fmt.Sprintf("http://%s/", ogServe.listener.Addr().String()),
92-
DataDir: tmpDir,
103+
node2Server, err := createServer(ctx, "node2")
104+
if err != nil {
105+
t.Fatalf("createServer(node2) failed: %v", err)
93106
}
94107

95-
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort, false)
108+
// start the loadbalancer with the default server as the only server
109+
lb, err := New(ctx, tmpDir, SupervisorServiceName, "http://"+defaultServer.address(), RandomPort, false)
96110
if err != nil {
97111
t.Fatalf("New() failed: %v", err)
98112
}
@@ -103,50 +117,123 @@ func Test_UnitFailOver(t *testing.T) {
103117
}
104118
localAddress := parsedURL.Host
105119

106-
lb.Update([]string{lbServe.listener.Addr().String()})
120+
// add the node as a new server address.
121+
lb.Update([]string{node1Server.address()})
107122

123+
// make sure connections go to the node
108124
conn1, err := net.Dial("tcp", localAddress)
109125
if err != nil {
110126
t.Fatalf("net.Dial failed: %v", err)
111127
}
112-
result1, err := ping(conn1)
113-
if err != nil {
128+
if result, err := ping(conn1); err != nil {
114129
t.Fatalf("ping(conn1) failed: %v", err)
130+
} else if result != "node1:ping" {
131+
t.Fatalf("Unexpected ping(conn1) result: %v", result)
115132
}
116-
if result1 != "lb:ping" {
117-
t.Fatalf("Unexpected ping result: %v", result1)
118-
}
119133

120-
lbServe.close()
134+
t.Log("conn1 tested OK")
135+
136+
// set failing health check for node 1
137+
lb.SetHealthCheck(node1Server.address(), func() bool { return false })
138+
139+
// Server connections are checked every second, now that node 1 is failed
140+
// the connections to it should be closed.
141+
time.Sleep(2 * time.Second)
121142

122-
_, err = ping(conn1)
123-
if err == nil {
143+
if _, err := ping(conn1); err == nil {
124144
t.Fatal("Unexpected successful ping on closed connection conn1")
125145
}
126146

147+
t.Log("conn1 closed on failure OK")
148+
149+
// make sure connection still goes to the first node - it is failing health checks but so
150+
// is the default endpoint, so it should be tried first with health checks disabled,
151+
// before failing back to the default.
127152
conn2, err := net.Dial("tcp", localAddress)
128153
if err != nil {
129154
t.Fatalf("net.Dial failed: %v", err)
130155

131156
}
132-
result2, err := ping(conn2)
133-
if err != nil {
157+
if result, err := ping(conn2); err != nil {
134158
t.Fatalf("ping(conn2) failed: %v", err)
159+
} else if result != "node1:ping" {
160+
t.Fatalf("Unexpected ping(conn2) result: %v", result)
135161
}
136-
if result2 != "og:ping" {
137-
t.Fatalf("Unexpected ping result: %v", result2)
162+
163+
t.Log("conn2 tested OK")
164+
165+
// make sure the health checks don't close the connection we just made -
166+
// connections should only be closed when it transitions from health to unhealthy.
167+
time.Sleep(2 * time.Second)
168+
169+
if result, err := ping(conn2); err != nil {
170+
t.Fatalf("ping(conn2) failed: %v", err)
171+
} else if result != "node1:ping" {
172+
t.Fatalf("Unexpected ping(conn2) result: %v", result)
138173
}
174+
175+
t.Log("conn2 tested OK again")
176+
177+
// shut down the first node server to force failover to the default
178+
node1Server.close()
179+
180+
// make sure new connections go to the default, and existing connections are closed
181+
conn3, err := net.Dial("tcp", localAddress)
182+
if err != nil {
183+
t.Fatalf("net.Dial failed: %v", err)
184+
185+
}
186+
if result, err := ping(conn3); err != nil {
187+
t.Fatalf("ping(conn3) failed: %v", err)
188+
} else if result != "default:ping" {
189+
t.Fatalf("Unexpected ping(conn3) result: %v", result)
190+
}
191+
192+
t.Log("conn3 tested OK")
193+
194+
if _, err := ping(conn2); err == nil {
195+
t.Fatal("Unexpected successful ping on closed connection conn2")
196+
}
197+
198+
t.Log("conn2 closed on failure OK")
199+
200+
// add the second node as a new server address.
201+
lb.Update([]string{node2Server.address()})
202+
203+
// make sure connection now goes to the second node,
204+
// and connections to the default are closed.
205+
conn4, err := net.Dial("tcp", localAddress)
206+
if err != nil {
207+
t.Fatalf("net.Dial failed: %v", err)
208+
209+
}
210+
if result, err := ping(conn4); err != nil {
211+
t.Fatalf("ping(conn4) failed: %v", err)
212+
} else if result != "node2:ping" {
213+
t.Fatalf("Unexpected ping(conn4) result: %v", result)
214+
}
215+
216+
t.Log("conn4 tested OK")
217+
218+
// Server connections are checked every second, now that we have a healthy
219+
// server, connections to the default server should be closed
220+
time.Sleep(2 * time.Second)
221+
222+
if _, err := ping(conn3); err == nil {
223+
t.Fatal("Unexpected successful ping on connection conn3")
224+
}
225+
226+
t.Log("conn3 closed on failure OK")
139227
}
140228

229+
// Test_UnitFailFast confirms that connnections to invalid addresses fail quickly
141230
func Test_UnitFailFast(t *testing.T) {
142231
tmpDir := t.TempDir()
232+
ctx, cancel := context.WithCancel(context.Background())
233+
defer cancel()
143234

144-
cfg := cmds.Agent{
145-
ServerURL: "http://127.0.0.1:0/",
146-
DataDir: tmpDir,
147-
}
148-
149-
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort, false)
235+
serverURL := "http://127.0.0.1:0/"
236+
lb, err := New(ctx, tmpDir, SupervisorServiceName, serverURL, RandomPort, false)
150237
if err != nil {
151238
t.Fatalf("New() failed: %v", err)
152239
}
@@ -172,3 +259,44 @@ func Test_UnitFailFast(t *testing.T) {
172259
t.Fatal("Test timed out")
173260
}
174261
}
262+
263+
// Test_UnitFailUnreachable confirms that connnections to unreachable addresses do fail
264+
// within the expected duration
265+
func Test_UnitFailUnreachable(t *testing.T) {
266+
if testing.Short() {
267+
t.Skip("skipping slow test in short mode.")
268+
}
269+
tmpDir := t.TempDir()
270+
ctx, cancel := context.WithCancel(context.Background())
271+
defer cancel()
272+
273+
serverAddr := "192.0.2.1:6443"
274+
lb, err := New(ctx, tmpDir, SupervisorServiceName, "http://"+serverAddr, RandomPort, false)
275+
if err != nil {
276+
t.Fatalf("New() failed: %v", err)
277+
}
278+
279+
// Set failing health check to reduce retries
280+
lb.SetHealthCheck(serverAddr, func() bool { return false })
281+
282+
conn, err := net.Dial("tcp", lb.localAddress)
283+
if err != nil {
284+
t.Fatalf("net.Dial failed: %v", err)
285+
}
286+
287+
done := make(chan error)
288+
go func() {
289+
_, err = ping(conn)
290+
done <- err
291+
}()
292+
timeout := time.After(11 * time.Second)
293+
294+
select {
295+
case err := <-done:
296+
if err == nil {
297+
t.Fatal("Unexpected successful ping from unreachable address")
298+
}
299+
case <-timeout:
300+
t.Fatal("Test timed out")
301+
}
302+
}

0 commit comments

Comments
 (0)