Skip to content

Commit

Permalink
Add flowType field for Flow Exporter
Browse files Browse the repository at this point in the history
In this PR, we implemented the logic of flowType value assignment.
We distinguished Pod-To-Pod flows and Pod-To-External flows using the
podCIDRs of all nodes in the k8s cluster.
  • Loading branch information
dreamtalen committed Mar 29, 2021
1 parent df5298d commit 45c400e
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 10 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ func run(o *Options) error {
o.config.EnableTLSToFlowAggregator,
v4Enabled,
v6Enabled,
k8sClient)
k8sClient,
nodeRouteController)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7Zo
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.4.7 h1:zyKpsifh19Mdmo6FE3C2GooAWiiThX2JV+X4VdzqeZY=
github.com/vmware/go-ipfix v0.4.7/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o=
github.com/wenyingd/ofnet v0.0.0-20210205051801-5a4f247248d4 h1:HwolNov6r/aM4zwA3MiSzxJKUTi3MypPOR6PRCTg1sA=
github.com/wenyingd/ofnet v0.0.0-20210205051801-5a4f247248d4/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/handysort v0.0.0-20150421192137-fb3537ed64a1/go.mod h1:QcJo0QPSfTONNIgpN5RA8prR7fF8nkF6cTWTcNerRO8=
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,17 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) {
}
return ipAddr, nil
}

func (c *Controller) GetPodSubnetsFromAllNodes() []*net.IPNet {
podSubnets := []*net.IPNet{}
for _, node := range c.installedNodes.List() {
podSubnets = append(podSubnets, node.(*nodeRouteInfo).podCIDRs...)
}
if c.nodeConfig.PodIPv4CIDR != nil {
podSubnets = append(podSubnets, c.nodeConfig.PodIPv4CIDR)
}
if c.nodeConfig.PodIPv6CIDR != nil {
podSubnets = append(podSubnets, c.nodeConfig.PodIPv6CIDR)
}
return podSubnets
}
45 changes: 38 additions & 7 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/ipfix"
"github.com/vmware-tanzu/antrea/pkg/util/env"
)
Expand Down Expand Up @@ -94,6 +96,8 @@ type flowExporter struct {
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
podSubnets []*net.IPNet
}

func genObservationID() (uint32, error) {
Expand Down Expand Up @@ -123,7 +127,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex

func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords,
collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration,
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) {
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface,
nodeRouteController *noderoute.Controller) (*flowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -133,6 +138,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
if err != nil {
return nil, err
}

return &flowExporter{
connStore: connStore,
flowRecords: records,
Expand All @@ -145,6 +151,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
}, nil
}

Expand Down Expand Up @@ -523,12 +530,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "tcpState":
ie.Value = record.Conn.TCPState
case "flowType":
// TODO: assign flow type to support Pod-to-External flows
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
ie.Value = ipfixregistry.InterNode
} else {
ie.Value = ipfixregistry.IntraNode
}
ie.Value = exp.findFlowType(record)
}
}

Expand All @@ -551,3 +553,32 @@ func (exp *flowExporter) sendDataSet() (int, error) {
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return sentBytes, nil
}

func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
if record.Conn.Mark == openflow.ServiceCTMark || exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) && exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
}
return ipfixregistry.IntraNode
} else {
// Update Pod subnets to distinguish Pod-To-External flows.
exp.podSubnets = exp.nodeRouteController.GetPodSubnetsFromAllNodes()
klog.V(4).Infof("Updated Pod subnets: %v", exp.podSubnets)
if !exp.ipInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
return ipfixregistry.FromExternal
} else if !exp.ipInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
return ipfixregistry.ToExternal
} else {
return exp.findFlowType(record)
}
}
}

func (exp *flowExporter) ipInPodSubnets(ip net.IP) bool {
for _, subnet := range exp.podSubnets {
if subnet.Contains(ip) {
return true
}
}
return false
}
9 changes: 9 additions & 0 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -242,8 +243,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
// Check if record has both Pod name of source and destination pod.
if isIntraNode {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName())
checkFlowType(t, record, ipfixregistry.IntraNode)
} else {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1))
checkFlowType(t, record, ipfixregistry.InterNode)
}

if checkService {
Expand Down Expand Up @@ -336,6 +339,12 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) {
}
}

func checkFlowType(t *testing.T, record string, flowType uint8) {
if !strings.Contains(record, fmt.Sprintf("%s: %d", "flowType", flowType)) {
t.Errorf("Record does not have correct flowType")
}
}

func getRecordsFromOutput(output string) []string {
re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
output = re.ReplaceAllString(output, "")
Expand Down

0 comments on commit 45c400e

Please sign in to comment.