Skip to content

Commit

Permalink
Modify Aggregation process methods (antrea-io#158) (antrea-io#181)
Browse files Browse the repository at this point in the history
Delete the unrequired method of deleting record from
record map without lock.
Add a method to get the flow updated time for flow given flow key
  • Loading branch information
srikartati committed May 3, 2021
1 parent b11eef6 commit 19760d7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
29 changes: 21 additions & 8 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,32 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack)
return nil
}

func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) {
// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX
// field "flowEndSeconds".
func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.flowKeyRecordMap, flowKey)
record, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return 0, fmt.Errorf("flow key is not present in the map")
}
flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds")
if exists {
return flowEndField.Value.(uint32), nil
} else {
return 0, fmt.Errorf("flowEndSeconds field is not present in the record")
}
}

// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already
// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo
// function.
// TODO:Remove this when there is notion of invalid flows supported in aggregation
// process.
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) {
func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error {
a.mutex.Lock()
defer a.mutex.Unlock()
_, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return fmt.Errorf("flow key is not present in the map")
}
delete(a.flowKeyRecordMap, flowKey)
return nil
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
Expand Down
46 changes: 20 additions & 26 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,20 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, record1, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)

err = ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
// Test the scenario, where record1 is added first and then record2.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
Expand All @@ -549,7 +548,8 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand All @@ -568,7 +568,8 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, true)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, true).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, true)
Expand Down Expand Up @@ -616,13 +617,15 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) {
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2)
err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2)
assert.Error(t, err)
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1)
err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1)
assert.NoError(t, err)
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
MessageChan: messageChan,
Expand All @@ -632,22 +635,13 @@ func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
aggregationProcess, _ := InitAggregationProcess(input)
message := createDataMsgForSrc(t, false, false, false, false)
flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678}
flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678}
aggFlowRecord := AggregationFlowRecord{
message.GetSet().GetRecords()[0],
true,
true,
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2)
aggregationProcess.mutex.Unlock()
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1)
aggregationProcess.mutex.Unlock()
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
_, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.Error(t, err)
err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0])
assert.NoError(t, err)
flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.NoError(t, err)
assert.Equal(t, uint32(1), flowUpdatedTime)
}

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, isToExternal bool) {
Expand Down

0 comments on commit 19760d7

Please sign in to comment.