Skip to content

Commit

Permalink
fix: check level first
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Sep 17, 2024
1 parent 2e84c9b commit a48aeca
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
2 changes: 1 addition & 1 deletion workflow/sync/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ status:
mutex:
holding:
- holder: synchronization-wf-level-xxs94
mutex: default/mutex/test
mutex: default/Mutex/test
`

func TestMutexLock(t *testing.T) {
Expand Down
81 changes: 76 additions & 5 deletions workflow/sync/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,77 @@ func (cm *Manager) CheckWorkflowExistence() {
}
}

// getUpgradedKey converts a legacy holder key (one whose version is
// wfv1.HoldingNameV1) into a key built by getHolderKey. Keys of any other
// version are returned unchanged.
func getUpgradedKey(wf *wfv1.Workflow, key string) string {
func getUpgradedKey(wf *wfv1.Workflow, key string, level SyncLevelType) string {
if wfv1.CheckHolderKeyVersion(key) == wfv1.HoldingNameV1 {

if wf.Name == key {
if level == WorkflowLevel {
// Workflow-level lock: build the holder key with an empty second argument.
return getHolderKey(wf, "")
}
// Otherwise the legacy key itself is handed to getHolderKey
// (presumably as the node name; verify against getHolderKey).
return getHolderKey(wf, key)
}
return key
}

// SyncLevelType identifies the level at which a synchronization lock is
// declared on a workflow.
type SyncLevelType int

// Known synchronization levels. Numbering starts at 1, so the zero value of
// SyncLevelType matches none of them.
const (
	// WorkflowLevel marks a lock declared on the workflow spec itself.
	WorkflowLevel SyncLevelType = iota + 1
	// TemplateLevel marks a lock declared on one of the workflow's templates.
	TemplateLevel
	// ErrorLevel is the level reported together with an error when the
	// level of a lock could not be determined.
	ErrorLevel
)

// Legacy keys can be of the form x, where x is a workflow name.
// Unfortunately this doesn't differentiate between workflow-level keys
// and template-level keys, so upgrading is a bit tricky here.

// Given a legacy holding name x, namespace y and workflow name z:
// in the case of a workflow-level lock
//   if x != z
//     upgradedKey := y/z
//   else if x == z
//     upgradedKey := y/z
// in the case of a template-level lock
//   if x != z
//     upgradedKey := y/z/x
//   else if x == z
//     upgradedKey := y/z/x

// getWorkflowSyncLevelByName reports whether lockName is declared at the
// workflow level or at the template level of wf.
//
// The workflow-level synchronization is checked first. A synchronization with
// the same lock name may exist both at the template level and at the workflow
// level; that case is impossible to upgrade correctly due to the ambiguity,
// so workflow level is assumed.
//
// It returns ErrorLevel and a non-nil error when the workflow-level lock name
// cannot be resolved, or when no declared lock matches lockName. In the latter
// case the error is the first one encountered while resolving template lock
// names, or a generic "unable to determine level" error if none occurred.
func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelType, error) {
	if wf.Spec.Synchronization != nil {
		syncLockName, err := GetLockName(wf.Spec.Synchronization, wf.Namespace)
		if err != nil {
			return ErrorLevel, err
		}
		if lockName == syncLockName.EncodeName() {
			return WorkflowLevel, nil
		}
	}

	var firstErr error
	for _, template := range wf.Spec.Templates {
		if template.Synchronization == nil {
			continue
		}
		syncLockName, err := GetLockName(template.Synchronization, wf.Namespace)
		if err != nil {
			// Remember only the first failure; a later template may still
			// match, so keep scanning instead of returning.
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		if lockName == syncLockName.EncodeName() {
			return TemplateLevel, nil
		}
	}
	if firstErr == nil {
		firstErr = fmt.Errorf("was unable to determine level for %s", lockName)
	}
	return ErrorLevel, firstErr
}

func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
for _, wf := range wfs {
if wf.Status.Synchronization == nil {
Expand All @@ -97,7 +157,12 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
}

for _, holders := range holding.Holders {
key := getUpgradedKey(&wf, holders)
level, err := getWorkflowSyncLevelByName(&wf, holding.Semaphore)
if err != nil {
log.Warnf("cannot obtain level for %s", holding.Semaphore)
continue
}
key := getUpgradedKey(&wf, holders, level)
if semaphore != nil && semaphore.acquire(key) {
log.Infof("Lock acquired by %s from %s", key, holding.Semaphore)
}
Expand All @@ -113,7 +178,12 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
if mutex == nil {
mutex := cm.initializeMutex(holding.Mutex)
if holding.Holder != "" {
key := getUpgradedKey(&wf, holding.Holder)
level, err := getWorkflowSyncLevelByName(&wf, holding.Mutex)
if err != nil {
log.Warnf("cannot obtain lock level for %s", holding.Mutex)
continue
}
key := getUpgradedKey(&wf, holding.Holder, level)
mutex.acquire(key)
}
cm.syncLockMap[holding.Mutex] = mutex
Expand All @@ -127,6 +197,7 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
// TryAcquire tries to acquire the lock from semaphore.
// It returns status of acquiring a lock , status of Workflow status updated, waiting message if lock is not available and any error encountered
func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error) {
log.Infof("[DEBUG] called TryAcquire with holderKey %s", getHolderKey(wf, nodeName))
cm.lock.Lock()
defer cm.lock.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion workflow/sync/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ func TestMutexMigration(t *testing.T) {
foundNodeID := ""
for nodeID := range wfMutex3.Status.Nodes {
holder := getHolderKey(wfMutex3, nodeID)
if holder == getUpgradedKey(wfMutex3, wfMutex3.Status.Synchronization.Mutex.Holding[0].Holder) {
if holder == getUpgradedKey(wfMutex3, wfMutex3.Status.Synchronization.Mutex.Holding[0].Holder, TemplateLevel) {
foundNodeID = nodeID
numFound++
}
Expand Down

0 comments on commit a48aeca

Please sign in to comment.