Skip to content

Commit

Permalink
Prune resources in reverse of sync wave order
Browse files Browse the repository at this point in the history
Signed-off-by: Siddhesh Ghadi <[email protected]>
  • Loading branch information
svghadi committed Sep 3, 2023
1 parent ed7c77a commit 3540566
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 6 deletions.
57 changes: 55 additions & 2 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,18 @@ func (sc *syncContext) Sync() {
return
}

// if pruned tasks pending deletion, then wait...
prunedTasksPendingDelete := tasks.Filter(func(t *syncTask) bool {
if t.pruned() && t.liveObj != nil {
return t.liveObj.GetDeletionTimestamp() != nil
}
return false
})
if prunedTasksPendingDelete.Len() > 0 {
sc.setRunningPhase(prunedTasksPendingDelete, true)
return
}

// collect all completed hooks which have appropriate delete policy
hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
Expand Down Expand Up @@ -747,11 +759,52 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of non prune task +1
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
pruneTasks := make(map[int][]*syncTask)
for _, task := range tasks {
if task.isPrune() {
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
}
}

var uniquePruneWaves []int
for k := range pruneTasks {
uniquePruneWaves = append(uniquePruneWaves, k)
}
sort.Ints(uniquePruneWaves)

// reorder waves for pruning tasks using symmetric swap on prune waves
n := len(uniquePruneWaves)
for i := 0; i < n/2; i++ {
// waves to swap
startWave := uniquePruneWaves[i]
endWave := uniquePruneWaves[n-1-i]

for _, task := range pruneTasks[startWave] {
annotations := task.liveObj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[common.AnnotationSyncWave] = strconv.Itoa(endWave)
task.liveObj.SetAnnotations(annotations)
}

for _, task := range pruneTasks[endWave] {
annotations := task.liveObj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[common.AnnotationSyncWave] = strconv.Itoa(startWave)
task.liveObj.SetAnnotations(annotations)
}
}

// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
syncPhaseLastWave := 0
for _, task := range tasks {
if task.phase == common.SyncPhaseSync {
if task.wave() > syncPhaseLastWave && !task.isPrune() {
if task.wave() > syncPhaseLastWave {
syncPhaseLastWave = task.wave()
}
}
Expand Down
289 changes: 285 additions & 4 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -1583,8 +1584,8 @@ func TestPruneLast(t *testing.T) {

assert.True(t, successful)
assert.Len(t, tasks, 3)
// last wave is the last sync wave for non-prune task + 1
assert.Equal(t, 3, tasks.lastWave())
// last wave is the last sync wave for tasks + 1
assert.Equal(t, 8, tasks.lastWave())
})

t.Run("pruneLastIndividualResources", func(t *testing.T) {
Expand All @@ -1601,8 +1602,8 @@ func TestPruneLast(t *testing.T) {

assert.True(t, successful)
assert.Len(t, tasks, 3)
// last wave is the last sync wave for non-prune task + 1
assert.Equal(t, 3, tasks.lastWave())
// last wave is the last sync wave for tasks + 1
assert.Equal(t, 8, tasks.lastWave())
})
}

Expand Down Expand Up @@ -1686,3 +1687,283 @@ func TestSetOperationFailedNoTasks(t *testing.T) {
assert.Equal(t, sc.message, "one or more objects failed to apply")

}

func TestWaveReorderingOfPruneTasks(t *testing.T) {

ns := NewNamespace()
ns.SetName("ns")
pod1 := NewPod()
pod1.SetName("pod-1")
pod2 := NewPod()
pod2.SetName("pod-2")
pod3 := NewPod()
pod3.SetName("pod-3")
pod4 := NewPod()
pod4.SetName("pod-4")
pod5 := NewPod()
pod5.SetName("pod-5")
pod6 := NewPod()
pod6.SetName("pod-6")
pod7 := NewPod()
pod7.SetName("pod-7")

type Test struct {
name string
target []*unstructured.Unstructured
live []*unstructured.Unstructured
expectedWaveOrder map[string]int
pruneLast bool
}
runTest := func(test Test) {
t.Run(test.name, func(t *testing.T) {
syncCtx := newTestSyncCtx(nil)
syncCtx.pruneLast = test.pruneLast
syncCtx.resources = groupResources(ReconciliationResult{
Live: test.live,
Target: test.target,
})
tasks, successful := syncCtx.getSyncTasks()

assert.True(t, successful)
assert.Len(t, tasks, len(test.target))

for _, task := range tasks {
assert.Equal(t, test.expectedWaveOrder[task.name()], task.wave())
}
})
}

// same wave
sameWaveTests := []Test{
{
name: "sameWave_noPruneTasks",
live: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
// no change in wave order
expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
},
{
name: "sameWave_allPruneTasks",
live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
// no change in wave order
expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
},
{
name: "sameWave_mixedTasks",
live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
// no change in wave order
expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
},
}

for _, test := range sameWaveTests {
runTest(test)
}

// different wave
differentWaveTests := []Test{
{
name: "differentWave_noPruneTasks",
target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
live: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
// no change in wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
ns.GetName(): 0, // 0
pod1.GetName(): 1, // 1
pod2.GetName(): 2, // 2
pod3.GetName(): 3, // 3
pod4.GetName(): 4, // 4
},
},
{
name: "differentWave_allPruneTasks",
target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
// change in prune wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
ns.GetName(): 4, // 0
pod1.GetName(): 3, // 1
pod2.GetName(): 2, // 2
pod3.GetName(): 1, // 3
pod4.GetName(): 0, // 4
},
},
{
name: "differentWave_mixedTasks",
target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
// change in prune wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
pod1.GetName(): 4, // 1
pod3.GetName(): 3, // 3
pod4.GetName(): 1, // 4

// no change since non prune tasks
ns.GetName(): 0, // 0
pod2.GetName(): 2, // 2
},
},
}

