Skip to content

Commit

Permalink
Fix race condition in agent Traceflow controller (#6069)
Browse files Browse the repository at this point in the history
It may happen that a Traceflow is assigned a tag that was just
released from an old Traceflow, but the controller hasn't processed the
deletion event of the old Traceflow yet. Previously the controller
skipped starting the new Traceflow if the tag was already in use, which
caused the new Traceflow to time out.

This commit adds a check when determining whether the controller should
start a Traceflow. If the tag is associated with another Traceflow, the
controller cleans up that stale Traceflow and then starts a new trace
for the current one.

It also fixes a bug in cleanupTraceflow, which could uninstall the flows
of another Traceflow if the tag had been reassigned.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Mar 6, 2024
1 parent 1be7d47 commit 78bf4ef
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 26 deletions.
43 changes: 21 additions & 22 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ const (
)

type traceflowState struct {
name string
name string
// Used to uniquely identify Traceflow.
uid types.UID
tag int8
liveTraffic bool
droppedOnly bool
Expand Down Expand Up @@ -268,10 +270,17 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
if tf.Status.DataplaneTag != 0 {
start := false
c.runningTraceflowsMutex.Lock()
if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok {
tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag]
c.runningTraceflowsMutex.Unlock()
// This may happen if a Traceflow is assigned with a tag that was just released from an old Traceflow but
// the agent hasn't processed the deletion event of the old Traceflow yet.
if ok && tfState.uid != tf.UID {
klog.V(2).InfoS("Found a stale Traceflow associated with the dataplane tag, cleaning it up", "tag", tf.Status.DataplaneTag, "currentTraceflow", traceflowName, "staleTraceflow", tfState.name)
c.cleanupTraceflow(tfState.name)
start = true
} else if !ok {
start = true
}
c.runningTraceflowsMutex.Unlock()
if start {
err = c.startTraceflow(tf)
}
Expand Down Expand Up @@ -336,7 +345,7 @@ func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error {
// Store Traceflow to cache.
c.runningTraceflowsMutex.Lock()
tfState := traceflowState{
name: tf.Name, tag: tf.Status.DataplaneTag,
uid: tf.UID, name: tf.Name, tag: tf.Status.DataplaneTag,
liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic,
receiverOnly: receiverOnly, isSender: isSender}
c.runningTraceflows[tfState.tag] = &tfState
Expand Down Expand Up @@ -570,29 +579,19 @@ func (c *Controller) errorTraceflowCRD(tf *crdv1beta1.Traceflow, reason string)
return c.crdClient.CrdV1beta1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
}

// Delete Traceflow from cache.
func (c *Controller) deleteTraceflowState(tfName string) *traceflowState {
// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
c.runningTraceflowsMutex.Lock()
defer c.runningTraceflowsMutex.Unlock()
// Controller could have deallocated the tag and cleared the DataplaneTag
// field in the Traceflow Status, so try looking up the tag from the
// cache by Traceflow name.
for tag, tfState := range c.runningTraceflows {
if tfName == tfState.name {
// This must be executed before deleting the tag from runningTraceflows, otherwise it may uninstall another
// Traceflow's flows if the tag is reassigned.
if err := c.ofClient.UninstallTraceflowFlows(uint8(tag)); err != nil {
klog.ErrorS(err, "Failed to uninstall Traceflow flows", "Traceflow", tfName, "state", tfState)
}
delete(c.runningTraceflows, tag)
return tfState
}
}
return nil
}

// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
tfState := c.deleteTraceflowState(tfName)
if tfState != nil {
err := c.ofClient.UninstallTraceflowFlows(uint8(tfState.tag))
if err != nil {
klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err)
break
}
}
}
85 changes: 81 additions & 4 deletions pkg/agent/controller/traceflow/traceflow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,8 @@ func TestSyncTraceflow(t *testing.T) {
tcs := []struct {
name string
tf *crdv1beta1.Traceflow
tfState *traceflowState
existingState *traceflowState
newState *traceflowState
expectedCalls func(mockOFClient *openflowtest.MockClient)
}{
{
Expand All @@ -698,11 +699,83 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
},
},
{
name: "traceflow in running phase with empty state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in running phase with conflict state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
existingState: &traceflowState{
name: "tf1",
uid: "uid2",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().UninstallTraceflowFlows(uint8(1))
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in failed phase",
tf: &crdv1beta1.Traceflow{
Expand All @@ -722,7 +795,7 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
tag: 1,
},
Expand All @@ -740,13 +813,17 @@ func TestSyncTraceflow(t *testing.T) {
tfc.crdInformerFactory.Start(stopCh)
tfc.crdInformerFactory.WaitForCacheSync(stopCh)

tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.tfState
if tt.existingState != nil {
tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.existingState
}

if tt.expectedCalls != nil {
tt.expectedCalls(tfc.mockOFClient)
}

err := tfc.syncTraceflow(tt.tf.Name)
require.NoError(t, err)
assert.Equal(t, tt.newState, tfc.runningTraceflows[tt.tf.Status.DataplaneTag])
})
}
}
Expand Down

0 comments on commit 78bf4ef

Please sign in to comment.