From c74bf6b9dadfdb3588fcd10a09b9518dff98aaa7 Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Tue, 7 Nov 2023 15:54:27 -0500 Subject: [PATCH 1/2] Fix PROXY protocol handling of dedicated kube listener with TLS routing --- lib/multiplexer/multiplexer.go | 52 +++++++++++++++++++++++++++++----- lib/service/service.go | 12 +------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/lib/multiplexer/multiplexer.go b/lib/multiplexer/multiplexer.go index 5fbae24c7243d..c064641565fda 100644 --- a/lib/multiplexer/multiplexer.go +++ b/lib/multiplexer/multiplexer.go @@ -563,15 +563,9 @@ func (m *Mux) detect(conn net.Conn) (*Conn, error) { proxyLine = newPROXYLine // repeat the cycle to detect the protocol case ProtoTLS, ProtoSSH, ProtoHTTP, ProtoPostgres: - // Proxy and other services might call itself directly, avoiding - // load balancer, so we shouldn't fail connections without PROXY headers for such cases. - selfConnection, err := m.isSelfConnection(conn) - if err != nil { + if err := m.checkPROXYProtocolRequirement(conn, unsignedPROXYLineReceived); err != nil { return nil, trace.Wrap(err) } - if !selfConnection && m.PROXYProtocolMode == PROXYProtocolOn && !unsignedPROXYLineReceived { - return nil, trace.BadParameter(missingProxyLineError, conn.RemoteAddr().String(), conn.LocalAddr().String()) - } return &Conn{ protocol: proto, @@ -585,6 +579,50 @@ func (m *Mux) detect(conn net.Conn) (*Conn, error) { return nil, trace.BadParameter(unknownProtocolError) } +// checkPROXYProtocolRequirement checks that if multiplexer is required to receive unsigned PROXY line +// that requirement is fulfilled, or exceptions apply - self connections and connections that are passed +// from upstream multiplexed listener (as it happens for alpn proxy). +func (m *Mux) checkPROXYProtocolRequirement(conn net.Conn, unsignedPROXYLineReceived bool) error { + if m.PROXYProtocolMode != PROXYProtocolOn { + return nil + } + + // Proxy and other services might call itself directly, avoiding + // load balancer, so we shouldn't fail connections without PROXY headers for such cases. + selfConnection, err := m.isSelfConnection(conn) + if err != nil { + return trace.Wrap(err) + } + + // We try to get inner multiplexer connection, if we succeed and there is on, it means conn was passed + // to us from another multiplexer listener and unsigned PROXY protocol requirement was handled there. + innerConn := unwrapMuxConn(conn) + + if !selfConnection && innerConn == nil && !unsignedPROXYLineReceived { + return trace.BadParameter(missingProxyLineError, conn.RemoteAddr().String(), conn.LocalAddr().String()) + } + + return nil +} + +func unwrapMuxConn(conn net.Conn) *Conn { + type netConn interface { + NetConn() net.Conn + } + + for { + if muxConn, ok := conn.(*Conn); ok { + return muxConn + } + + connGetter, ok := conn.(netConn) + if !ok { + return nil + } + conn = connGetter.NetConn() + } +} + func (m *Mux) isSelfConnection(conn net.Conn) (bool, error) { if m.IgnoreSelfConnections { return false, nil diff --git a/lib/service/service.go b/lib/service/service.go index 729d1337d0de0..9d9a307e9b802 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -4410,16 +4410,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } - proxyProtocol := cfg.Proxy.PROXYProtocolMode - if clusterNetworkConfig.GetProxyListenerMode() == types.ProxyListenerMode_Multiplex { - // If ProxyListenerMode is MULTIPLEX it means that the ALPN listener handles the PROXY line - // and sends the connection to the Proxy Kube listener. When it does, it uses the same net.Conn - // and doesn't dial so the PROXY Protocol cannot be present. Under those circumstances, - // ProxyProtocol for Proxy Kube listener must be off. - - proxyProtocol = multiplexer.PROXYProtocolOff - } - kubeServer, err = kubeproxy.NewTLSServer(kubeproxy.TLSServerConfig{ ForwarderConfig: kubeproxy.ForwarderConfig{ Namespace: apidefaults.Namespace, @@ -4455,7 +4445,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { Log: log, IngressReporter: ingressReporter, KubernetesServersWatcher: kubeServerWatcher, - PROXYProtocolMode: proxyProtocol, + PROXYProtocolMode: cfg.Proxy.PROXYProtocolMode, }) if err != nil { return trace.Wrap(err) From 259d72ac3ad91ca660305ebe35ba8a704ed8573b Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Mon, 13 Nov 2023 22:46:22 -0500 Subject: [PATCH 2/2] Improve test by checking both addresses in multiplexed mode --- integration/proxy/proxy_test.go | 66 +++++++++++++++++---------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/integration/proxy/proxy_test.go b/integration/proxy/proxy_test.go index db8c534536960..1c744353fcb87 100644 --- a/integration/proxy/proxy_test.go +++ b/integration/proxy/proxy_test.go @@ -583,6 +583,7 @@ func TestKubePROXYProtocol(t *testing.T) { Log: utils.NewLoggerForTests(), } tconf := servicecfg.MakeDefaultConfig() + tconf.Proxy.Kube.ListenAddr = *utils.MustParseAddr(helpers.NewListener(t, service.ListenerProxyKube, &cfg.Fds)) if tt.proxyListenerMode == types.ProxyListenerMode_Multiplex { cfg.Listeners = helpers.SingleProxyPortSetup(t, &cfg.Fds) tconf.Auth.NetworkingConfig.SetProxyListenerMode(types.ProxyListenerMode_Multiplex) @@ -590,7 +591,6 @@ func TestKubePROXYProtocol(t *testing.T) { cfg.Listeners = helpers.StandardListenerSetup(t, &cfg.Fds) tconf.Auth.NetworkingConfig.SetProxyListenerMode(types.ProxyListenerMode_Separate) tconf.Proxy.DisableALPNSNIListener = true - tconf.Proxy.Kube.ListenAddr = *utils.MustParseAddr(helpers.NewListener(t, service.ListenerProxyKube, &cfg.Fds)) } testCluster := helpers.NewInstance(t, cfg) @@ -624,41 +624,43 @@ func TestKubePROXYProtocol(t *testing.T) { require.NoError(t, testCluster.StopAll()) }) - targetAddr := testCluster.Config.Proxy.WebAddr - if tt.proxyListenerMode == types.ProxyListenerMode_Separate { - targetAddr = testCluster.Config.Proxy.Kube.ListenAddr - } - - // If PROXY protocol is required, create load balancer in front of Teleport cluster - if tt.proxyProtocolMode == multiplexer.PROXYProtocolOn { - frontend := *utils.MustParseAddr("127.0.0.1:0") - lb, err := utils.NewLoadBalancer(context.Background(), frontend) - require.NoError(t, err) - lb.PROXYHeader = []byte("PROXY TCP4 127.0.0.1 127.0.0.2 12345 42\r\n") // Send fake PROXY header - lb.AddBackend(targetAddr) - err = lb.Listen() + checkForTargetAddr := func(targetAddr utils.NetAddr) { + // If PROXY protocol is required, create load balancer in front of Teleport cluster + if tt.proxyProtocolMode == multiplexer.PROXYProtocolOn { + frontend := *utils.MustParseAddr("127.0.0.1:0") + lb, err := utils.NewLoadBalancer(context.Background(), frontend) + require.NoError(t, err) + lb.PROXYHeader = []byte("PROXY TCP4 127.0.0.1 127.0.0.2 12345 42\r\n") // Send fake PROXY header + lb.AddBackend(targetAddr) + err = lb.Listen() + require.NoError(t, err) + + go lb.Serve() + t.Cleanup(func() { require.NoError(t, lb.Close()) }) + targetAddr = *utils.MustParseAddr(lb.Addr().String()) + } + + // Create kube client that we'll use to test that connection is working correctly. + k8Client, _, err := kube.ProxyClient(kube.ProxyConfig{ + T: testCluster, + Username: kubeRoleSpec.Allow.Logins[0], + KubeUsers: kubeRoleSpec.Allow.KubeGroups, + KubeGroups: kubeRoleSpec.Allow.KubeUsers, + CustomTLSServerName: kubeCluster, + TargetAddress: targetAddr, + RouteToCluster: testCluster.Secrets.SiteName, + }) require.NoError(t, err) - go lb.Serve() - t.Cleanup(func() { require.NoError(t, lb.Close()) }) - targetAddr = *utils.MustParseAddr(lb.Addr().String()) + resp, err := k8Client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) + require.NoError(t, err) + require.Len(t, resp.Items, 3, "pods item length mismatch") } - // Create kube client that we'll use to test that connection is working correctly. - k8Client, _, err := kube.ProxyClient(kube.ProxyConfig{ - T: testCluster, - Username: kubeRoleSpec.Allow.Logins[0], - KubeUsers: kubeRoleSpec.Allow.KubeGroups, - KubeGroups: kubeRoleSpec.Allow.KubeUsers, - CustomTLSServerName: kubeCluster, - TargetAddress: targetAddr, - RouteToCluster: testCluster.Secrets.SiteName, - }) - require.NoError(t, err) - - resp, err := k8Client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) - require.Len(t, resp.Items, 3, "pods item length mismatch") + checkForTargetAddr(testCluster.Config.Proxy.Kube.ListenAddr) + if tt.proxyListenerMode == types.ProxyListenerMode_Multiplex { + checkForTargetAddr(testCluster.Config.Proxy.WebAddr) + } }) } }