From cfe771ccacd47267a2f877d63b160019434c97b4 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 5 Jun 2024 10:58:04 -0700 Subject: [PATCH 1/2] Always include Pod labels in FlowAggregator IPFIX template We always include the Pod labels IEs (for source and destination Pods), regardless of the value of the recordContents.podLabels configuration parameter. This simplifies the logic and the IPFIXExporter no longer needs to be aware of this configuration. There will be a minor size increase to the IPFIX records exported by the FlowAggregator when recordContents.podLabels is false, as we will need to include empty strings in the records for the 2 IEs. We use an empty string when recordContents.podLabels is false, or when the endpoint is not a Pod. We use an empty JSON dictionary ("{}"), when the Pod has no labels. Fixes #6386 Signed-off-by: Antonin Bas --- pkg/flowaggregator/exporter/ipfix.go | 22 +-- pkg/flowaggregator/exporter/ipfix_test.go | 52 ++--- pkg/flowaggregator/flowaggregator.go | 36 ++-- pkg/flowaggregator/flowaggregator_test.go | 222 ++++++++++++---------- 4 files changed, 170 insertions(+), 162 deletions(-) diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index e7331b25434..35f46a9ac17 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -45,7 +45,6 @@ type IPFIXExporter struct { externalFlowCollectorProto string exportingProcess ipfix.IPFIXExportingProcess sendJSONRecord bool - includePodLabels bool observationDomainID uint32 templateIDv4 uint16 templateIDv6 uint16 @@ -95,7 +94,6 @@ func NewIPFIXExporter( externalFlowCollectorAddr: opt.ExternalFlowCollectorAddr, externalFlowCollectorProto: opt.ExternalFlowCollectorProto, sendJSONRecord: sendJSONRecord, - includePodLabels: opt.Config.RecordContents.PodLabels, observationDomainID: observationDomainID, registry: registry, set: ipfixentities.NewSet(false), @@ -127,7 +125,7 @@ func (e *IPFIXExporter) AddRecord(record ipfixentities.Record, isRecordIPv6 bool func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { config := opt.Config.FlowCollector - if reflect.DeepEqual(config, e.config) && opt.Config.RecordContents.PodLabels == e.includePodLabels { + if reflect.DeepEqual(config, e.config) { return } @@ -144,12 +142,8 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { } else { e.observationDomainID = genObservationDomainID(e.k8sClient) } - e.includePodLabels = opt.Config.RecordContents.PodLabels - klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "includePodLabels", e.includePodLabels) + klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID) - // In theory, a change to e.includePodLabels does not require opening a new connection, it - // just requires sending new templates. But it is easier to treat all configuration changes - // uniformly. if e.exportingProcess != nil { e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil @@ -327,14 +321,12 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { } elements = append(elements, ie) } - if e.includePodLabels { - for _, ie := range infoelements.AntreaLabelsElementList { - ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err - } - elements = append(elements, ie) + for _, ie := range infoelements.AntreaLabelsElementList { + ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err } + elements = append(elements, ie) } e.set.ResetSet() if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index 6096c3043df..0009f2b2c3e 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -51,14 +51,14 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi } func TestIPFIXExporter_sendTemplateSet(t *testing.T) { - ctrl := gomock.NewController(t) + runTest := func(t *testing.T, isIPv6 bool) { + ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) + mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) + mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) - newIPFIXExporter := func(includePodLabels bool) *IPFIXExporter { - return &IPFIXExporter{ + exporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", exportingProcess: mockIPFIXExpProc, @@ -66,34 +66,13 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { templateIDv6: testTemplateIDv6, registry: mockIPFIXRegistry, set: mockTempSet, - includePodLabels: includePodLabels, observationDomainID: testObservationDomainID, } - } - - testcases := []struct { - isIPv6 bool - includePodLabels bool - }{ - {false, true}, - {true, true}, - {false, false}, - {true, false}, - } - - for _, tc := range testcases { - exporter := newIPFIXExporter(tc.includePodLabels) - elemList := createElementList(tc.isIPv6, mockIPFIXRegistry) + elemList := createElementList(isIPv6, mockIPFIXRegistry) testTemplateID := exporter.templateIDv4 - if tc.isIPv6 { + if isIPv6 { testTemplateID = exporter.templateIDv6 } - if tc.includePodLabels { - for _, ie := range infoelements.AntreaLabelsElementList { - elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) - } - } mockTempSet.EXPECT().ResetSet() mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil) mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil) @@ -101,9 +80,12 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { // above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements. mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) - _, err := exporter.sendTemplateSet(tc.isIPv6) - assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, tc.isIPv6) + _, err := exporter.sendTemplateSet(isIPv6) + assert.NoErrorf(t, err, "Error when sending template record") } + + t.Run("IPv4", func(t *testing.T) { runTest(t, false) }) + t.Run("IPv6", func(t *testing.T) { runTest(t, true) }) } func TestIPFIXExporter_UpdateOptions(t *testing.T) { @@ -163,7 +145,7 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { const newAddr = "newAddr" const newProto = "newProto" config.FlowCollector.Address = fmt.Sprintf("%s:%s", newAddr, newProto) - config.RecordContents.PodLabels = true + config.FlowCollector.RecordFormat = "JSON" ipfixExporter.UpdateOptions(&options.Options{ Config: config, @@ -173,7 +155,7 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { assert.Equal(t, newAddr, ipfixExporter.externalFlowCollectorAddr) assert.Equal(t, newProto, ipfixExporter.externalFlowCollectorProto) - assert.True(t, ipfixExporter.includePodLabels) + assert.True(t, ipfixExporter.sendJSONRecord) require.NoError(t, ipfixExporter.AddRecord(mockRecord, false)) assert.Equal(t, 2, setCount, "Invalid number of flow sets sent by exporter") @@ -305,6 +287,10 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg elemList = append(elemList, createElement(infoelements.AntreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID)) mockIPFIXRegistry.EXPECT().GetInfoElement(infoelements.AntreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } + for _, ie := range infoelements.AntreaLabelsElementList { + elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + } return elemList } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 047d3ff8da9..232cb5d84c1 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -417,7 +417,8 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor fa.fillK8sMetadata(key, record.Record, *startTime) fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) } - if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) { + // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. + if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { fa.fillPodLabels(key, record.Record, *startTime) fa.aggregationProcess.SetExternalFieldsFilled(record, true) } @@ -502,7 +503,11 @@ func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string klog.ErrorS(nil, "Error when getting Pod information from podInformer", "ip", ip, "startTime", startTime) return "" } - labelsJSON, err := json.Marshal(pod.GetLabels()) + labels := pod.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labelsJSON, err := json.Marshal(labels) if err != nil { klog.ErrorS(err, "Error when JSON encoding of Pod labels") return "" @@ -512,22 +517,25 @@ func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.Record, startTime time.Time, podNamespaceIEName, podNameIEName, podLabelsIEName string) error { podLabelsString := "" - if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { - podNameString := podName.GetStringValue() - if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { - podNamespaceString := podNamespace.GetStringValue() - if podNameString != "" && podNamespaceString != "" { - podLabelsString = fa.fetchPodLabels(ip, startTime) + // If fa.includePodLabels is false, we always use an empty string. + // If fa.includePodLabels is true, we use an empty string in case of error or if the + // endpoint is not a Pod, and a valid JSON dictionary otherwise (which will be empty if the + // Pod has no labels). + if fa.includePodLabels { + if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { + podNameString := podName.GetStringValue() + if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { + podNamespaceString := podNamespace.GetStringValue() + if podNameString != "" && podNamespaceString != "" { + podLabelsString = fa.fetchPodLabels(ip, startTime) + } } } } podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID) if err == nil { - podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes()) - if err != nil { - return fmt.Errorf("error when creating podLabels InfoElementWithValue: %v", err) - } + podLabelsIE := ipfixentities.NewStringInfoElement(podLabelsElement, podLabelsString) if err := record.AddInfoElement(podLabelsIE); err != nil { return fmt.Errorf("error when adding podLabels InfoElementWithValue: %v", err) } @@ -540,10 +548,10 @@ func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.R func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { if err := fa.fillPodLabelsForSide(key.SourceAddress, record, startTime, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { - klog.ErrorS(err, "Error when filling pod labels", "side", "source") + klog.ErrorS(err, "Error when filling Pod labels", "side", "source") } if err := fa.fillPodLabelsForSide(key.DestinationAddress, record, startTime, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { - klog.ErrorS(err, "Error when filling pod labels", "side", "destination") + klog.ErrorS(err, "Error when filling Pod labels", "side", "destination") } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 53160e87bb2..b3b789f0fef 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -18,7 +18,6 @@ import ( "bytes" "os" "path/filepath" - "strconv" "sync" "testing" "time" @@ -59,31 +58,6 @@ func init() { } func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { - ctrl := gomock.NewController(t) - mockPodStore := podstoretest.NewMockInterface(ctrl) - mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) - mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) - mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) - mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) - - newFlowAggregator := func(includePodLabels bool) *flowAggregator { - return &flowAggregator{ - aggregatorTransportProtocol: "tcp", - aggregationProcess: mockAggregationProcess, - activeFlowRecordTimeout: testActiveTimeout, - inactiveFlowRecordTimeout: testInactiveTimeout, - ipfixExporter: mockIPFIXExporter, - clickHouseExporter: mockClickHouseExporter, - registry: mockIPFIXRegistry, - flowAggregatorAddress: "", - includePodLabels: includePodLabels, - podStore: mockPodStore, - } - } - - mockExporters := []*exportertesting.MockInterface{mockIPFIXExporter, mockClickHouseExporter} - ipv4Key := ipfixintermediate.FlowKey{ SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", @@ -91,7 +65,6 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { SourcePort: 1234, DestinationPort: 5678, } - ipv6Key := ipfixintermediate.FlowKey{ SourceAddress: "2001:0:3238:dfe1:63::fefb", DestinationAddress: "2001:0:3238:dfe1:63::fefc", @@ -100,82 +73,124 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { DestinationPort: 5678, } - readyRecord := &ipfixintermediate.AggregationFlowRecord{ - Record: mockRecord, - ReadyToSend: true, + podA := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podA", + }, + } + podB := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podB", + }, } testcases := []struct { name string isIPv6 bool flowKey ipfixintermediate.FlowKey - flowRecord *ipfixintermediate.AggregationFlowRecord includePodLabels bool }{ { "IPv4_ready_to_send_with_pod_labels", false, ipv4Key, - readyRecord, true, }, { "IPv6_ready_to_send_with_pod_labels", true, ipv6Key, - readyRecord, true, }, { "IPv4_ready_to_send_without_pod_labels", false, ipv4Key, - readyRecord, false, }, { "IPv6_ready_to_send_without_pod_labels", true, ipv6Key, - readyRecord, false, }, } for _, tc := range testcases { - fa := newFlowAggregator(tc.includePodLabels) - for _, exporter := range mockExporters { - exporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) - } - emptyStr := make([]byte, 0) - - mockAggregationProcess.EXPECT().ResetStatAndThroughputElementsInRecord(mockRecord).Return(nil) - flowStartSecondsElement, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), []byte(strconv.Itoa(int(time.Now().Unix())))) - mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsElement, 0, true) - mockAggregationProcess.EXPECT().AreCorrelatedFieldsFilled(*tc.flowRecord).Return(false) - sourcePodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) - mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) - destPodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) - mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false) - mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(tc.flowRecord, true) - if tc.includePodLabels { - mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*tc.flowRecord).Return(false) - mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) + mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) + mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) + mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) + mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + + newFlowAggregator := func(includePodLabels bool) *flowAggregator { + return &flowAggregator{ + aggregatorTransportProtocol: "tcp", + aggregationProcess: mockAggregationProcess, + activeFlowRecordTimeout: testActiveTimeout, + inactiveFlowRecordTimeout: testInactiveTimeout, + ipfixExporter: mockIPFIXExporter, + clickHouseExporter: mockClickHouseExporter, + registry: mockIPFIXRegistry, + flowAggregatorAddress: "", + includePodLabels: includePodLabels, + podStore: mockPodStore, + } + } + + mockExporters := []*exportertesting.MockInterface{mockIPFIXExporter, mockClickHouseExporter} + + flowRecord := &ipfixintermediate.AggregationFlowRecord{ + Record: mockRecord, + ReadyToSend: true, + } + + fa := newFlowAggregator(tc.includePodLabels) + for _, exporter := range mockExporters { + exporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) + } + + startTime := time.Now().Truncate(time.Second) + + mockAggregationProcess.EXPECT().ResetStatAndThroughputElementsInRecord(mockRecord).Return(nil) + flowStartSecondsIE := ipfixentities.NewDateTimeSecondsInfoElement(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), uint32(startTime.Unix())) + mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsIE, 0, true) + mockAggregationProcess.EXPECT().AreCorrelatedFieldsFilled(*flowRecord).Return(false) + sourcePodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podA") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameIE, 0, true).MinTimes(1) + destinationPodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podB") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destinationPodNameIE, 0, true).MinTimes(1) + mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(flowRecord, true) + mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*flowRecord).Return(false) + podLabels := "" + if tc.includePodLabels { + podLabels = "{}" + sourcePodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodNamespace").Return(sourcePodNamespaceIE, 0, true) + destinationPodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodNamespace").Return(destinationPodNamespaceIE, 0, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tc.flowKey.SourceAddress, startTime).Return(podA, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tc.flowKey.DestinationAddress, startTime).Return(podB, true) + } sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil) - sourcePodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString("").Bytes()) + sourcePodLabelsIE := ipfixentities.NewStringInfoElement(sourcePodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil) - mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false) destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) - destinationPodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes()) + destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) - mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord, true) - } - mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*tc.flowRecord).Return(!tc.isIPv6) + mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true) + mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) - err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord) - assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, tc.flowRecord) + err := fa.sendFlowKeyRecord(tc.flowKey, flowRecord) + assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, flowRecord) + }) } } @@ -688,50 +703,64 @@ func TestFlowAggregator_closeUpdateChBeforeFlowExportLoopReturns(t *testing.T) { } func TestFlowAggregator_fetchPodLabels(t *testing.T) { - ctrl := gomock.NewController(t) - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "testPod", - Labels: map[string]string{ - "test": "ut", - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodPending, - PodIPs: []v1.PodIP{ - { - IP: "192.168.1.2", - }, - }, - }, - } - - client := fake.NewSimpleClientset() - // Mock pod store - mockPodStore := podstoretest.NewMockInterface(ctrl) - mockPodStore.EXPECT().GetPodByIPAndTime("", gomock.Any()).Return(nil, false) - mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(pod, true) - tests := []struct { name string ip string + pod *v1.Pod want string }{ { name: "no pod object", - ip: "", + ip: "192.168.1.2", + pod: nil, want: "", }, { name: "pod with label", ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: map[string]string{ + "test": "ut", + }, + }, + }, want: "{\"test\":\"ut\"}", }, + { + name: "pod with empty labels", + ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: map[string]string{}, + }, + }, + want: "{}", + }, + { + name: "pod with null labels", + ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: nil, + }, + }, + want: "{}", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + client := fake.NewSimpleClientset() + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockPodStore.EXPECT().GetPodByIPAndTime(tt.ip, gomock.Any()).Return(tt.pod, tt.pod != nil) fa := &flowAggregator{ k8sClient: client, includePodLabels: true, @@ -865,19 +894,12 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { }, }, } - emptyStr := make([]byte, 0) - sourcePodNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - sourcePodNamespaceElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodNamespace", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - sourceNodeNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourceNodeName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationPodNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationPodNamespaceElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodNamespace", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationNodeNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationNodeName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) + sourcePodNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + sourcePodNamespaceElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodNamespace", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + sourceNodeNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourceNodeName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationPodNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationPodNamespaceElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodNamespace", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationNodeNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationNodeName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") ctrl := gomock.NewController(t) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) From ea22cac2e8e7b1d9a92563f92e4361ed3997e10b Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 7 Jun 2024 14:52:21 -0700 Subject: [PATCH 2/2] Fix some grammar Signed-off-by: Antonin Bas --- pkg/agent/flowexporter/exporter/exporter_test.go | 4 ++-- pkg/flowaggregator/flowaggregator_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 5df827176e8..0d2968a76d1 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -126,7 +126,7 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip } mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) _, err := flowExp.sendTemplateSet(isIPv6) - assert.NoError(t, err, "Error in sending template set") + assert.NoError(t, err, "Error when sending template set") eL := flowExp.elementsListv4 if isIPv6 { @@ -254,7 +254,7 @@ func testSendDataSet(t *testing.T, v4Enabled bool, v6Enabled bool) { err := flowExp.addConnToSet(&conn) assert.NoError(t, err, "Error when adding record to data set") _, err = flowExp.sendDataSet() - assert.NoError(t, err, "Error in sending data set") + assert.NoError(t, err, "Error when sending data set") } if v4Enabled { diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index b3b789f0fef..956b7bf83cd 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -189,7 +189,7 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) err := fa.sendFlowKeyRecord(tc.flowKey, flowRecord) - assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, flowRecord) + assert.NoError(t, err, "Error when sending flow key record, key: %v, record: %v", tc.flowKey, flowRecord) }) } }