Skip to content

Commit

Permalink
Update Flow aggregate field httpVals
Browse files Browse the repository at this point in the history
if exporter sends httpval1 in first export and sends httpvals2 in second
we should append both before sending further.
Also the function should take care of deduplicating same TxID items.

Signed-off-by: Tushar Tathgur <[email protected]>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Dec 6, 2023
1 parent d5ea241 commit 4947179
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
35 changes: 35 additions & 0 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package intermediate

import (
"container/heap"
"encoding/json"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -542,6 +543,16 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
incomingVal := ieWithValue.GetStringValue()
existingIeWithValue.SetStringValue(incomingVal)
}
case "httpVals":
incomingVal := ieWithValue.GetStringValue()
existingVal := existingIeWithValue.GetStringValue()
updatedHttpVals, err := fillHttpVals(incomingVal, existingVal)
if err != nil {
klog.Errorf("httpVals could not be updated, err: %v", err)
existingIeWithValue.SetStringValue(incomingVal)

Check warning on line 552 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L551-L552

Added lines #L551 - L552 were not covered by tests
} else {
existingIeWithValue.SetStringValue(updatedHttpVals)
}
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
Expand Down Expand Up @@ -984,3 +995,27 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool {
}
return false
}

func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
incomingHttpValsJson := make(map[int32]string)
existingHttpValsJson := make(map[int32]string)

if incomingHttpVals != "" {
if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)

Check warning on line 1005 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1005

Added line #L1005 was not covered by tests
}
}
if existingHttpVals != "" {
if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)

Check warning on line 1010 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1010

Added line #L1010 was not covered by tests
}
}
for key, value := range existingHttpValsJson {
incomingHttpValsJson[key] = value
}
updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
if err != nil {
return "", fmt.Errorf("error converting JSON to string: %v", err)

Check warning on line 1018 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1018

Added line #L1018 was not covered by tests
}
return string(updatedHttpVals), nil
}
51 changes: 46 additions & 5 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
"flowEndSeconds",
"flowEndReason",
"tcpState",
"httpVals",
}
statsElementList = []string{
"packetTotalCount",
Expand Down Expand Up @@ -165,7 +166,7 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie6 := entities.NewStringInfoElement(entities.NewInfoElement("sourcePodName", 101, 13, registry.AntreaEnterpriseID, 65535), srcPod)
ie7 := entities.NewStringInfoElement(entities.NewInfoElement("destinationPodName", 103, 13, registry.AntreaEnterpriseID, 65535), dstPod)
ie9 := entities.NewUnsigned16InfoElement(entities.NewInfoElement("destinationServicePort", 107, 2, registry.AntreaEnterpriseID, 2), uint16(4739))
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16 entities.InfoElementWithValue
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18 entities.InfoElementWithValue
if !isIPv6 {
ie1 = entities.NewIPAddressInfoElement(entities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4), net.ParseIP("10.0.0.1").To4())
ie2 = entities.NewIPAddressInfoElement(entities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4), net.ParseIP("10.0.0.2").To4())
Expand All @@ -179,15 +180,18 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
tmpFlowEndSecs, _ := registry.GetInfoElement("flowEndSeconds", registry.IANAEnterpriseID)
tmpFlowEndReason, _ := registry.GetInfoElement("flowEndReason", registry.IANAEnterpriseID)
tmpTCPState, _ := registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
tmpHttpVals, _ := registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)

if !isUpdatedRecord {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(1))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.ActiveTimeoutReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "ESTABLISHED")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
} else {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(10))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.EndOfFlowReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "TIME_WAIT")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
}

if isToExternal {
Expand All @@ -207,9 +211,9 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
} else {
ie15 = entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), registry.NetworkPolicyRuleActionNoAction)
}
ie17 := entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))
ie17 = entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -269,7 +273,7 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
var srcAddr, dstAddr, svcAddr []byte
var flowStartTime, flowEndTime uint32
var flowEndReason, ingressNetworkPolicyRuleAction, antreaFlowType uint8
var srcPod, dstPod, tcpState string
var srcPod, dstPod, tcpState, httpVals string
var svcPort uint16
srcPort := uint16(1234)
dstPort := uint16(5678)
Expand Down Expand Up @@ -323,10 +327,12 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
flowEndTime = uint32(1)
flowEndReason = registry.ActiveTimeoutReason
tcpState = "ESTABLISHED"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
} else {
flowEndTime = uint32(10)
flowEndReason = registry.EndOfFlowReason
tcpState = "TIME_WAIT"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
}
tmpElement, _ := registry.GetInfoElement("flowStartSeconds", registry.IANAEnterpriseID)
ie17 := entities.NewDateTimeSecondsInfoElement(tmpElement, flowStartTime)
Expand All @@ -345,8 +351,10 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie14 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)
ie15 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), egressNetworkPolicyRuleAction)
ie16 := entities.NewSigned32InfoElement(entities.NewInfoElement("ingressNetworkPolicyRulePriority", 116, 7, registry.AntreaEnterpriseID, 4), ingressNetworkPolicyRulePriority)
tmpElement, _ = registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)
ie18 := entities.NewStringInfoElement(tmpElement, httpVals)

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -802,6 +810,7 @@ func assertElementMap(t *testing.T, record map[string]interface{}, ipv6 bool) {
assert.Equal(t, uint64(0), record["packetDeltaCount"])
assert.Equal(t, uint64(502), record["reversePacketTotalCount"])
assert.Equal(t, uint64(0), record["reversePacketDeltaCount"])
assert.Equal(t, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}", record["httpVals"])
}

func TestGetRecords(t *testing.T) {
Expand Down Expand Up @@ -1127,3 +1136,35 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equalf(t, uint64(expectedVal), ieWithValue.GetUnsigned64Value(), "values should be equal for element %v", e)
}
}

func TestFillHttpVals(t *testing.T) {
testCases := []struct {
name string
incomingHttpVals string
existingHttpVals string
updatedHttpVals string
}{
{
name: "Normal case",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\",\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "Existing httpVals empty",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "overlapping httpVals empty",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
retVals, _ := fillHttpVals(tt.incomingHttpVals, tt.existingHttpVals)
assert.Equal(t, tt.updatedHttpVals, retVals)
})
}
}

0 comments on commit 4947179

Please sign in to comment.