Skip to content

Commit

Permalink
Fix race condition in agent Traceflow controller
Browse files Browse the repository at this point in the history
A Traceflow may be assigned a tag that was just released from an old
Traceflow before the controller has processed the deletion event of the
old Traceflow. Previously the controller skipped starting the new
Traceflow if the tag was already in use, which caused the new Traceflow
to time out.

This commit adds a check when determining whether to start a Traceflow.
If the tag is associated with a different Traceflow, the controller
cleans up that other Traceflow and then starts a new trace for the
current one.

It also fixes a bug in cleanupTraceflow: the function could uninstall the
flows of another Traceflow if the tag had already been reassigned.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Feb 1, 2024
1 parent a94cf75 commit 97be1dd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 24 deletions.
37 changes: 17 additions & 20 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,17 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
if tf.Status.DataplaneTag != 0 {
start := false
c.runningTraceflowsMutex.Lock()
if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok {
tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag]
c.runningTraceflowsMutex.Unlock()
// This may happen if a Traceflow is assigned with a tag that was just released from an old Traceflow but
// the agent hasn't processed the deletion event of the old Traceflow yet.
if ok && tfState.name != traceflowName {
klog.V(2).InfoS("Found another Traceflow associated with the dataplane tag, cleaning it up", "tag", tf.Status.DataplaneTag, "currentTraceflow", traceflowName, "otherTraceflow", tfState.name)
c.cleanupTraceflow(tfState.name)
start = true
} else if !ok {
start = true
}
c.runningTraceflowsMutex.Unlock()
if start {
err = c.startTraceflow(tf)
}
Expand Down Expand Up @@ -570,29 +577,19 @@ func (c *Controller) errorTraceflowCRD(tf *crdv1beta1.Traceflow, reason string)
return c.crdClient.CrdV1beta1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
}

// Delete Traceflow from cache.
func (c *Controller) deleteTraceflowState(tfName string) *traceflowState {
// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
c.runningTraceflowsMutex.Lock()
defer c.runningTraceflowsMutex.Unlock()
// Controller could have deallocated the tag and cleared the DataplaneTag
// field in the Traceflow Status, so try looking up the tag from the
// cache by Traceflow name.
for tag, tfState := range c.runningTraceflows {
if tfName == tfState.name {
// This must be executed before deleting the tag from runningTraceflows, otherwise it may uninstall another
// Traceflow's flows if the tag is reassigned.
if err := c.ofClient.UninstallTraceflowFlows(uint8(tag)); err != nil {
klog.ErrorS(err, "Failed to uninstall Traceflow flows", "Traceflow", tfName, "state", tfState)
}
delete(c.runningTraceflows, tag)
return tfState
}
}
return nil
}

// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
tfState := c.deleteTraceflowState(tfName)
if tfState != nil {
err := c.ofClient.UninstallTraceflowFlows(uint8(tfState.tag))
if err != nil {
klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err)
break
}
}
}
80 changes: 76 additions & 4 deletions pkg/agent/controller/traceflow/traceflow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,8 @@ func TestSyncTraceflow(t *testing.T) {
tcs := []struct {
name string
tf *crdv1beta1.Traceflow
tfState *traceflowState
existingState *traceflowState
newState *traceflowState
expectedCalls func(mockOFClient *openflowtest.MockClient)
}{
{
Expand All @@ -698,10 +699,77 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
tag: 1,
},
},
{
name: "traceflow in running phase with empty state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
newState: &traceflowState{
name: "tf1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in running phase with conflict state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
existingState: &traceflowState{
name: "tf2",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().UninstallTraceflowFlows(uint8(1))
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in failed phase",
Expand All @@ -722,7 +790,7 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
tag: 1,
},
Expand All @@ -740,13 +808,17 @@ func TestSyncTraceflow(t *testing.T) {
tfc.crdInformerFactory.Start(stopCh)
tfc.crdInformerFactory.WaitForCacheSync(stopCh)

tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.tfState
if tt.existingState != nil {
tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.existingState
}

if tt.expectedCalls != nil {
tt.expectedCalls(tfc.mockOFClient)
}

err := tfc.syncTraceflow(tt.tf.Name)
require.NoError(t, err)
assert.Equal(t, tt.newState, tfc.runningTraceflows[tt.tf.Status.DataplaneTag])
})
}
}
Expand Down

0 comments on commit 97be1dd

Please sign in to comment.