Skip to content

Commit

Permalink
Add flowType field for Flow Exporter
Browse files Browse the repository at this point in the history
In this PR, we implemented the logic of flowType value assignment.
We distinguished Pod-To-Pod flows and Pod-To-External flows using the
podCIDRs of all nodes in the k8s cluster.
  • Loading branch information
dreamtalen committed Apr 2, 2021
1 parent f4599c0 commit ea9bdca
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 9 deletions.
3 changes: 3 additions & 0 deletions build/yamls/elk-flow-collector/logstash/ipfix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4118,3 +4118,6 @@
135:
- :uint64
- :reverseOctetDeltaCountFromDestinationNode
137:
- :uint8
- :flowType
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ if $np; then
manifest_args="$manifest_args --np --tun vxlan"
fi

COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.3")
COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.7")
for image in "${COMMON_IMAGES_LIST[@]}"; do
docker pull $image
done
Expand Down
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ func run(o *Options) error {
o.config.EnableTLSToFlowAggregator,
v4Enabled,
v6Enabled,
k8sClient)
k8sClient,
nodeRouteController)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ the flow. All the IEs used by the Antrea Flow Exporter are listed below:
| ingressNetworkPolicyNamespace| 56506 | 111 | string |
| egressNetworkPolicyName | 56506 | 112 | string |
| egressNetworkPolicyNamespace | 56506 | 113 | string |
| flowType | 56506 | 137 | unsigned8 |

### Supported capabilities

Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ require (
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.4.1
Expand All @@ -42,7 +41,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.4.5
github.com/vmware/go-ipfix v0.4.7
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/mod v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.4.5 h1:EwG2bQXKT72IzMOsCcbvP1Po2PncLoSoPuYrHf3YrsI=
github.com/vmware/go-ipfix v0.4.5/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o=
github.com/vmware/go-ipfix v0.4.7 h1:zyKpsifh19Mdmo6FE3C2GooAWiiThX2JV+X4VdzqeZY=
github.com/vmware/go-ipfix v0.4.7/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
42 changes: 41 additions & 1 deletion pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ const (
ovsExternalIDNodeName = "node-name"

nodeRouteInfoPodCIDRIndexName = "podCIDR"

PodIPv4CIDRMaskSize = 24
IPv4BitLen = 32
PodIPv6CIDRMaskSize = 64
IPv6BitLen = 128
)

// Controller is responsible for setting up necessary IP routes and Openflow entries for inter-node traffic.
Expand All @@ -71,6 +76,7 @@ type Controller struct {
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
// A node will be in the map after its flows and routes are installed successfully.
installedNodes cache.Indexer
PodCIDRIPsMap map[string]int
}

// NewNodeRouteController instantiates a new Controller object which will process Node events
Expand All @@ -97,7 +103,9 @@ func NewNodeRouteController(
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc})}
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
PodCIDRIPsMap: make(map[string]int),
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
Expand All @@ -112,6 +120,12 @@ func NewNodeRouteController(
},
nodeResyncPeriod,
)
if nodeConfig.PodIPv4CIDR != nil {
controller.PodCIDRIPsMap[nodeConfig.PodIPv4CIDR.IP.String()] = 1
}
if nodeConfig.PodIPv6CIDR != nil {
controller.PodCIDRIPsMap[nodeConfig.PodIPv6CIDR.IP.String()] = 1
}
return controller
}

Expand Down Expand Up @@ -381,6 +395,15 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
if err := c.routeClient.DeleteRoutes(podCIDR); err != nil {
return fmt.Errorf("failed to delete the route to Node %s: %v", nodeName, err)
}
podCIDRStr := podCIDR.IP.String()
if _, ok := c.PodCIDRIPsMap[podCIDRStr]; ok {
c.PodCIDRIPsMap[podCIDRStr] -= 1
if c.PodCIDRIPsMap[podCIDRStr] == 0 {
delete(c.PodCIDRIPsMap, podCIDRStr)
}
} else {
klog.Warningf("PodCIDR IP: %s doesn't exist in PodCIDRIPsMap", podCIDRStr)
}
}
if err := c.ofClient.UninstallNodeFlows(nodeName); err != nil {
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
Expand Down Expand Up @@ -452,6 +475,12 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
peerGatewayIP := ip.NextIP(peerPodCIDRAddr)
peerConfig[peerPodCIDR] = peerGatewayIP
podCIDRs = append(podCIDRs, peerPodCIDR)
podCIDRStr := peerPodCIDR.IP.String()
if _, ok := c.PodCIDRIPsMap[podCIDRStr]; ok {
c.PodCIDRIPsMap[podCIDRStr] += 1
} else {
c.PodCIDRIPsMap[podCIDRStr] = 1
}
}

