Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dst: Stop overriding Host IP with Pod IP on HostPort lookup #11328

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 24 additions & 66 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,22 @@ func (s *server) getProfileByIP(

if svcID == nil {
// If the IP does not map to a service, check if it maps to a pod
pod, err := getPodByIP(s.k8sAPI, ip.String(), port, s.log)
var pod *corev1.Pod
targetIP := ip.String()
pod, err = getPodByPodIP(s.k8sAPI, ip.String(), port, s.log)
alpeb marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
address, err := s.createAddress(pod, ip.String(), port)
if pod != nil {
targetIP = pod.Status.PodIP
} else {
pod, err = getPodByHostIP(s.k8sAPI, ip.String(), port, s.log)
if err != nil {
return err
}
}

address, err := s.createAddress(pod, targetIP, port)
if err != nil {
return fmt.Errorf("failed to create address: %w", err)
}
Expand Down Expand Up @@ -451,7 +462,7 @@ func (s *server) subscribeToServiceWithoutContext(
return nil
}

// Resolves a profile for a single endpoitn, sending updates to the provided
// Resolves a profile for a single endpoint, sending updates to the provided
// stream.
//
// This function does not return until the stream is closed.
Expand Down Expand Up @@ -496,60 +507,18 @@ func (s *server) subscribeToEndpointProfile(
return nil
}

// getPortForPod returns the port that a `pod` is listening on.
//
// Proxies usually receive traffic targeting `podIp:containerPort`.
// However, they may be receiving traffic on `nodeIp:nodePort`. In this
// case, we convert the port to the containerPort for discovery. In k8s parlance,
// this is the 'HostPort' mapping.
func (s *server) getPortForPod(pod *corev1.Pod, targetIP string, port uint32) (uint32, error) {
if pod == nil {
return port, fmt.Errorf("getPortForPod passed a nil pod")
}

if net.ParseIP(targetIP) == nil {
return port, fmt.Errorf("failed to parse hostIP into net.IP: %s", targetIP)
}

if containsIP(pod.Status.PodIPs, targetIP) {
return port, nil
}

if targetIP == pod.Status.HostIP {
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if uint32(containerPort.HostPort) == port {
return uint32(containerPort.ContainerPort), nil
}
}
}
}

s.log.Warnf("unable to find container port as host (%s) matches neither PodIP nor HostIP (%s)", targetIP, pod)
return port, nil
}

func (s *server) createAddress(pod *corev1.Pod, targetIP string, port uint32) (watcher.Address, error) {
var ip, ownerKind, ownerName string
var ownerKind, ownerName string
var err error
if pod != nil {
ownerKind, ownerName, err = s.metadataAPI.GetOwnerKindAndName(context.Background(), pod, true)
if err != nil {
return watcher.Address{}, err
}

port, err = s.getPortForPod(pod, targetIP, port)
if err != nil {
return watcher.Address{}, fmt.Errorf("failed to find Port for Pod: %w", err)
}

ip = pod.Status.PodIP
} else {
ip = targetIP
}

address := watcher.Address{
IP: ip,
IP: targetIP,
Port: port,
Pod: pod,
OwnerName: ownerName,
Expand Down Expand Up @@ -649,12 +618,9 @@ func (s *server) getEndpointByHostname(k8sAPI *k8s.API, hostname string, svcID w
return nil, fmt.Errorf("no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
}

// getPodByIP returns a pod that maps to the given IP address. The pod can either
// be in the host network or the pod network. If the pod is in the host
// network, then it must have a container port that exposes `port` as a host
// port.
func getPodByIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
// First we check if the address maps to a pod in the host network.
// getPodByHostIP returns a pod that maps to the given IP address in the host
// network. It must have a container port that exposes `port` as a host port.
func getPodByHostIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
alpeb marked this conversation as resolved.
Show resolved Hide resolved
addr := net.JoinHostPort(podIP, fmt.Sprintf("%d", port))
hostIPPods, err := getIndexedPods(k8sAPI, watcher.HostIPIndex, addr)
if err != nil {
Expand All @@ -673,8 +639,11 @@ func getPodByIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry)
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), podIP, port)
}

// The address did not map to a pod in the host network, so now we check
// if the IP maps to a pod IP in the pod network.
return nil, nil
}

