From c52b20d46b457df65f9d144b787bfe7f2b817006 Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Thu, 25 Mar 2021 17:12:17 -0700 Subject: [PATCH] Add flowType field for Flow Exporter In this PR, we implemented the logic of flowType value assignment. We distinguished Pod-To-Pod flows and Pod-To-External flows using the podCIDRs of all nodes in the k8s cluster. --- cmd/antrea-agent/agent.go | 5 +- .../noderoute/node_route_controller.go | 40 +++++++++- .../noderoute/node_route_controller_test.go | 80 +++++++++++++++++-- pkg/agent/flowexporter/exporter/exporter.go | 46 +++++++++-- pkg/util/ip/ip.go | 8 +- test/e2e/flowaggregator_test.go | 10 +++ 6 files changed, 171 insertions(+), 18 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index d2deb2ca7b4..6def7e95eb6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -353,6 +353,7 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.FlowExporter) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) + isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() flowRecords := flowrecords.NewFlowRecords() connStore := connections.NewConnectionStore( @@ -376,7 +377,9 @@ func run(o *Options) error { o.config.EnableTLSToFlowAggregator, v4Enabled, v6Enabled, - k8sClient) + k8sClient, + nodeRouteController, + isNetworkPolicyOnly) if err != nil { return fmt.Errorf("error when creating IPFIX flow exporter: %v", err) } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 4c0060c8a01..acfd4aa1ddf 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -37,6 +37,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/route" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" + utilip "github.com/vmware-tanzu/antrea/pkg/util/ip" ) const ( @@ -97,7 +98,8 @@ func NewNodeRouteController( nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"), - installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc})} + installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), + } nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { @@ -615,3 +617,39 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) { } return ipAddr, nil } + +func (c *Controller) IPInPodSubnets(ip net.IP) bool { + var ipCIDR *net.IPNet + var curNodeCIDRStr string + if ip.To4() != nil { + var podIPv4CIDRMaskSize int + if c.nodeConfig.PodIPv4CIDR != nil { + curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String() + podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size() + } else { + return false + } + v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen) + ipCIDR = &net.IPNet{ + IP: ip.Mask(v4Mask), + Mask: v4Mask, + } + + } else { + var podIPv6CIDRMaskSize int + if c.nodeConfig.PodIPv6CIDR != nil { + curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String() + podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size() + } else 
{ + return false + } + v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen) + ipCIDR = &net.IPNet{ + IP: ip.Mask(v6Mask), + Mask: v6Mask, + } + } + ipCIDRStr := ipCIDR.String() + nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr) + return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr +} diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index 26ed85c1d77..df175ca4db2 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -22,6 +22,7 @@ import ( "github.com/containernetworking/plugins/pkg/ip" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -35,11 +36,13 @@ import ( ) var ( - gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") - _, podCIDR, _ = net.ParseCIDR("1.1.1.0/24") - podCIDRGateway = ip.NextIP(podCIDR.IP) - nodeIP1 = net.ParseIP("10.10.10.10") - nodeIP2 = net.ParseIP("10.10.10.11") + gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") + _, podCIDR, _ = net.ParseCIDR("1.1.1.0/24") + _, podCIDR2, _ = net.ParseCIDR("1.1.2.0/24") + podCIDRGateway = ip.NextIP(podCIDR.IP) + podCIDR2Gateway = ip.NextIP(podCIDR2.IP) + nodeIP1 = net.ParseIP("10.10.10.10") + nodeIP2 = net.ParseIP("10.10.10.11") ) type fakeController struct { @@ -158,3 +161,70 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { case <-finishCh: } } + +func TestIPInPodSubnets(t *testing.T) { + c, closeFn := newController(t) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + // Must wait for cache sync, otherwise resource creation events will be missing if the resources are created + // in-between list and watch call of an informer. This is because fake clientset doesn't support watching with + // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. + c.informerFactory.WaitForCacheSync(stopCh) + c.Controller.nodeConfig.PodIPv4CIDR = podCIDR + + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Spec: corev1.NodeSpec{ + PodCIDR: podCIDR.String(), + PodCIDRs: []string{podCIDR.String()}, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: nodeIP1.String(), + }, + }, + }, + } + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Spec: corev1.NodeSpec{ + PodCIDR: podCIDR2.String(), + PodCIDRs: []string{podCIDR2.String()}, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: nodeIP2.String(), + }, + }, + }, + } + + c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + // The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map. + // The argument type is map[*net.IPNet]net.IP. 
+ c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.processNextWorkItem() + + c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1) + c.processNextWorkItem() + + assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1"))) + assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.2.1"))) + assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("10.10.10.10"))) + assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("8.8.8.8"))) +} diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 1df0f064c09..f0d17bc5ff7 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -27,9 +27,11 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog" + "github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/ipfix" "github.com/vmware-tanzu/antrea/pkg/util/env" ) @@ -94,6 +96,8 @@ type flowExporter struct { idleFlowTimeout time.Duration enableTLSToFlowAggregator bool k8sClient kubernetes.Interface + nodeRouteController *noderoute.Controller + isNetworkPolicyOnly bool } func genObservationID() (uint32, error) { @@ -123,7 +127,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords, collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration, - enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) { + enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface, + nodeRouteController *noderoute.Controller, isNetworkPolicyOnly bool) (*flowExporter, error) { // Initialize IPFIX registry registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() @@ -133,6 +138,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords if err != nil { return nil, err } + return &flowExporter{ connStore: connStore, flowRecords: records, @@ -145,6 +151,8 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords ipfixSet: ipfix.NewSet(false), enableTLSToFlowAggregator: enableTLSToFlowAggregator, k8sClient: k8sClient, + nodeRouteController: nodeRouteController, + isNetworkPolicyOnly: isNetworkPolicyOnly, }, nil } @@ -516,12 +524,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error { case "tcpState": ie.Value = record.Conn.TCPState case "flowType": - // TODO: assign flow type to support Pod-to-External flows - if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { - ie.Value = ipfixregistry.InterNode - } else { - ie.Value = ipfixregistry.IntraNode - } + ie.Value = exp.findFlowType(record) } } @@ -544,3 +547,32 @@ func (exp *flowExporter) sendDataSet() (int, error) { klog.V(4).Infof("Data set sent 
successfully. Bytes sent: %d", sentBytes) return sentBytes, nil } + +func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 { + // TODO: support Pod-To-External flows in network policy only mode. + if exp.isNetworkPolicyOnly { + if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { + return ipfixregistry.InterNode + } + return ipfixregistry.IntraNode + } + + if exp.nodeRouteController == nil { + klog.Warningf("Can't find flowType without nodeRouteController") + return 0 + } + if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) { + if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) { + if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { + return ipfixregistry.InterNode + } + return ipfixregistry.IntraNode + } else { + return ipfixregistry.ToExternal + } + } else { + // We do not support External-To-Pod flows for now. + klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", record.Conn.TupleOrig.SourceAddress.String()) + return 0 + } +} diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index f6b6cfe28e6..306a5c154ad 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -23,8 +23,8 @@ import ( ) const ( - v4BitLen = 8 * net.IPv4len - v6BitLen = 8 * net.IPv6len + V4BitLen = 8 * net.IPv4len + V6BitLen = 8 * net.IPv6len ) // This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs @@ -71,9 +71,9 @@ func diffFromCIDR(allowCIDR, exceptCIDR *net.IPNet) []*net.IPNet { exceptStartIP := exceptCIDR.IP.Mask(exceptCIDR.Mask) var bits int if allowStartIP.To4() != nil { - bits = v4BitLen + bits = V4BitLen } else { - bits = v6BitLen + bits = V6BitLen } // New CIDRs should not contain the IPs in exceptCIDR. Manipulating the bits in start IP of diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 34786da576f..52559859fb6 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" @@ -40,6 +41,7 @@ DATA SET: DATA RECORD-0: flowStartSeconds: 1608338066 flowEndSeconds: 1608338072 + flowEndReason: 2 sourceTransportPort: 43600 destinationTransportPort: 5201 protocolIdentifier: 6 @@ -65,6 +67,7 @@ DATA SET: ingressNetworkPolicyNamespace: antrea-test egressNetworkPolicyName: test-flow-aggregator-networkpolicy-egress egressNetworkPolicyNamespace: antrea-test + flowType: 1 destinationClusterIPv4: 0.0.0.0 originalExporterIPv4Address: 10.10.0.1 originalObservationDomainId: 2134708971 @@ -242,8 +245,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri // Check if record has both Pod name of source and destination pod. if isIntraNode { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName()) + checkFlowType(t, record, ipfixregistry.IntraNode) } else { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1)) + checkFlowType(t, record, ipfixregistry.InterNode) } if checkService { @@ -336,6 +341,11 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) { } } +// TODO: Add a test that checks the functionality of Pod-To-External flow. 
+func checkFlowType(t *testing.T, record string, flowType uint8) { + assert.Containsf(t, record, fmt.Sprintf("%s: %d", "flowType", flowType), "Record does not have correct flowType") +} + func getRecordsFromOutput(output string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "")
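
For reference, the flowType assignment added in pkg/agent/flowexporter/exporter/exporter.go reduces to a small decision table. The following is a minimal, self-contained Go sketch of that decision for the regular (non NetworkPolicy-only) case; it is not part of the patch. The constant values and the boolean parameter names are illustrative stand-ins for the go-ipfix registry constants, the Controller.IPInPodSubnets lookups, the conn.Mark == openflow.ServiceCTMark check, and the Pod name checks used in findFlowType.

package main

import "fmt"

// Placeholder flowType values; the patch itself uses the go-ipfix registry
// constants (ipfixregistry.IntraNode, InterNode, ToExternal), whose numeric
// values may differ from the ones assumed here.
const (
	flowTypeUnknown    uint8 = 0
	flowTypeIntraNode  uint8 = 1
	flowTypeInterNode  uint8 = 2
	flowTypeToExternal uint8 = 3
)

// classifyFlow restates the findFlowType logic with booleans as inputs:
// srcInPodSubnets/dstInPodSubnets stand in for IPInPodSubnets on the original
// tuple addresses, isServiceMark for the Service conntrack mark check, and
// haveBothPodNames for SourcePodName and DestinationPodName both being set.
func classifyFlow(srcInPodSubnets, dstInPodSubnets, isServiceMark, haveBothPodNames bool) uint8 {
	if !srcInPodSubnets {
		// External-To-Pod flows are not supported yet.
		return flowTypeUnknown
	}
	// For Service traffic the original destination is the ClusterIP, which is
	// outside the PodCIDRs, so the Service mark also counts as in-cluster.
	if isServiceMark || dstInPodSubnets {
		if haveBothPodNames {
			return flowTypeIntraNode
		}
		return flowTypeInterNode
	}
	return flowTypeToExternal
}

func main() {
	// A Pod talking to 8.8.8.8: source is in a PodCIDR, destination is not,
	// and there is no Service mark, so the flow is classified as ToExternal.
	fmt.Println(classifyFlow(true, false, false, false) == flowTypeToExternal) // true
}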