Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1875: Make enrichment indexes configurable #711

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ parameters:
output: dstLocation
- type: add_kubernetes
kubernetes:
input: srcIP
ipField: srcIP
output: srcK8S
```

Expand All @@ -443,7 +443,7 @@ All the geo-location fields will be named by appending `output` value
(e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`)

The rule `add_kubernetes` generates new fields with kubernetes information by
matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
matching the `ipField` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
All the kubernetes fields will be named by appending `output` value
(`srcK8S` in the example above) to the kubernetes metadata field names
(e.g., `Namespace`, `Name`, `Type`, `HostIP`, `OwnerName`, `OwnerType` )
Expand Down
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestPipelineConfigSetup(t *testing.T) {

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"ipField\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"ipField\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Health": {
"Port": "8080"
},
Expand Down
2 changes: 1 addition & 1 deletion contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ parameters:
subnet_mask: /16
- type: add_kubernetes
kubernetes:
input: srcIP
ipField: srcIP
output: srcK8S
labels_prefix: srcK8S_labels
- type: add_location
Expand Down
10 changes: 7 additions & 3 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ Following is the supported API format for network transformations:
name: name of the object
namespace: namespace of the object
kubernetes: Kubernetes rule configuration
input: entry IP input field
mac-input: Optional entry MAC input field
ipField: entry IP input field
interfacesField: entry Interfaces input field
macField: entry MAC input field
output: entry output field
assignee: value needs to assign to output field
labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied
Expand All @@ -272,7 +273,10 @@ Following is the supported API format for network transformations:
protocol: entry protocol field
kubeConfig: global configuration related to Kubernetes (optional)
configPath: path to kubeconfig file (optional)
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus
secondaryNetworks: configuration for secondary networks
name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'
index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
Expand Down
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | stage |


### secondary_network_indexer_hit
| **Name** | secondary_network_indexer_hit |
|:---|:---|
| **Description** | Counter of hits per secondary network index for Kubernetes enrichment |
| **Type** | counter |
| **Labels** | kind, network, warning |


### stage_duration_ms
| **Name** | stage_duration_ms |
|:---|:---|
Expand Down
26 changes: 16 additions & 10 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
}

const (
OVN = "ovn"
Multus = "multus"
OVN = "ovn"
)

type NetworkTransformKubeConfig struct {
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus"`
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
}

type TransformNetworkOperationEnum string
Expand Down Expand Up @@ -84,12 +84,18 @@ type K8sReference struct {
}

type K8sRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry IP input field"`
MacInput string `yaml:"mac-input,omitempty" json:"mac-input,omitempty" doc:"Optional entry MAC input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"`
IPField string `yaml:"ipField,omitempty" json:"ipField,omitempty" doc:"entry IP input field"`
InterfacesField string `yaml:"interfacesField,omitempty" json:"interfacesField,omitempty" doc:"entry Interfaces input field"`
MACField string `yaml:"macField,omitempty" json:"macField,omitempty" doc:"entry MAC input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"`
}

type SecondaryNetwork struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'"`
Index map[string]any `yaml:"index,omitempty" json:"index,omitempty" doc:"fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'"`
}

type NetworkGenericRule struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/confgen/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (

func Test_dedupeNetworkTransformRules(t *testing.T) {
slice := api.NetworkTransformRules{
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
}
expected := api.NetworkTransformRules{
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}},
}
actual := dedupeNetworkTransformRules(slice)

Expand Down
20 changes: 10 additions & 10 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func TestLokiPipeline(t *testing.T) {
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
IPField: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
IPField: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"})
Expand All @@ -58,7 +58,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down Expand Up @@ -206,14 +206,14 @@ func TestIPFIXPipeline(t *testing.T) {
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
IPField: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
IPField: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteIpfix("ipfix", api.WriteIpfix{
Expand All @@ -238,7 +238,7 @@ func TestIPFIXPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down
13 changes: 13 additions & 0 deletions pkg/operational/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ var (
TypeHistogram,
"stage",
)
indexerHit = DefineMetric(
"secondary_network_indexer_hit",
"Counter of hits per secondary network index for Kubernetes enrichment",
TypeCounter,
"kind",
"namespace",
"network",
"warning",
)
)

func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels {
Expand Down Expand Up @@ -241,6 +250,10 @@ func (o *Metrics) GetOrCreateStageDurationHisto() *prometheus.HistogramVec {
return o.stageDurationHisto
}

func (o *Metrics) CreateIndexerHitCounter() *prometheus.CounterVec {
return o.NewCounterVec(&indexerHit)
}

func GetDocumentation() string {
doc := ""
sort.Slice(allMetrics, func(i, j int) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write.
return writer, err
}

func getTransformer(_ *operational.Metrics, params config.StageParam) (transform.Transformer, error) {
func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (transform.Transformer, error) {
var transformer transform.Transformer
var err error
switch params.Transform.Type {
Expand All @@ -443,7 +443,7 @@ func getTransformer(_ *operational.Metrics, params config.StageParam) (transform
case api.FilterType:
transformer, err = transform.NewTransformFilter(params)
case api.NetworkType:
transformer, err = transform.NewTransformNetwork(params)
transformer, err = transform.NewTransformNetwork(params, opMetrics)
case api.NoneType:
transformer, err = transform.NewTransformNone()
default:
Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/transform/kubernetes/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ import (

type Plugin interface {
GetNodeIPs(node *v1.Node) []string
GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string)
}
Loading
Loading