Skip to content

Commit

Permalink
add tcpState and optimize connections for flow exporter (#1904)
Browse files Browse the repository at this point in the history
This PR fetches tcp state from conntrack and adds as tcpState
information element for flow exporter and flow aggregator. It optimizes the
connections poll, update and deletion as follows:
1. Delete connections from connection map when the connection no
longer exists in conntrack table and related flow records are deleted
(connections.go)
2. Delete flow records of corresponding dying (is closed or about to
close) or nonextant connection after exporting (exporter.go)
3. Avoid updating connection map and flow records map when the connection
is in dying state, avoid adding flow records when the connection is in
dying state and corresponding flow records have been exported.
It also moves go-ipfix to v0.4.7 which adds tcpState and flowType in Antrea
registry and update kibana dashboard index pattern for new fields.
  • Loading branch information
zyiou committed Apr 7, 2021
1 parent 2eaa916 commit e027c7b
Show file tree
Hide file tree
Showing 21 changed files with 340 additions and 180 deletions.
123 changes: 61 additions & 62 deletions build/yamls/elk-flow-collector/kibana.ndjson

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions build/yamls/elk-flow-collector/logstash/ipfix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4118,3 +4118,9 @@
135:
- :uint64
- :reverseOctetDeltaCountFromDestinationNode
136:
- :string
- :tcpState
137:
- :uint8
- :flowType
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ if $np; then
manifest_args="$manifest_args --np --tun vxlan"
fi

COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.3")
COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.4.7")
for image in "${COMMON_IMAGES_LIST[@]}"; do
docker pull $image
done
Expand Down
3 changes: 3 additions & 0 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ the flow. All the IEs used by the Antrea Flow Exporter are listed below:
|--------------------------|---------------|----------|----------------|
| flowStartSeconds | 0 | 150 | dateTimeSeconds|
| flowEndSeconds | 0 | 151 | dateTimeSeconds|
| flowEndReason | 0 | 136 | unsigned8 |
| sourceIPv4Address | 0 | 8 | ipv4Address |
| destinationIPv4Address | 0 | 12 | ipv4Address |
| sourceIPv6Address | 0 | 27 | ipv6Address |
Expand Down Expand Up @@ -159,6 +160,8 @@ the flow. All the IEs used by the Antrea Flow Exporter are listed below:
| ingressNetworkPolicyNamespace| 56506 | 111 | string |
| egressNetworkPolicyName | 56506 | 112 | string |
| egressNetworkPolicyNamespace | 56506 | 113 | string |
| tcpState | 56506 | 136 | string |
| flowType | 56506 | 137 | unsigned8 |

### Supported capabilities

Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ require (
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.4.1
Expand All @@ -42,7 +41,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.4.5
github.com/vmware/go-ipfix v0.4.7
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/mod v0.4.0
Expand Down
8 changes: 3 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1SMSibvLzxjeJLnrYEVLULFNiHY9YfQ=
github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s=
github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=
Expand Down Expand Up @@ -251,9 +250,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down Expand Up @@ -407,8 +405,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.4.5 h1:EwG2bQXKT72IzMOsCcbvP1Po2PncLoSoPuYrHf3YrsI=
github.com/vmware/go-ipfix v0.4.5/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o=
github.com/vmware/go-ipfix v0.4.7 h1:zyKpsifh19Mdmo6FE3C2GooAWiiThX2JV+X4VdzqeZY=
github.com/vmware/go-ipfix v0.4.7/go.mod h1:lQz3f4r2pZWo0q8s8BtZ0xo5fPSOYsYteqJgBASP69o=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
74 changes: 47 additions & 27 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,15 @@ type ConnectionStore interface {
// Run enables the periodical polling of conntrack connections at a given flowPollInterval.
Run(stopCh <-chan struct{})
// GetConnByKey gets the connection in connection map given the connection key.
GetConnByKey(flowTuple flowexporter.ConnectionKey) (*flowexporter.Connection, bool)
GetConnByKey(connKey flowexporter.ConnectionKey) (*flowexporter.Connection, bool)
// SetExportDone sets DoneExport field of connection to true given the connection key.
SetExportDone(connKey flowexporter.ConnectionKey) error
// ForAllConnectionsDo execute the callback for each connection in connection map.
ForAllConnectionsDo(callback flowexporter.ConnectionMapCallBack) error
// DeleteConnectionByKey deletes the connection in connection map given the
// connection key. This function is called from Flow Exporter once the connection
// is deleted from conntrack module.
DeleteConnectionByKey(connKey flowexporter.ConnectionKey) error
}

type connectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
flowRecords *flowrecords.FlowRecords
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
Expand All @@ -75,7 +73,7 @@ func NewConnectionStore(
pollInterval time.Duration,
) *connectionStore {
return &connectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connections: make(map[flowexporter.ConnectionKey]*flowexporter.Connection),
flowRecords: flowRecords,
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
Expand Down Expand Up @@ -117,21 +115,25 @@ func (cs *connectionStore) Run(stopCh <-chan struct{}) {
func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)

existingConn, exists := cs.GetConnByKey(connKey)

cs.mutex.Lock()
defer cs.mutex.Unlock()
existingConn, exists := cs.connections[connKey]

if exists {
// avoid updating stats of the existing connection that is about to close
if flowexporter.IsConnectionDying(existingConn) {
existingConn.IsPresent = true
return
}
// Update the necessary fields that are used in generating flow records.
// Can same 5-tuple flow get deleted and added to conntrack table? If so use ID.
existingConn.StopTime = conn.StopTime
existingConn.OriginalBytes = conn.OriginalBytes
existingConn.OriginalPackets = conn.OriginalPackets
existingConn.ReverseBytes = conn.ReverseBytes
existingConn.ReversePackets = conn.ReversePackets
existingConn.TCPState = conn.TCPState
existingConn.IsPresent = true
// Reassign the flow to update the map
cs.connections[connKey] = *existingConn
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
Expand Down Expand Up @@ -201,16 +203,28 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
cs.connections[connKey] = conn
}
}

// GetConnByKey gets the connection in connection map given the connection key.
func (cs *connectionStore) GetConnByKey(flowTuple flowexporter.ConnectionKey) (*flowexporter.Connection, bool) {
func (cs *connectionStore) GetConnByKey(connKey flowexporter.ConnectionKey) (*flowexporter.Connection, bool) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
conn, found := cs.connections[flowTuple]
return &conn, found
conn, found := cs.connections[connKey]
return conn, found
}

// SetExportDone sets DoneExport field of connection to true given the connection key.
func (cs *connectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error {
cs.mutex.Lock()
defer cs.mutex.Unlock()
if conn, found := cs.connections[connKey]; !found {
return fmt.Errorf("connection with key %v does not exist in connection map", connKey)
} else {
conn.DoneExport = true
return nil
}
}

// ForAllConnectionsDo execute the callback for each connection in connection map.
Expand All @@ -233,15 +247,22 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
func (cs *connectionStore) Poll() ([]int, error) {
klog.V(2).Infof("Polling conntrack")
// Reset isActive flag for all connections in connection map before dumping flows in conntrack module.
// This is to specify that the connection and the flow record can be deleted after the next export.
resetConn := func(key flowexporter.ConnectionKey, conn flowexporter.Connection) error {
conn.IsPresent = false
cs.connections[key] = conn
// Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module.
// if the connection does not exist in conntrack table and has been exported, we will delete it from connection map.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent && conn.DoneExport {
if err := cs.deleteConnectionByKeyWithoutLock(key); err != nil {
return err
}
} else {
conn.IsPresent = false
}
return nil
}
// We do not expect any error as resetConn is not returning any error
cs.ForAllConnectionsDo(resetConn)

if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil {
return []int{}, err
}

var zones []uint16
var connsLens []int
Expand Down Expand Up @@ -275,14 +296,13 @@ func (cs *connectionStore) Poll() ([]int, error) {
return connsLens, nil
}

// DeleteConnectionByKey deletes the connection in connection map given the connection key.
func (cs *connectionStore) DeleteConnectionByKey(connKey flowexporter.ConnectionKey) error {
_, exists := cs.GetConnByKey(connKey)
// deleteConnectionByKeyWithoutLock deletes the connection in connection map given the
// connection key without grabbing the lock. Caller is expected to grab lock.
func (cs *connectionStore) deleteConnectionByKeyWithoutLock(connKey flowexporter.ConnectionKey) error {
_, exists := cs.connections[connKey]
if !exists {
return fmt.Errorf("connection with key %v doesn't exist in map", connKey)
}
cs.mutex.Lock()
defer cs.mutex.Unlock()
delete(cs.connections, connKey)
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
Expand Down
81 changes: 61 additions & 20 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()
// Create two flows; one is already in connectionStore and other one is new
// Create three flows; two are already in connectionStore and another one is new
refTime := time.Now()
// Flow-1, which is already in connectionStore
tuple1, revTuple1 := makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{4, 3, 2, 1}, 6, 65280, 255)
Expand Down Expand Up @@ -110,24 +110,38 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
TupleReply: revTuple2,
IsPresent: true,
}
// To test service name mapping.
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
// Flow-3 , which is already in connectionStore
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{4, 3, 2, 1}, 6, 60000, 100)
testFlow3 := flowexporter.Connection{
TupleOrig: tuple3,
TupleReply: revTuple3,
Mark: openflow.ServiceCTMark,
IsPresent: true,
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: tuple3,
TupleReply: revTuple3,
TCPState: "TIME_WAIT",
IsPresent: true,
}
// To test NetworkPolicy mapping.
tuple4, revTuple4 := makeTuple(&net.IP{30, 30, 30, 30}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
// To test service name mapping.
tuple4, revTuple4 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow4 := flowexporter.Connection{
TupleOrig: tuple4,
TupleReply: revTuple4,
Mark: openflow.ServiceCTMark,
IsPresent: true,
}
// To test NetworkPolicy mapping.
tuple5, revTuple5 := makeTuple(&net.IP{30, 30, 30, 30}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow5 := flowexporter.Connection{
TupleOrig: tuple5,
TupleReply: revTuple5,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2},
IsPresent: true,
}
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
// These two flows are already in connection store.
oldTestFlow1 := flowexporter.Connection{
StartTime: testFlow1.StartTime,
StopTime: testFlow1.StopTime.Add(-(time.Second * 30)),
Expand All @@ -142,6 +156,23 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
DestinationPodNamespace: "",
DestinationPodName: "",
IsPresent: true,
TCPState: "",
}
oldTestFlow3 := flowexporter.Connection{
StartTime: testFlow3.StartTime,
StopTime: testFlow3.StopTime.Add(-(time.Second * 30)),
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: tuple3,
TupleReply: revTuple3,
SourcePodNamespace: "ns3",
SourcePodName: "pod3",
DestinationPodNamespace: "",
DestinationPodName: "",
IsPresent: true,
TCPState: "TIME_WAIT",
}
podConfigFlow2 := &interfacestore.ContainerInterfaceConfig{
ContainerID: "2",
Expand All @@ -168,19 +199,23 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval)

// Add flow1conn to the Connection map
// Add flow1conn and flow3conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
connStore.connections[testFlow1Tuple] = oldTestFlow1
connStore.connections[testFlow1Tuple] = &oldTestFlow1
testFlow3Tuple := flowexporter.NewConnectionKey(&testFlow3)
connStore.connections[testFlow3Tuple] = &oldTestFlow3
// For testing purposes, increment the metric
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()

addOrUpdateConnTests := []struct {
flow flowexporter.Connection
}{
{testFlow1}, // To test update part of function.
{testFlow2}, // To test add part of function.
{testFlow3}, // To test service name mapping.
{testFlow4}, // To test NetworkPolicy mapping.
{testFlow3}, // To test update part of function for dying connection.
{testFlow4}, // To test service name mapping.
{testFlow5}, // To test NetworkPolicy mapping.
}
for i, test := range addOrUpdateConnTests {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
Expand All @@ -198,6 +233,13 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
case 2:
// Tests update part of the function for dying connection.

expConn.SourcePodNamespace = "ns3"
expConn.SourcePodName = "pod3"
expConn.TCPState = "TIME_WAIT"
expConn.StopTime = refTime.Add(-(time.Second * 30))
case 3:
// Tests service name mapping.
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)
Expand All @@ -206,7 +248,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServicePortName = servicePortName.String()
case 3:
case 4:
// Tests NetworkPolicy mapping.
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)
Expand Down Expand Up @@ -272,13 +314,12 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, nil, nil, testPollInterval)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
connStore.connections[*testFlowKeys[i]] = flow
}

resetTwoFields := func(key flowexporter.ConnectionKey, conn flowexporter.Connection) error {
resetTwoFields := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
conn.IsPresent = false
conn.OriginalPackets = 0
connStore.connections[key] = conn
return nil
}
connStore.ForAllConnectionsDo(resetTwoFields)
Expand Down Expand Up @@ -337,11 +378,11 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
connStore := NewConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, nil, nil, testPollInterval)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
connStore.connections[*testFlowKeys[i]] = flow
}
// Delete the connections in connection store.
for i := 0; i < len(testFlows); i++ {
err := connStore.DeleteConnectionByKey(*testFlowKeys[i])
err := connStore.deleteConnectionByKeyWithoutLock(*testFlowKeys[i])
assert.Nil(t, err, "DeleteConnectionByKey should return nil")
_, exists := connStore.GetConnByKey(*testFlowKeys[i])
assert.Equal(t, exists, false, "connection should be deleted in connection store")
Expand Down
Loading

0 comments on commit e027c7b

Please sign in to comment.