From 19760d7cef9db366dc2d3175f0ee05fda1f50918 Mon Sep 17 00:00:00 2001 From: srikartati Date: Mon, 3 May 2021 16:16:21 -0700 Subject: [PATCH] Modify Aggregation process methods (#158) (#181) Delete the unrequired method of deleting record from record map without lock. Add a method to get the flow updated time for flow given flow key --- pkg/intermediate/aggregate.go | 29 +++++++++++++------ pkg/intermediate/aggregate_test.go | 46 +++++++++++++----------------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index 20f651fd4dd..c034a9154ea 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -148,19 +148,32 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) return nil } -func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) { +// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX +// field "flowEndSeconds". +func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) { a.mutex.Lock() defer a.mutex.Unlock() - delete(a.flowKeyRecordMap, flowKey) + record, exists := a.flowKeyRecordMap[flowKey] + if !exists { + return 0, fmt.Errorf("flow key is not present in the map") + } + flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds") + if exists { + return flowEndField.Value.(uint32), nil + } else { + return 0, fmt.Errorf("flowEndSeconds field is not present in the record") + } } -// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already -// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo -// function. -// TODO:Remove this when there is notion of invalid flows supported in aggregation -// process. -func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) { +func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error { + a.mutex.Lock() + defer a.mutex.Unlock() + _, exists := a.flowKeyRecordMap[flowKey] + if !exists { + return fmt.Errorf("flow key is not present in the map") + } delete(a.flowKeyRecordMap, flowKey) + return nil } // addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in diff --git a/pkg/intermediate/aggregate_test.go b/pkg/intermediate/aggregate_test.go index e5bc8735d25..17624883990 100644 --- a/pkg/intermediate/aggregate_test.go +++ b/pkg/intermediate/aggregate_test.go @@ -515,21 +515,20 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) { runCorrelationAndCheckResult(t, ap, record1, record2, false, false, false) // Cleanup the flowKeyMap in aggregation process. flowKey1, _ := getFlowKeyFromRecord(record1) - ap.DeleteFlowKeyFromMapWithLock(*flowKey1) + err := ap.DeleteFlowKeyFromMap(*flowKey1) + assert.NoError(t, err) // Test the scenario, where record2 is added first and then record1. record1 = createDataMsgForSrc(t, false, false, false, false).GetSet().GetRecords()[0] record2 = createDataMsgForDst(t, false, false, false).GetSet().GetRecords()[0] runCorrelationAndCheckResult(t, ap, record2, record1, false, false, false) // Cleanup the flowKeyMap in aggregation process. - ap.DeleteFlowKeyFromMapWithLock(*flowKey1) - + err = ap.DeleteFlowKeyFromMap(*flowKey1) + assert.NoError(t, err) // Test IPv6 fields. // Test the scenario, where record1 is added first and then record2. record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0] record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0] runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false) - // Cleanup the flowKeyMap in aggregation process. - ap.DeleteFlowKeyFromMapWithLock(*flowKey1) // Test the scenario, where record2 is added first and then record1. record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0] record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0] @@ -549,7 +548,8 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) { runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false) // Cleanup the flowKeyMap in aggregation process. flowKey1, _ := getFlowKeyFromRecord(record1) - ap.DeleteFlowKeyFromMapWithLock(*flowKey1) + err := ap.DeleteFlowKeyFromMap(*flowKey1) + assert.NoError(t, err) // Test IPv6 fields. record1 = createDataMsgForSrc(t, true, true, false, false).GetSet().GetRecords()[0] runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false) @@ -568,7 +568,8 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) { runCorrelationAndCheckResult(t, ap, record1, nil, false, true, true) // Cleanup the flowKeyMap in aggregation process. flowKey1, _ := getFlowKeyFromRecord(record1) - ap.DeleteFlowKeyFromMapWithLock(*flowKey1) + err := ap.DeleteFlowKeyFromMap(*flowKey1) + assert.NoError(t, err) // Test IPv6 fields. record1 = createDataMsgForSrc(t, true, true, false, true).GetSet().GetRecords()[0] runCorrelationAndCheckResult(t, ap, record1, nil, true, true, true) @@ -616,13 +617,15 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) { } aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap)) - aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2) + err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2) + assert.Error(t, err) assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap)) - aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1) + err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1) + assert.NoError(t, err) assert.Empty(t, aggregationProcess.flowKeyRecordMap) } -func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) { +func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) { messageChan := make(chan *entities.Message) input := AggregationInput{ MessageChan: messageChan, @@ -632,22 +635,13 @@ func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) { aggregationProcess, _ := InitAggregationProcess(input) message := createDataMsgForSrc(t, false, false, false, false) flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678} - flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678} - aggFlowRecord := AggregationFlowRecord{ - message.GetSet().GetRecords()[0], - true, - true, - } - aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord - assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap)) - aggregationProcess.mutex.Lock() - aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2) - aggregationProcess.mutex.Unlock() - assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap)) - aggregationProcess.mutex.Lock() - aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1) - aggregationProcess.mutex.Unlock() - assert.Empty(t, aggregationProcess.flowKeyRecordMap) + _, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1) + assert.Error(t, err) + err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0]) + assert.NoError(t, err) + flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1) + assert.NoError(t, err) + assert.Equal(t, uint32(1), flowUpdatedTime) } func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, isToExternal bool) {