From 45c400ecfe5858a643ea1e2e50c0e42e0bad0647 Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Thu, 25 Mar 2021 17:12:17 -0700 Subject: [PATCH] Add flowType field for Flow Exporter In this PR, we implemented the logic of flowType value assignment. We distinguished Pod-To-Pod flows and Pod-To-External flows using the podCIDRs of all nodes in the k8s cluster. --- cmd/antrea-agent/agent.go | 3 +- go.sum | 4 +- .../noderoute/node_route_controller.go | 14 ++++++ pkg/agent/flowexporter/exporter/exporter.go | 45 ++++++++++++++++--- test/e2e/flowaggregator_test.go | 9 ++++ 5 files changed, 65 insertions(+), 10 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 209de4b0d4e..b8914db55af 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -360,7 +360,8 @@ func run(o *Options) error { o.config.EnableTLSToFlowAggregator, v4Enabled, v6Enabled, - k8sClient) + k8sClient, + nodeRouteController) if err != nil { return fmt.Errorf("error when creating IPFIX flow exporter: %v", err) } diff --git a/go.sum b/go.sum index cb53094ad42..f5ca730d7e3 100644 --- a/go.sum +++ b/go.sum @@ -407,8 +407,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7Zo github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vmware/go-ipfix v0.4.7 h1:zyKpsifh19Mdmo6FE3C2GooAWiiThX2JV+X4VdzqeZY= github.com/vmware/go-ipfix v0.4.7/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o= -github.com/wenyingd/ofnet v0.0.0-20210205051801-5a4f247248d4 h1:HwolNov6r/aM4zwA3MiSzxJKUTi3MypPOR6PRCTg1sA= -github.com/wenyingd/ofnet v0.0.0-20210205051801-5a4f247248d4/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= +github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY= +github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/handysort v0.0.0-20150421192137-fb3537ed64a1/go.mod h1:QcJo0QPSfTONNIgpN5RA8prR7fF8nkF6cTWTcNerRO8= diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 4d318b6b4b0..f7f325664ca 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -615,3 +615,17 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) { } return ipAddr, nil } + +func (c *Controller) GetPodSubnetsFromAllNodes() []*net.IPNet { + podSubnets := []*net.IPNet{} + for _, node := range c.installedNodes.List() { + podSubnets = append(podSubnets, node.(*nodeRouteInfo).podCIDRs...) + } + if c.nodeConfig.PodIPv4CIDR != nil { + podSubnets = append(podSubnets, c.nodeConfig.PodIPv4CIDR) + } + if c.nodeConfig.PodIPv6CIDR != nil { + podSubnets = append(podSubnets, c.nodeConfig.PodIPv6CIDR) + } + return podSubnets +} diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index be2faa8b9eb..1f82754e497 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -27,9 +27,11 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog" + "github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/ipfix" "github.com/vmware-tanzu/antrea/pkg/util/env" ) @@ -94,6 +96,8 @@ type flowExporter struct { idleFlowTimeout time.Duration enableTLSToFlowAggregator bool k8sClient kubernetes.Interface + nodeRouteController *noderoute.Controller + podSubnets []*net.IPNet } func genObservationID() (uint32, error) { @@ -123,7 +127,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords, collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration, - enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) { + enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface, + nodeRouteController *noderoute.Controller) (*flowExporter, error) { // Initialize IPFIX registry registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() @@ -133,6 +138,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords if err != nil { return nil, err } + return &flowExporter{ connStore: connStore, flowRecords: records, @@ -145,6 +151,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords ipfixSet: ipfix.NewSet(false), enableTLSToFlowAggregator: enableTLSToFlowAggregator, k8sClient: k8sClient, + nodeRouteController: nodeRouteController, }, nil } @@ -523,12 +530,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error { case "tcpState": ie.Value = record.Conn.TCPState case "flowType": - // TODO: assign flow type to support Pod-to-External flows - if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { - ie.Value = ipfixregistry.InterNode - } else { - ie.Value = ipfixregistry.IntraNode - } + ie.Value = exp.findFlowType(record) } } @@ -551,3 +553,32 @@ func (exp *flowExporter) sendDataSet() (int, error) { klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes) return sentBytes, nil } + +func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 { + if record.Conn.Mark == openflow.ServiceCTMark || exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) && exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress) { + if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { + return ipfixregistry.InterNode + } + return ipfixregistry.IntraNode + } else { + // Update Pod subnets to distinguish Pod-To-External flows. + exp.podSubnets = exp.nodeRouteController.GetPodSubnetsFromAllNodes() + klog.V(4).Infof("Updated Pod subnets: %v", exp.podSubnets) + if !exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) { + return ipfixregistry.FromExternal + } else if !exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress) { + return ipfixregistry.ToExternal + } else { + return exp.findFlowType(record) + } + } +} + +func (exp *flowExporter) ipInPodSubnets(ip net.IP) bool { + for _, subnet := range exp.podSubnets { + if subnet.Contains(ip) { + return true + } + } + return false +} diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index decc0f316c8..95af3997225 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" @@ -242,8 +243,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri // Check if record has both Pod name of source and destination pod. if isIntraNode { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName()) + checkFlowType(t, record, ipfixregistry.IntraNode) } else { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1)) + checkFlowType(t, record, ipfixregistry.InterNode) } if checkService { @@ -336,6 +339,12 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) { } } +func checkFlowType(t *testing.T, record string, flowType uint8) { + if !strings.Contains(record, fmt.Sprintf("%s: %d", "flowType", flowType)) { + t.Errorf("Record does not have correct flowType") + } +} + func getRecordsFromOutput(output string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "")