Skip to content

Commit

Permalink
Update Flow aggregate field httpVals (vmware#334)
Browse files Browse the repository at this point in the history
if exporter sends httpval1 in first export and sends httpvals2 in second
we should append both before sending further.
Also the function should take care of deduplicating same TxID items.

Signed-off-by: Tushar Tathgur <[email protected]>
Co-authored-by: Tushar Tathgur <[email protected]>
  • Loading branch information
2 people authored and heanlan committed Dec 7, 2023
1 parent 54bc3ba commit dec743b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
35 changes: 35 additions & 0 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package intermediate

import (
"container/heap"
"encoding/json"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -542,6 +543,16 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
incomingVal := ieWithValue.GetStringValue()
existingIeWithValue.SetStringValue(incomingVal)
}
case "httpVals":
incomingVal := ieWithValue.GetStringValue()
existingVal := existingIeWithValue.GetStringValue()
updatedHttpVals, err := fillHttpVals(incomingVal, existingVal)
if err != nil {
klog.Errorf("httpVals could not be updated, err: %v", err)
existingIeWithValue.SetStringValue(incomingVal)
} else {
existingIeWithValue.SetStringValue(updatedHttpVals)
}
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
Expand Down Expand Up @@ -984,3 +995,27 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool {
}
return false
}

func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
incomingHttpValsJson := make(map[int32]string)
existingHttpValsJson := make(map[int32]string)

if incomingHttpVals != "" {
if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)
}
}
if existingHttpVals != "" {
if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)
}
}
for key, value := range existingHttpValsJson {
incomingHttpValsJson[key] = value
}
updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
if err != nil {
return "", fmt.Errorf("error converting JSON to string: %v", err)
}
return string(updatedHttpVals), nil
}
51 changes: 46 additions & 5 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
"flowEndSeconds",
"flowEndReason",
"tcpState",
"httpVals",
}
statsElementList = []string{
"packetTotalCount",
Expand Down Expand Up @@ -165,7 +166,7 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie6 := entities.NewStringInfoElement(entities.NewInfoElement("sourcePodName", 101, 13, registry.AntreaEnterpriseID, 65535), srcPod)
ie7 := entities.NewStringInfoElement(entities.NewInfoElement("destinationPodName", 103, 13, registry.AntreaEnterpriseID, 65535), dstPod)
ie9 := entities.NewUnsigned16InfoElement(entities.NewInfoElement("destinationServicePort", 107, 2, registry.AntreaEnterpriseID, 2), uint16(4739))
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16 entities.InfoElementWithValue
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18 entities.InfoElementWithValue
if !isIPv6 {
ie1 = entities.NewIPAddressInfoElement(entities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4), net.ParseIP("10.0.0.1").To4())
ie2 = entities.NewIPAddressInfoElement(entities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4), net.ParseIP("10.0.0.2").To4())
Expand All @@ -179,15 +180,18 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
tmpFlowEndSecs, _ := registry.GetInfoElement("flowEndSeconds", registry.IANAEnterpriseID)
tmpFlowEndReason, _ := registry.GetInfoElement("flowEndReason", registry.IANAEnterpriseID)
tmpTCPState, _ := registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
tmpHttpVals, _ := registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)

if !isUpdatedRecord {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(1))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.ActiveTimeoutReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "ESTABLISHED")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
} else {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(10))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.EndOfFlowReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "TIME_WAIT")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
}

if isToExternal {
Expand All @@ -207,9 +211,9 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
} else {
ie15 = entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), registry.NetworkPolicyRuleActionNoAction)
}
ie17 := entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))
ie17 = entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -269,7 +273,7 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
var srcAddr, dstAddr, svcAddr []byte
var flowStartTime, flowEndTime uint32
var flowEndReason, ingressNetworkPolicyRuleAction, antreaFlowType uint8
var srcPod, dstPod, tcpState string
var srcPod, dstPod, tcpState, httpVals string
var svcPort uint16
srcPort := uint16(1234)
dstPort := uint16(5678)
Expand Down Expand Up @@ -323,10 +327,12 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
flowEndTime = uint32(1)
flowEndReason = registry.ActiveTimeoutReason
tcpState = "ESTABLISHED"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
} else {
flowEndTime = uint32(10)
flowEndReason = registry.EndOfFlowReason
tcpState = "TIME_WAIT"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
}
tmpElement, _ := registry.GetInfoElement("flowStartSeconds", registry.IANAEnterpriseID)
ie17 := entities.NewDateTimeSecondsInfoElement(tmpElement, flowStartTime)
Expand All @@ -345,8 +351,10 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie14 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)
ie15 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), egressNetworkPolicyRuleAction)
ie16 := entities.NewSigned32InfoElement(entities.NewInfoElement("ingressNetworkPolicyRulePriority", 116, 7, registry.AntreaEnterpriseID, 4), ingressNetworkPolicyRulePriority)
tmpElement, _ = registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)
ie18 := entities.NewStringInfoElement(tmpElement, httpVals)

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -802,6 +810,7 @@ func assertElementMap(t *testing.T, record map[string]interface{}, ipv6 bool) {
assert.Equal(t, uint64(0), record["packetDeltaCount"])
assert.Equal(t, uint64(502), record["reversePacketTotalCount"])
assert.Equal(t, uint64(0), record["reversePacketDeltaCount"])
assert.Equal(t, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}", record["httpVals"])
}

func TestGetRecords(t *testing.T) {
Expand Down Expand Up @@ -1127,3 +1136,35 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equalf(t, uint64(expectedVal), ieWithValue.GetUnsigned64Value(), "values should be equal for element %v", e)
}
}

func TestFillHttpVals(t *testing.T) {
testCases := []struct {
name string
incomingHttpVals string
existingHttpVals string
updatedHttpVals string
}{
{
name: "Normal case",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\",\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "Existing httpVals empty",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "Overlapping httpVals",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
retVals, _ := fillHttpVals(tt.incomingHttpVals, tt.existingHttpVals)
assert.Equal(t, tt.updatedHttpVals, retVals)
})
}
}

0 comments on commit dec743b

Please sign in to comment.