Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/healthcheck/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestManager(t *testing.T) {
t.Run("duplicate target is an error", func(t *testing.T) {
err = mgr.AddTarget(devTarget)
require.Error(t, err)
require.IsType(t, trace.AlreadyExists(""), err)
require.ErrorIs(t, trace.AlreadyExists("target health checker \"name=devDB, kind=db\" already exists"), err)
})
t.Run("unsupported target resource is an error", func(t *testing.T) {
err = mgr.AddTarget(Target{
Expand All @@ -253,7 +253,7 @@ func TestManager(t *testing.T) {
},
})
require.Error(t, err)
require.IsType(t, trace.BadParameter(""), err)
require.ErrorIs(t, trace.BadParameter("health check target resource kind \"node\" is not supported"), err)
})

requireTargetHealth := func(t *testing.T, r types.ResourceWithLabels, status types.TargetHealthStatus, reason types.TargetHealthTransitionReason) {
Expand Down Expand Up @@ -397,11 +397,11 @@ func TestManager(t *testing.T) {
// shouldn't be any target health after the target is removed
_, err = mgr.GetTargetHealth(devDB)
require.Error(t, err)
require.IsType(t, trace.NotFound(""), err)
require.ErrorIs(t, trace.NotFound("health checker \"name=devDB, kind=db\" not found"), err)

err = mgr.RemoveTarget(devDB)
require.Error(t, err)
require.IsType(t, trace.NotFound(""), err)
require.ErrorIs(t, trace.NotFound("health checker \"name=devDB, kind=db\" not found"), err)

// prodDB should still be disabled
requireTargetHealth(t, prodDB, types.TargetHealthStatusUnknown, types.TargetHealthTransitionReasonDisabled)
Expand Down
45 changes: 45 additions & 0 deletions lib/healthcheck/order_by.go
Comment thread
rosstimothy marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Teleport
* Copyright (C) 2025 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package healthcheck

import (
"iter"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/utils"
)

// OrderByTargetHealthStatus returns an iterator that yields the given
// resources grouped by target health status, in the order healthy, unknown,
// then unhealthy. The order within each group is shuffled to distribute load
// across resources.
//
// The resources are regrouped every time the returned iterator is ranged over.
func OrderByTargetHealthStatus[T types.TargetHealthStatusGetter](resources []T) iter.Seq[T] {
	return func(yield func(T) bool) {
		byStatus := types.GroupByTargetHealthStatus(resources)
		// Groups listed from most to least preferred.
		priority := [][]T{byStatus.Healthy, byStatus.Unknown, byStatus.Unhealthy}
		for i := range priority {
			// ShuffleVisit is chosen for its cheap early return and partial
			// shuffle: a consumer that stops early typically does so inside
			// the healthy group, so that group is rarely shuffled in full and
			// the unknown and unhealthy groups are rarely touched at all.
			for _, item := range utils.ShuffleVisit(priority[i]) {
				if keepGoing := yield(item); !keepGoing {
					return
				}
			}
		}
	}
}
122 changes: 122 additions & 0 deletions lib/healthcheck/order_by_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Teleport
* Copyright (C) 2025 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package healthcheck

import (
"math"
"slices"
"testing"

"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/types"
)

// TestOrderByHealthEmpty verifies that ordering a nil slice of servers
// yields no resources.
func TestOrderByHealthEmpty(t *testing.T) {
	t.Parallel()
	var (
		servers []types.KubeServer
		hostIDs []string
	)
	for s := range OrderByTargetHealthStatus(servers) {
		hostIDs = append(hostIDs, s.GetHostID())
	}
	require.Empty(t, hostIDs)
}

func TestOrderByHealthOne(t *testing.T) {
t.Parallel()
servers := []types.KubeServer{
newKubeServer(t, "one", types.TargetHealthStatusHealthy),
}
var visited []string
for server := range OrderByTargetHealthStatus(servers) {
visited = append(visited, server.GetHostID())
}
require.Equal(t, []string{"one"}, visited)
}

// TestOrderByHealthUnsorted feeds servers in a deliberately mixed order and
// verifies that every server is yielded and that the yielded sequence is
// sorted healthy, then unknown, then unhealthy.
func TestOrderByHealthUnsorted(t *testing.T) {
	t.Parallel()
	fixtures := []struct {
		hostID string
		health types.TargetHealthStatus
	}{
		{"unknown-2", types.TargetHealthStatusUnknown},
		{"unknown-1", types.TargetHealthStatusUnknown},
		{"unhealthy-1", types.TargetHealthStatusUnhealthy},
		{"healthy-2", types.TargetHealthStatusHealthy},
		{"unhealthy-2", types.TargetHealthStatusUnhealthy},
		{"healthy-1", types.TargetHealthStatusHealthy},
	}
	servers := make([]types.KubeServer, 0, len(fixtures))
	for _, f := range fixtures {
		servers = append(servers, newKubeServer(t, f.hostID, f.health))
	}

	got := slices.Collect(OrderByTargetHealthStatus(servers))

	require.Len(t, got, len(servers))
	// Order inside each group is shuffled, so only the group order is checked.
	require.True(t, slices.IsSortedFunc(got, byHealthOrder))
}

// TestOrderByHealthEarlyExit verifies that a consumer which stops after two
// resources sees exactly the two healthy servers, each once, and never
// reaches the unknown or unhealthy groups.
func TestOrderByHealthEarlyExit(t *testing.T) {
	t.Parallel()
	servers := []types.KubeServer{
		newKubeServer(t, "unknown-1", types.TargetHealthStatusUnknown),
		newKubeServer(t, "unhealthy-1", types.TargetHealthStatusUnhealthy),
		newKubeServer(t, "healthy-2", types.TargetHealthStatusHealthy),
		newKubeServer(t, "healthy-1", types.TargetHealthStatusHealthy),
	}
	var visited []string
	for server := range OrderByTargetHealthStatus(servers) {
		visited = append(visited, server.GetHostID())
		if len(visited) >= 2 {
			break
		}
	}
	// The healthy group is shuffled, so compare ignoring order. Unlike a
	// length check combined with NotContains, this also fails if the same
	// healthy server is yielded twice.
	require.ElementsMatch(t, []string{"healthy-1", "healthy-2"}, visited)
}

// newKubeServer is a test helper that builds a kube server for the
// "test-cluster" kube cluster at "localhost:8080" with the given host ID,
// and sets its target health status to health. Any construction error fails
// the test immediately.
func newKubeServer(t *testing.T, hostID string, health types.TargetHealthStatus) types.KubeServer {
	t.Helper()

	meta := types.Metadata{Name: "test-cluster"}
	kubeCluster, err := types.NewKubernetesClusterV3(meta, types.KubernetesClusterSpecV3{})
	require.NoError(t, err)

	kubeServer, err := types.NewKubernetesServerV3FromCluster(kubeCluster, "localhost:8080", hostID)
	require.NoError(t, err)

	targetHealth := &types.TargetHealth{Status: string(health)}
	kubeServer.Status = &types.KubernetesServerStatusV3{TargetHealth: targetHealth}
	return kubeServer
}

// healthOrder maps a server's target health status to its expected position
// in the iteration order: 0 for healthy, 1 for unknown, 2 for unhealthy.
// Any other status ranks last with math.MaxInt.
func healthOrder(s types.KubeServer) int {
	rank := math.MaxInt
	switch s.GetTargetHealthStatus() {
	case types.TargetHealthStatusHealthy:
		rank = 0
	case types.TargetHealthStatusUnknown:
		rank = 1
	case types.TargetHealthStatusUnhealthy:
		rank = 2
	}
	return rank
}

// byHealthOrder is a comparison function for the slices package. It returns
// a negative number when a ranks before b by health order, zero when they
// rank equally, and a positive number when a ranks after b.
func byHealthOrder(a, b types.KubeServer) int {
	rankA, rankB := healthOrder(a), healthOrder(b)
	// Ranks are never negative, so the subtraction cannot overflow.
	return rankA - rankB
}
24 changes: 13 additions & 11 deletions lib/kube/proxy/transport.go
Comment thread
rosstimothy marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/healthcheck"
"github.com/gravitational/teleport/lib/kube/internal"
"github.com/gravitational/teleport/lib/reversetunnelclient"
"github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -319,38 +320,39 @@ func (f *Forwarder) localClusterDialer(kubeClusterName string, opts ...contextDi
return nil, trace.Wrap(err)
}

// Dial kube servers in the order of health status healthy, unknown, and unhealthy.
// Each health group is shuffled to distribute load.
// Unknown servers and unhealthy servers are still dialed
// in case health status changed since last check.
var errs []error
// Shuffle the list of servers to avoid always connecting to the same
// server.
for _, s := range utils.ShuffleVisit(kubeServers) {
for server := range healthcheck.OrderByTargetHealthStatus(kubeServers) {
// Validate that the requested kube cluster is registered.
kubeCluster := s.GetCluster()
if kubeCluster.GetName() != kubeClusterName || !opt.matches(s.GetHostID()) {
if server.GetCluster().GetName() != kubeClusterName || !opt.matches(server.GetHostID()) {
continue
}

// serverID is a unique identifier of the server in the cluster.
// It is a combination of the server's hostname and the cluster name.
// <host_id>.<cluster_name>
serverID := fmt.Sprintf("%s.%s", s.GetHostID(), f.cfg.ClusterName)
serverID := server.GetHostID() + "." + f.cfg.ClusterName
conn, err := localCluster.DialTCP(reversetunnelclient.DialParams{
// Send a sentinel value to the remote cluster because this connection
// will be used to forward multiple requests to the remote cluster from
// different users.
// IP Pinning is based on the source IP address of the connection that
// we transport over HTTP headers so it's not affected.
From: &utils.NetAddr{AddrNetwork: "tcp", Addr: "0.0.0.0:0"},
To: &utils.NetAddr{AddrNetwork: "tcp", Addr: s.GetHostname()},
To: &utils.NetAddr{AddrNetwork: "tcp", Addr: server.GetHostname()},
ConnType: types.KubeTunnel,
ServerID: serverID,
ProxyIDs: s.GetProxyIDs(),
ProxyIDs: server.GetProxyIDs(),
})
if err == nil {
opt.collect(s.GetHostID())
opt.collect(server.GetHostID())
return conn, nil
}
errs = append(errs, trace.Wrap(err))
errs = append(errs, err)
}

if len(errs) > 0 {
return nil, trace.NewAggregate(errs...)
}
Expand Down
Loading
Loading