Update Flow aggregate field httpVals #334

Merged 1 commit on Dec 6, 2023
35 changes: 35 additions & 0 deletions pkg/intermediate/aggregate.go
@@ -16,6 +16,7 @@

import (
"container/heap"
"encoding/json"
"fmt"
"net"
"strings"
@@ -542,6 +543,16 @@
incomingVal := ieWithValue.GetStringValue()
existingIeWithValue.SetStringValue(incomingVal)
}
case "httpVals":
incomingVal := ieWithValue.GetStringValue()
existingVal := existingIeWithValue.GetStringValue()
updatedHttpVals, err := fillHttpVals(incomingVal, existingVal)
if err != nil {
klog.Errorf("httpVals could not be updated, err: %v", err)
existingIeWithValue.SetStringValue(incomingVal)

} else {
existingIeWithValue.SetStringValue(updatedHttpVals)
}
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
@@ -984,3 +995,27 @@
}
return false
}

func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
Contributor

Can we add a unit test for this function to make sure we correlate two HttpVals correctly?

incomingHttpValsJson := make(map[int32]string)
Contributor

Forgive me if this is a dumb question: should the transaction ID be the same for the same event on different Nodes?

Contributor Author

Yes, the transaction ID should be the same.

Member

@antoninbas antoninbas Nov 14, 2023

I think that's a key point, and the commit message should probably mention it.
Given that the HTTP values from both Nodes should eventually be consistent, I wonder if there is real value in that "complex" aggregation logic, or if we should just use the "latest" record (like we do for some other fields) and simply override the aggregated value?

Contributor Author

I am not able to reproduce it in my local setup, but the thought behind this design is that if multiple exporters send httpVals with different or overlapping TxIDs, we should deduplicate them.
If we only take the latest record for aggregation, we may not get all the TxIDs in an aggregated output, given that the aggregation interval is longer than the exporter interval.

Contributor

I think we won't be able to use the "latest" record in the current design.
The records from our Flow Exporter only contain partial httpVals, as this field is reset after the connection is exported from the FE.
If we want to use the latest record in the FA, we might need to do the aggregation in the FE instead, which could end up increasing the size of the packets transferred from FE to FA over time?

Member

Thanks @yuntanghsu, that makes sense.

existingHttpValsJson := make(map[int32]string)

if incomingHttpVals != "" {
if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)

}
}
if existingHttpVals != "" {
if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)

}
}
for key, value := range existingHttpValsJson {
incomingHttpValsJson[key] = value
}
updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
if err != nil {
return "", fmt.Errorf("error converting JSON to string: %v", err)

}
return string(updatedHttpVals), nil
}
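
To make the trade-off from the review thread concrete, here is a minimal standalone sketch (not the code from this PR) that compares a union keyed by transaction ID with a "latest record wins" override. The helper names `unionByTxID` and `latestOnly` and the shortened sample values are hypothetical. The sketch assumes, as stated in the thread, that each record exported by the FE carries only a partial httpVals map.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unionByTxID merges two httpVals JSON strings keyed by transaction ID.
// Hypothetical helper that mirrors the approach discussed in the review:
// entries from existing overwrite entries from incoming on a TxID clash,
// which is harmless if both Nodes report the same value for a given TxID.
func unionByTxID(incoming, existing string) (string, error) {
	merged := make(map[int32]string)
	for _, s := range []string{incoming, existing} {
		if s == "" {
			continue // an empty field contributes nothing
		}
		part := make(map[int32]string)
		if err := json.Unmarshal([]byte(s), &part); err != nil {
			return "", fmt.Errorf("error parsing JSON: %w", err)
		}
		for txID, v := range part {
			merged[txID] = v
		}
	}
	out, err := json.Marshal(merged)
	if err != nil {
		return "", fmt.Errorf("error converting JSON to string: %w", err)
	}
	return string(out), nil
}

// latestOnly models the alternative raised in the review: keep only the
// most recent record and drop whatever was aggregated before.
func latestOnly(incoming, _ string) string {
	return incoming
}

func main() {
	// Two partial records for the same connection (sample values are made up).
	first := `{"0":"GET /a"}`
	second := `{"1":"GET /b"}`

	union, err := unionByTxID(second, first)
	if err != nil {
		panic(err)
	}
	fmt.Println("union: ", union)
	fmt.Println("latest:", latestOnly(second, first))
}
```

With partial records, the override keeps only the entries of the newest record, while the union retains every TxID seen during the aggregation interval.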
51 changes: 46 additions & 5 deletions pkg/intermediate/aggregate_test.go
@@ -47,6 +47,7 @@ var (
"flowEndSeconds",
"flowEndReason",
"tcpState",
"httpVals",
}
statsElementList = []string{
"packetTotalCount",
@@ -165,7 +166,7 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie6 := entities.NewStringInfoElement(entities.NewInfoElement("sourcePodName", 101, 13, registry.AntreaEnterpriseID, 65535), srcPod)
ie7 := entities.NewStringInfoElement(entities.NewInfoElement("destinationPodName", 103, 13, registry.AntreaEnterpriseID, 65535), dstPod)
ie9 := entities.NewUnsigned16InfoElement(entities.NewInfoElement("destinationServicePort", 107, 2, registry.AntreaEnterpriseID, 2), uint16(4739))
- var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16 entities.InfoElementWithValue
+ var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18 entities.InfoElementWithValue
if !isIPv6 {
ie1 = entities.NewIPAddressInfoElement(entities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4), net.ParseIP("10.0.0.1").To4())
ie2 = entities.NewIPAddressInfoElement(entities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4), net.ParseIP("10.0.0.2").To4())
@@ -179,15 +180,18 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
tmpFlowEndSecs, _ := registry.GetInfoElement("flowEndSeconds", registry.IANAEnterpriseID)
tmpFlowEndReason, _ := registry.GetInfoElement("flowEndReason", registry.IANAEnterpriseID)
tmpTCPState, _ := registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
tmpHttpVals, _ := registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)

if !isUpdatedRecord {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(1))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.ActiveTimeoutReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "ESTABLISHED")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
} else {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(10))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.EndOfFlowReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "TIME_WAIT")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
}

if isToExternal {
@@ -207,9 +211,9 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
} else {
ie15 = entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), registry.NetworkPolicyRuleActionNoAction)
}
- ie17 := entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))
+ ie17 = entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))

- elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
+ elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
@@ -269,7 +273,7 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
var srcAddr, dstAddr, svcAddr []byte
var flowStartTime, flowEndTime uint32
var flowEndReason, ingressNetworkPolicyRuleAction, antreaFlowType uint8
- var srcPod, dstPod, tcpState string
+ var srcPod, dstPod, tcpState, httpVals string
var svcPort uint16
srcPort := uint16(1234)
dstPort := uint16(5678)
@@ -323,10 +327,12 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
flowEndTime = uint32(1)
flowEndReason = registry.ActiveTimeoutReason
tcpState = "ESTABLISHED"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
} else {
flowEndTime = uint32(10)
flowEndReason = registry.EndOfFlowReason
tcpState = "TIME_WAIT"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
}
tmpElement, _ := registry.GetInfoElement("flowStartSeconds", registry.IANAEnterpriseID)
ie17 := entities.NewDateTimeSecondsInfoElement(tmpElement, flowStartTime)
@@ -345,8 +351,10 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie14 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)
ie15 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), egressNetworkPolicyRuleAction)
ie16 := entities.NewSigned32InfoElement(entities.NewInfoElement("ingressNetworkPolicyRulePriority", 116, 7, registry.AntreaEnterpriseID, 4), ingressNetworkPolicyRulePriority)
tmpElement, _ = registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)
ie18 := entities.NewStringInfoElement(tmpElement, httpVals)

- elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
+ elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
@@ -802,6 +810,7 @@ func assertElementMap(t *testing.T, record map[string]interface{}, ipv6 bool) {
assert.Equal(t, uint64(0), record["packetDeltaCount"])
assert.Equal(t, uint64(502), record["reversePacketTotalCount"])
assert.Equal(t, uint64(0), record["reversePacketDeltaCount"])
assert.Equal(t, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}", record["httpVals"])
}

func TestGetRecords(t *testing.T) {
@@ -1127,3 +1136,35 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equalf(t, uint64(expectedVal), ieWithValue.GetUnsigned64Value(), "values should be equal for element %v", e)
}
}

func TestFillHttpVals(t *testing.T) {
testCases := []struct {
name string
incomingHttpVals string
existingHttpVals string
updatedHttpVals string
}{
{
name: "Normal case",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\",\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "Existing httpVals empty",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
}, {
name: "Overlapping httpVals",
incomingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
existingHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
updatedHttpVals: "{\"1\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}",
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
retVals, _ := fillHttpVals(tt.incomingHttpVals, tt.existingHttpVals)
assert.Equal(t, tt.updatedHttpVals, retVals)
})
}
}
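
The test above discards the error returned by `fillHttpVals`, so its error branches are never exercised. One possible way to cover them is sketched below as a standalone program rather than as a change to this PR. `fillHttpVals` is copied from the diff so the example is self-contained. `aggregateHTTPVals` is a hypothetical stand-in for the `httpVals` case in the aggregation switch, with the `klog` call omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fillHttpVals is reproduced from the diff so that this sketch compiles alone.
func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
	incomingHttpValsJson := make(map[int32]string)
	existingHttpValsJson := make(map[int32]string)

	if incomingHttpVals != "" {
		if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
			return "", fmt.Errorf("error parsing JSON: %v", err)
		}
	}
	if existingHttpVals != "" {
		if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
			return "", fmt.Errorf("error parsing JSON: %v", err)
		}
	}
	for key, value := range existingHttpValsJson {
		incomingHttpValsJson[key] = value
	}
	updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
	if err != nil {
		return "", fmt.Errorf("error converting JSON to string: %v", err)
	}
	return string(updatedHttpVals), nil
}

// aggregateHTTPVals is a hypothetical stand-in for the switch case in
// aggregate.go: on a merge error it falls back to the incoming value.
func aggregateHTTPVals(incoming, existing string) string {
	updated, err := fillHttpVals(incoming, existing)
	if err != nil {
		return incoming
	}
	return updated
}

func main() {
	// Inputs chosen to hit the two parsing error branches.
	cases := []struct{ name, incoming, existing string }{
		{"truncated incoming", `{"1":`, ""},
		{"non-JSON existing", "", "not-json"},
	}
	for _, c := range cases {
		_, err := fillHttpVals(c.incoming, c.existing)
		fmt.Printf("%s: error returned = %t\n", c.name, err != nil)
	}

	// The caller keeps the incoming value when the merge fails.
	fmt.Println(aggregateHTTPVals(`{"1":"a"}`, "not-json"))
}
```

In the real test file the same inputs could become extra rows in the `TestFillHttpVals` table, with an added field stating whether an error is expected.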