for _, test := range differentWaveTests {
ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})
pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})

runTest(test)
}

// prune last
pruneLastTests := []Test{
{
name: "pruneLast",
pruneLast: true,
live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
target: []*unstructured.Unstructured{ns, nil, nil, nil, nil},
// change in prune wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
pod1.GetName(): 5, // 1
pod2.GetName(): 5, // 2
pod3.GetName(): 5, // 3
pod4.GetName(): 5, // 4

// no change since non prune tasks
ns.GetName(): 0, // 0
},
},
{
name: "pruneLastIndividualResources",
pruneLast: false,
live: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
target: []*unstructured.Unstructured{ns, nil, nil, nil, nil},
// change in wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
pod1.GetName(): 4, // 1
pod2.GetName(): 5, // 2
pod3.GetName(): 2, // 3
pod4.GetName(): 1, // 4

// no change since non prune tasks
ns.GetName(): 0, // 0
},
},
}

for _, test := range pruneLastTests {
ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationSyncOptions: synccommon.SyncOptionPruneLast})
pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})

runTest(test)
}

// additional test
tests := []Test{
{
name: "mixedTasks",
target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil, nil, pod6, nil},
live: []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4, pod5, pod6, pod7},
// change in prune wave order
expectedWaveOrder: map[string]int{
// new wave // original wave
pod1.GetName(): 5, // 1
pod3.GetName(): 4, // 3
pod4.GetName(): 4, // 3
pod5.GetName(): 3, // 4
pod7.GetName(): 1, // 5

// no change since non prune tasks
ns.GetName(): -1, // -1
pod2.GetName(): 3, // 3
pod6.GetName(): 5, // 5
},
},
}
for _, test := range tests {
ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "-1"})
pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
pod5.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
pod6.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})
pod7.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})

runTest(test)
}

}

func TestWaitForCleanUpBeforeNextWave(t *testing.T) {

pod1 := NewPod()
pod1.SetName("pod-1")
pod2 := NewPod()
pod2.SetName("pod-2")

pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})

syncCtx := newTestSyncCtx(nil)
syncCtx.prune = true

// prune order : pod2 -> pod1
syncCtx.resources = groupResources(ReconciliationResult{
Target: []*unstructured.Unstructured{nil, nil},
Live: []*unstructured.Unstructured{pod1, pod2},
})

var phase common.OperationPhase
var msg string
var result []common.ResourceSyncResult

// 1st sync should prune only pod2
syncCtx.Sync()
phase, _, result = syncCtx.GetState()
assert.Equal(t, synccommon.OperationRunning, phase)
assert.Equal(t, 1, len(result))
assert.Equal(t, "pod-2", result[0].ResourceKey.Name)
assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)

// add delete timestamp on pod2 to simulate pending delete
pod2.SetDeletionTimestamp(&metav1.Time{Time: time.Now()})

// next sync should wait for deletion of pod2 from cluster,
// it should not move to next wave and prune pod1
syncCtx.Sync()
phase, msg, result = syncCtx.GetState()
assert.Equal(t, synccommon.OperationRunning, phase)
assert.Equal(t, "waiting for deletion of /Pod/pod-2", msg)
assert.Equal(t, 1, len(result))

// simulate successful delete of pod2
pod2.SetDeletionTimestamp(nil)
pod2 = nil
syncCtx.resources = groupResources(ReconciliationResult{
Target: []*unstructured.Unstructured{nil, },
Live: []*unstructured.Unstructured{pod1, },
})

// next sync should proceed with next wave
// i.e deletion of pod1
syncCtx.Sync()
phase, _, result = syncCtx.GetState()
assert.Equal(t, synccommon.OperationSucceeded, phase)
assert.Equal(t, 2, len(result))
assert.Equal(t, "pod-2", result[0].ResourceKey.Name)
assert.Equal(t, "pod-1", result[1].ResourceKey.Name)
assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)
assert.Equal(t, synccommon.ResultCodePruned, result[1].Status)

}
4 changes: 4 additions & 0 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (t *syncTask) successful() bool {
return t.operationState.Successful()
}

func (t *syncTask) pruned() bool {
return t.syncStatus == common.ResultCodePruned
}

func (t *syncTask) hookType() common.HookType {
if t.isHook() {
return common.HookType(t.phase)
Expand Down

0 comments on commit 3540566

Please sign in to comment.