Skip to content

Commit

Permalink
fix(controller): Fixed workflow stuck with mutex lock (#4744)
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 authored Dec 15, 2020
1 parent 1a7ed73 commit a1f7aed
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
18 changes: 16 additions & 2 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2510,6 +2510,9 @@ func (ss *SemaphoreStatus) LockWaiting(holderKey, lockKey string, currentHolders
func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
i, semaphoreHolding := ss.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
if i < 0 {
ss.Holding = append(ss.Holding, SemaphoreHolding{Semaphore: lockKey, Holders: []string{holdingName}})
Expand All @@ -2525,6 +2528,9 @@ func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolder
func (ss *SemaphoreStatus) LockReleased(holderKey, lockKey string) bool {
i, semaphoreHolding := ss.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
if i >= 0 {
semaphoreHolding.Holders = slice.RemoveString(semaphoreHolding.Holders, holdingName)
Expand Down Expand Up @@ -2599,6 +2605,9 @@ func (ms *MutexStatus) LockWaiting(holderKey, lockKey string, currentHolders []s
func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
i, mutexHolding := ms.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
if i < 0 {
ms.Holding = append(ms.Holding, MutexHolding{Mutex: lockKey, Holder: holdingName})
Expand All @@ -2612,8 +2621,13 @@ func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []
}

func (ms *MutexStatus) LockReleased(holderKey, lockKey string) bool {
i, _ := ms.GetHolding(lockKey)
if i >= 0 {
i, holder := ms.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
if i >= 0 && holder.Holder == holdingName {
ms.Holding = append(ms.Holding[:i], ms.Holding[i+1:]...)
return true
}
Expand Down
6 changes: 5 additions & 1 deletion workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu

sgNode := woc.executeStepGroup(stepGroup.Steps, sgNodeName, &stepsCtx)

if !sgNode.Fulfilled() {
if sgNode.Fulfilled() {
if tmpl.Synchronization != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, tmpl.Synchronization)
}
} else {
woc.log.Infof("Workflow step group node %s not yet completed", sgNode.ID)
return node, nil
}
Expand Down
7 changes: 7 additions & 0 deletions workflow/sync/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func TestMutexTmplLevel(t *testing.T) {
assert.False(t, wfUpdate)
assert.False(t, status)

assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
Expand All @@ -324,5 +325,11 @@ func TestMutexTmplLevel(t *testing.T) {
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-2216915482", wf.Status.Synchronization.Mutex.Holding[0].Holder)

assert.NotEqual(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.NotEmpty(t, wf.Status.Synchronization.Mutex.Holding)
})
}

0 comments on commit a1f7aed

Please sign in to comment.