peerNodeIP, err := GetNodeAddr(node)
Expand Down Expand Up @@ -615,3 +644,14 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) {
}
return ipAddr, nil
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
var ipStr string
if ip.To4() != nil {
ipStr = ip.Mask(net.CIDRMask(PodIPv4CIDRMaskSize, IPv4BitLen)).String()
} else {
ipStr = ip.Mask(net.CIDRMask(PodIPv6CIDRMaskSize, IPv6BitLen)).String()
}
_, ok := c.PodCIDRIPsMap[ipStr]
return ok
}
33 changes: 32 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/ipfix"
"github.com/vmware-tanzu/antrea/pkg/util/env"
)
Expand Down Expand Up @@ -69,6 +71,7 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"flowType",
}
AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand Down Expand Up @@ -97,6 +100,8 @@ type flowExporter struct {
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
podSubnets []*net.IPNet
}

func genObservationID() (uint32, error) {
Expand Down Expand Up @@ -126,7 +131,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex

func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords,
collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration,
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) {
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface,
nodeRouteController *noderoute.Controller) (*flowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -136,6 +142,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
if err != nil {
return nil, err
}

return &flowExporter{
connStore: connStore,
flowRecords: records,
Expand All @@ -148,6 +155,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
}, nil
}

Expand Down Expand Up @@ -517,6 +525,8 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
ie.Value = record.Conn.EgressNetworkPolicyName
case "egressNetworkPolicyNamespace":
ie.Value = record.Conn.EgressNetworkPolicyNamespace
case "flowType":
ie.Value = exp.findFlowType(record)
}
}

Expand All @@ -539,3 +549,24 @@ func (exp *flowExporter) sendDataSet() (int, error) {
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return sentBytes, nil
}

func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
if exp.nodeRouteController == nil {
klog.Warningf("Can't find flowType without nodeRouteController")
return 0
}
if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
}
return ipfixregistry.IntraNode
} else {
return ipfixregistry.ToExternal
}
} else {
// We do not support External-To-Pod flows for now.
klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", record.Conn.TupleOrig.SourceAddress.String())
return 0
}
}
2 changes: 2 additions & 0 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"flowType",
}
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
Expand All @@ -75,6 +76,7 @@ var (

nonStatsElementList = []string{
"flowEndSeconds",
"flowEndReason",
}
statsElementList = []string{
"octetDeltaCount",
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -242,8 +243,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
// Check if record has both Pod name of source and destination pod.
if isIntraNode {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName())
checkFlowType(t, record, ipfixregistry.IntraNode)
} else {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1))
checkFlowType(t, record, ipfixregistry.InterNode)
}

if checkService {
Expand Down Expand Up @@ -336,6 +339,12 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) {
}
}

func checkFlowType(t *testing.T, record string, flowType uint8) {
if !strings.Contains(record, fmt.Sprintf("%s: %d", "flowType", flowType)) {
t.Errorf("Record does not have correct flowType")
}
}

func getRecordsFromOutput(output string) []string {
re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
output = re.ReplaceAllString(output, "")
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const (
busyboxImage = "projects.registry.vmware.com/library/busybox"
nginxImage = "projects.registry.vmware.com/antrea/nginx"
perftoolImage = "projects.registry.vmware.com/antrea/perftool"
ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.3"
ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.7"
ipfixCollectorPort = "4739"

nginxLBService = "nginx-loadbalancer"
Expand Down

0 comments on commit ea9bdca

Please sign in to comment.