// getPodByPodIP returns a pod that maps to the given IP address in the pod network
func getPodByPodIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
podIPPods, err := getIndexedPods(k8sAPI, watcher.PodIPIndex, podIP)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
Expand Down Expand Up @@ -863,14 +832,3 @@ func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) (map[uint32]struct{},

return util.ParsePorts(annotation)
}

// Given a list of PodIP, determine is `targetIP` is a member
func containsIP(podIPs []corev1.PodIP, targetIP string) bool {
for _, ip := range podIPs {
if ip.IP == targetIP {
return true
}
}

return false
}
58 changes: 27 additions & 31 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const clusterIP = "172.17.12.0"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP2 = "172.17.0.13"
const podIP3 = "172.17.0.17"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const podIPPolicy = "172.17.0.16"
Expand Down Expand Up @@ -447,6 +446,29 @@ func TestGetProfiles(t *testing.T) {

server.clusterStore.UnregisterGauges()
})

t.Run("Return profile for host port pods", func(t *testing.T) {
hostPort := uint32(7777)
stream, server := profileStream(t, externalIP, hostPort, "")

// HostPort maps to pod.
profile := assertSingleProfile(t, stream.updates)
dstPod := profile.Endpoint.MetricLabels["pod"]
if dstPod != "hostport-mapping" {
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping", dstPod)
}

ip, err := addr.ParseProxyIPV4(externalIP)
if err != nil {
t.Fatalf("Error parsing IP: %s", err)
}
addr := profile.Endpoint.Addr
if addr.Ip.String() != ip.String() && addr.Port != hostPort {
t.Fatalf("Expected endpoint addr to be %s port:%d got %s", ip, hostPort, addr)
}

server.clusterStore.UnregisterGauges()
})
}

func TestTokenStructure(t *testing.T) {
Expand Down Expand Up @@ -512,32 +534,6 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) {
}, nil
}

func TestHostPortMapping(t *testing.T) {
hostPort := uint32(7777)
containerPort := uint32(80)
server := makeServer(t)

pod, err := getPodByIP(server.k8sAPI, externalIP, hostPort, server.log)
if err != nil {
t.Fatalf("error retrieving pod by external IP %s", err)
}

address, err := server.createAddress(pod, externalIP, hostPort)
if err != nil {
t.Fatalf("error calling createAddress() %s", err)
}

if address.IP != podIP3 {
t.Fatalf("expected podIP (%s), received other IP (%s)", podIP3, address.IP)
}

if address.Port != containerPort {
t.Fatalf("expected containerPort (%d) but received port (%d) instead", containerPort, address.Port)
}

server.clusterStore.UnregisterGauges()
}

func TestIpWatcherGetSvcID(t *testing.T) {
name := "service"
namespace := "test"
Expand Down Expand Up @@ -646,7 +642,7 @@ status:

k8sAPI.Sync(nil)
// Get host IP pod that is mapped to the port `hostPort1`
pod, err := getPodByIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil))
pod, err := getPodByHostIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil))
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
Expand All @@ -659,7 +655,7 @@ status:
// Get host IP pod that is mapped to the port `hostPort2`; this tests
// that the indexer properly adds multiple containers from a single
// pod.
pod, err = getPodByIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil))
pod, err = getPodByHostIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil))
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
Expand All @@ -670,15 +666,15 @@ status:
t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name)
}
// Get host IP pod with unmapped host port
pod, err = getPodByIP(k8sAPI, hostIP, 12347, logging.WithFields(nil))
pod, err = getPodByHostIP(k8sAPI, hostIP, 12347, logging.WithFields(nil))
if err != nil {
t.Fatalf("expected no error when getting host IP pod with unmapped host port, but got: %s", err)
}
if pod != nil {
t.Fatal("expected no pod to be found with unmapped host port")
}
// Get pod IP pod and expect an error
_, err = getPodByIP(k8sAPI, podIP, 12346, logging.WithFields(nil))
_, err = getPodByPodIP(k8sAPI, podIP, 12346, logging.WithFields(nil))
if err == nil {
t.Fatal("expected error when getting by pod IP and unmapped host port, but got none")
}
Expand Down
1 change: 1 addition & 0 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ kind: Pod
apiVersion: v1
metadata:
name: hostport-mapping
namespace: ns
status:
phase: Running
hostIP: 192.168.1.20
Expand Down