Skip to content

Commit

Permalink
NETOBSERV-1790: Manage enrichment via "k8s.v1.cni.cncf.io/network-sta…
Browse files Browse the repository at this point in the history
…tus" (#674)

* Manage enrichment via "k8s.v1.cni.cncf.io/network-status"

Manage enrichment by extracting pod IPs from the annotation
"k8s.v1.cni.cncf.io/network-status", which is used (at least) by multus

This allows to correlate Pods with their IPs on secondary interfaces

* rely on mac address when possible

* optional mac-input config

* NETOBSERV-1799: avoid fmt.Sprintf

* Use enriched namespace for infra/app layer

Do not use IP lookup, since now some pods are indexed via MAC and not
IPs

* Use converToString insted of Sprintf (#701)

* remove unecessary import

---------

Co-authored-by: Julien Pinsonneau <[email protected]>
Co-authored-by: Julien Pinsonneau <[email protected]>
  • Loading branch information
3 people authored Sep 2, 2024
1 parent 9145910 commit 72029f5
Show file tree
Hide file tree
Showing 15 changed files with 470 additions and 194 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ parameters:
transform:
type: network
network:
KubeConfigPath: /tmp/config
kubeConfig:
configPath: /tmp/config
rules:
- type: add_subnet
add_subnet:
Expand Down Expand Up @@ -453,7 +454,7 @@ will be generated, and named by appending `parameters` value to the label keys.
If `assignee` is set to `otel` then the output fields of `add_kubernetes` will be produced in opentelemetry format.

> Note: kubernetes connection is done using the first available method:
> 1. configuration parameter `KubeConfigPath` (in the example above `/tmp/config`) or
> 1. configuration parameter `kubeConfig.configPath` (in the example above `/tmp/config`) or
> 2. using `KUBECONFIG` environment variable
> 3. using local `~/.kube/config`

Expand Down
13 changes: 9 additions & 4 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,21 @@ Following is the supported API format for network transformations:
reinterpret_direction: reinterpret flow direction at the node level (instead of net interface), to ease the deduplication process
add_subnet_label: categorize IPs based on known subnets configuration
kubernetes_infra: Kubernetes infra rule configuration
inputs: entry inputs fields
namespaceNameFields: entries for namespace and name input fields
name: name of the object
namespace: namespace of the object
output: entry output field
infra_prefixes: Namespace prefixes that will be tagged as infra
infra_refs: Additional object references to be tagged as infra
name: name of the object
namespace: namespace of the object
kubernetes: Kubernetes rule configuration
input: entry input field
input: entry IP input field
mac-input: Optional entry MAC input field
output: entry output field
assignee: value needs to assign to output field
labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied
add_zone: If true the rule will add the zone
add_zone: if true the rule will add the zone
add_subnet: Add subnet rule configuration
input: entry input field
output: entry output field
Expand All @@ -266,7 +269,9 @@ Following is the supported API format for network transformations:
input: entry input field
output: entry output field
protocol: entry protocol field
kubeConfigPath: path to kubeconfig file (optional)
kubeConfig: global configuration related to Kubernetes (optional)
configPath: path to kubeconfig file (optional)
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
Expand Down
35 changes: 23 additions & 12 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package api

type TransformNetwork struct {
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfig NetworkTransformKubeConfig `yaml:"kubeConfig,omitempty" json:"kubeConfig,omitempty" doc:"global configuration related to Kubernetes (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
Expand All @@ -38,6 +38,16 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
return p, s
}

const (
OVN = "ovn"
Multus = "multus"
)

type NetworkTransformKubeConfig struct {
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus"`
}

type TransformNetworkOperationEnum string

const (
Expand All @@ -62,10 +72,10 @@ type NetworkTransformRule struct {
}

type K8sInfraRule struct {
Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
InfraPrefixes []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"`
InfraRefs []K8sReference `yaml:"infra_refs,omitempty" json:"infra_refs,omitempty" doc:"Additional object references to be tagged as infra"`
NamespaceNameFields []K8sReference `yaml:"namespaceNameFields,omitempty" json:"namespaceNameFields,omitempty" doc:"entries for namespace and name input fields"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
InfraPrefixes []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"`
InfraRefs []K8sReference `yaml:"infra_refs,omitempty" json:"infra_refs,omitempty" doc:"Additional object references to be tagged as infra"`
}

type K8sReference struct {
Expand All @@ -74,11 +84,12 @@ type K8sReference struct {
}

type K8sRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry IP input field"`
MacInput string `yaml:"mac-input,omitempty" json:"mac-input,omitempty" doc:"Optional entry MAC input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"`
}

type NetworkGenericRule struct {
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/generic_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,12 @@ func (m GenericMap) IsTransportProtocol() bool {
}
return false
}

func (m GenericMap) LookupString(key string) (string, bool) {
if v, ok := m[key]; ok {
if s, ok := v.(string); ok {
return s, true
}
}
return "", false
}
4 changes: 2 additions & 2 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestIPFIXPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/pipeline/transform/kubernetes/cni/cni.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cni

import (
v1 "k8s.io/api/core/v1"
)

type Plugin interface {
GetNodeIPs(node *v1.Node) []string
GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string)
}
63 changes: 63 additions & 0 deletions pkg/pipeline/transform/kubernetes/cni/multus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cni

import (
"encoding/json"
"fmt"
"strings"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

const (
statusAnnotation = "k8s.v1.cni.cncf.io/network-status"
)

type MultusPlugin struct {
Plugin
}

func (m *MultusPlugin) GetNodeIPs(_ *v1.Node) []string {
// No CNI-specific logic needed for pods
return nil
}

func (m *MultusPlugin) GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string) {
// Cf https://k8snetworkplumbingwg.github.io/multus-cni/docs/quickstart.html#network-status-annotations
ips, macs, err := extractNetStatusIPsAndMACs(pod.Annotations)
if err != nil {
// Log the error as Info, do not block other ips indexing
log.Infof("failed to index IPs from network-status annotation: %v", err)
}
log.Tracef("GetPodIPsAndMACs found ips: %v macs: %v for pod %s", ips, macs, pod.Name)
return ips, macs
}

type netStatItem struct {
IPs []string `json:"ips"`
MAC string `json:"mac"`
}

func extractNetStatusIPsAndMACs(annotations map[string]string) ([]string, []string, error) {
if statusAnnotationJSON, ok := annotations[statusAnnotation]; ok {
var ips, macs []string
var networks []netStatItem
err := json.Unmarshal([]byte(statusAnnotationJSON), &networks)
if err == nil {
for _, network := range networks {
if len(network.IPs) > 0 {
ips = append(ips, network.IPs...)
}

if len(network.MAC) > 0 {
macs = append(macs, strings.ToUpper(network.MAC))
}
}
return ips, macs, nil
}

return nil, nil, fmt.Errorf("cannot read annotation %s: %w", statusAnnotation, err)
}
// Annotation not present => just ignore, no error
return nil, nil, nil
}
50 changes: 50 additions & 0 deletions pkg/pipeline/transform/kubernetes/cni/multus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cni

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestExtractNetStatusIPs(t *testing.T) {
// Annotation not found => no error, no ip
ip, mac, err := extractNetStatusIPsAndMACs(map[string]string{})
require.NoError(t, err)
require.Empty(t, ip)
require.Empty(t, mac)

// Annotation malformed => error, no ip
ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{
statusAnnotation: "whatever",
})
require.Error(t, err)
require.Contains(t, err.Error(), "cannot read annotation")
require.Empty(t, ip)
require.Empty(t, mac)

// Valid annotation => no error, ip
ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{
statusAnnotation: `
[{
"name": "cbr0",
"ips": [
"10.244.1.73"
],
"default": true,
"dns": {}
},{
"name": "macvlan-conf",
"interface": "net1",
"ips": [
"192.168.1.205"
],
"mac": "86:1d:96:ff:55:0d",
"dns": {}
}]
`,
})
require.NoError(t, err)
require.Equal(t, []string{"10.244.1.73", "192.168.1.205"}, ip)
require.Equal(t, []string{"86:1D:96:FF:55:0D"}, mac)

}
19 changes: 14 additions & 5 deletions pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,29 @@ const (
ovnSubnetAnnotation = "k8s.ovn.org/node-subnets"
)

func AddOvnIPs(ips []string, node *v1.Node) []string {
type OVNPlugin struct {
Plugin
}

func (o *OVNPlugin) GetNodeIPs(node *v1.Node) []string {
// Add IP that is used in OVN for some traffic on mp0 interface
// (no IP / error returned when not using ovn-k)
ip, err := findOvnMp0IP(node.Annotations)
if err != nil {
// Log the error as Info, do not block other ips indexing
log.Infof("failed to index OVN mp0 IP: %v", err)
} else if ip != "" {
return append(ips, ip)
return []string{ip}
}
return ips
return nil
}

func (o *OVNPlugin) GetPodIPsAndMACs(_ *v1.Pod) ([]string, []string) {
// No CNI-specific logic needed for pods
return nil, nil
}

func unmarshalAnnotation(annot []byte) (string, error) {
func unmarshalOVNAnnotation(annot []byte) (string, error) {
// Depending on OVN (OCP) version, the annotation might be JSON-encoded as a string (legacy), or an array of strings
var subnetsAsArray map[string][]string
err := json.Unmarshal(annot, &subnetsAsArray)
Expand All @@ -70,7 +79,7 @@ func unmarshalAnnotation(annot []byte) (string, error) {

func findOvnMp0IP(annotations map[string]string) (string, error) {
if subnetsJSON, ok := annotations[ovnSubnetAnnotation]; ok {
subnet, err := unmarshalAnnotation([]byte(subnetsJSON))
subnet, err := unmarshalOVNAnnotation([]byte(subnetsJSON))
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit 72029f5

Please sign in to comment.