From 3540566363ac63d91016576b2904b946bdd3ccfd Mon Sep 17 00:00:00 2001
From: Siddhesh Ghadi
Date: Tue, 15 Aug 2023 19:45:37 +0530
Subject: [PATCH] Prune resources in reverse of sync wave order

Signed-off-by: Siddhesh Ghadi
---
 pkg/sync/sync_context.go      |  57 ++++++-
 pkg/sync/sync_context_test.go | 289 +++++++++++++++++++++++++++++++++-
 pkg/sync/sync_task.go         |   4 +
 3 files changed, 344 insertions(+), 6 deletions(-)

diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 50e23bc46..38fc8747b 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -457,6 +457,18 @@ func (sc *syncContext) Sync() {
 		return
 	}
 
+	// if pruned tasks are still pending deletion, wait for them to be removed before starting the next wave
+	prunedTasksPendingDelete := tasks.Filter(func(t *syncTask) bool {
+		if t.pruned() && t.liveObj != nil {
+			return t.liveObj.GetDeletionTimestamp() != nil
+		}
+		return false
+	})
+	if prunedTasksPendingDelete.Len() > 0 {
+		sc.setRunningPhase(prunedTasksPendingDelete, true)
+		return
+	}
+
 	// collect all completed hooks which have appropriate delete policy
 	hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
 		return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
@@ -747,11 +759,52 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 		}
 	}
 
-	// for pruneLast tasks, modify the wave to sync phase last wave of non prune task +1
+	// for prune tasks, modify the waves so that cleanup happens in the reverse of the sync wave (creation) order
+	pruneTasks := make(map[int][]*syncTask)
+	for _, task := range tasks {
+		if task.isPrune() {
+			pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
+		}
+	}
+
+	var uniquePruneWaves []int
+	for k := range pruneTasks {
+		uniquePruneWaves = append(uniquePruneWaves, k)
+	}
+	sort.Ints(uniquePruneWaves)
+
+	// reorder the waves of prune tasks by symmetrically swapping the sorted prune waves
+	n := len(uniquePruneWaves)
+	for i := 0; i < n/2; i++ {
+		// waves to swap
+		startWave := uniquePruneWaves[i]
+		endWave := uniquePruneWaves[n-1-i]
+
+		for _, task := range pruneTasks[startWave] {
+			annotations := task.liveObj.GetAnnotations()
+			if annotations == nil {
+				annotations = make(map[string]string)
+			}
+			annotations[common.AnnotationSyncWave] = strconv.Itoa(endWave)
+			task.liveObj.SetAnnotations(annotations)
+		}
+
+		for _, task := range pruneTasks[endWave] {
+			annotations := task.liveObj.GetAnnotations()
+			if annotations == nil {
+				annotations = make(map[string]string)
+			}
+			annotations[common.AnnotationSyncWave] = strconv.Itoa(startWave)
+			task.liveObj.SetAnnotations(annotations)
+		}
+	}
+
+	// for pruneLast tasks, modify the wave to the last wave of the sync phase + 1;
+	// to ensure proper cleanup, syncPhaseLastWave now also takes prune tasks into account
 	syncPhaseLastWave := 0
 	for _, task := range tasks {
 		if task.phase == common.SyncPhaseSync {
-			if task.wave() > syncPhaseLastWave && !task.isPrune() {
+			if task.wave() > syncPhaseLastWave {
 				syncPhaseLastWave = task.wave()
 			}
 		}
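The reordering above mirrors the sorted list of prune waves end to end: prune tasks in the lowest wave move to the highest one, the second lowest swaps with the second highest, and a wave holding several prune tasks moves as a unit (with an odd number of prune waves, the middle wave maps to itself, which is why the loop only runs to n/2). A minimal, self-contained Go sketch of the same mapping, for illustration only and not part of the patch: the pod names and the map layout are invented, whereas the real code rewrites the sync-wave annotation on syncTask live objects.

package main

import (
	"fmt"
	"sort"
)

// reversePruneWaves mirrors the swap in getSyncTasks on plain data:
// each resource keeps its grouping, but the i-th lowest prune wave is
// mapped to the i-th highest one.
func reversePruneWaves(pruneWaves map[int][]string) map[string]int {
	var waves []int
	for w := range pruneWaves {
		waves = append(waves, w)
	}
	sort.Ints(waves)

	newWave := make(map[string]int)
	for i, w := range waves {
		for _, name := range pruneWaves[w] {
			newWave[name] = waves[len(waves)-1-i]
		}
	}
	return newWave
}

func main() {
	// prune tasks found in waves 1, 3, 4 and 5; wave 3 holds two of them
	pruneWaves := map[int][]string{
		1: {"pod-1"},
		3: {"pod-3", "pod-4"},
		4: {"pod-5"},
		5: {"pod-7"},
	}
	fmt.Println(reversePruneWaves(pruneWaves))
	// map[pod-1:5 pod-3:4 pod-4:4 pod-5:3 pod-7:1]
}

This is the same mapping the mixedTasks test below expects for its prune tasks.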
diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 0c2e1e14c..83c0ad7b8 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -9,6 +9,7 @@ import (
 	"net/http/httptest"
 	"reflect"
 	"testing"
+	"time"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -1583,8 +1584,8 @@ func TestPruneLast(t *testing.T) {
 
 		assert.True(t, successful)
 		assert.Len(t, tasks, 3)
-		// last wave is the last sync wave for non-prune task + 1
-		assert.Equal(t, 3, tasks.lastWave())
+		// the last wave is now the last sync wave across all tasks + 1
+		assert.Equal(t, 8, tasks.lastWave())
 	})
 
 	t.Run("pruneLastIndividualResources", func(t *testing.T) {
@@ -1601,8 +1602,8 @@ func TestPruneLast(t *testing.T) {
 
 		assert.True(t, successful)
 		assert.Len(t, tasks, 3)
-		// last wave is the last sync wave for non-prune task + 1
-		assert.Equal(t, 3, tasks.lastWave())
+		// the last wave is now the last sync wave across all tasks + 1
+		assert.Equal(t, 8, tasks.lastWave())
 	})
 }
 
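The expected value changes from 3 to 8 because the !task.isPrune() exclusion was dropped: prune tasks now participate in the last-wave computation, so pruneLast resources land one wave after the highest wave of any task in the sync phase. A standalone sketch of that arithmetic, using hypothetical wave numbers rather than this fixture's actual objects:

package main

import "fmt"

// lastWave is a simplified stand-in for tasks.lastWave(): the highest
// wave among the given tasks, prune tasks included.
func lastWave(waves []int) int {
	last := waves[0]
	for _, w := range waves[1:] {
		if w > last {
			last = w
		}
	}
	return last
}

func main() {
	// hypothetical mix: apply tasks in waves 1..3, prune tasks reversed into waves 5..7
	waves := []int{1, 2, 3, 5, 6, 7}
	fmt.Println(lastWave(waves) + 1) // 8: the wave where pruneLast tasks now run
}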
@@ -1686,3 +1687,283 @@ func TestSetOperationFailedNoTasks(t *testing.T) {
 
 	assert.Equal(t, sc.message, "one or more objects failed to apply")
 }
+
+func TestWaveReorderingOfPruneTasks(t *testing.T) {
+
+	ns := NewNamespace()
+	ns.SetName("ns")
+	pod1 := NewPod()
+	pod1.SetName("pod-1")
+	pod2 := NewPod()
+	pod2.SetName("pod-2")
+	pod3 := NewPod()
+	pod3.SetName("pod-3")
+	pod4 := NewPod()
+	pod4.SetName("pod-4")
+	pod5 := NewPod()
+	pod5.SetName("pod-5")
+	pod6 := NewPod()
+	pod6.SetName("pod-6")
+	pod7 := NewPod()
+	pod7.SetName("pod-7")
+
+	type Test struct {
+		name              string
+		target            []*unstructured.Unstructured
+		live              []*unstructured.Unstructured
+		expectedWaveOrder map[string]int
+		pruneLast         bool
+	}
+	runTest := func(test Test) {
+		t.Run(test.name, func(t *testing.T) {
+			syncCtx := newTestSyncCtx(nil)
+			syncCtx.pruneLast = test.pruneLast
+			syncCtx.resources = groupResources(ReconciliationResult{
+				Live:   test.live,
+				Target: test.target,
+			})
+			tasks, successful := syncCtx.getSyncTasks()
+
+			assert.True(t, successful)
+			assert.Len(t, tasks, len(test.target))
+
+			for _, task := range tasks {
+				assert.Equal(t, test.expectedWaveOrder[task.name()], task.wave())
+			}
+		})
+	}
+
+	// same wave
+	sameWaveTests := []Test{
+		{
+			name:   "sameWave_noPruneTasks",
+			live:   []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+		{
+			name:   "sameWave_allPruneTasks",
+			live:   []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+		{
+			name:   "sameWave_mixedTasks",
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{ns.GetName(): 0, pod1.GetName(): 0, pod2.GetName(): 0, pod3.GetName(): 0, pod4.GetName(): 0},
+		},
+	}
+
+	for _, test := range sameWaveTests {
+		runTest(test)
+	}
+
+	// different wave
+	differentWaveTests := []Test{
+		{
+			name:   "differentWave_noPruneTasks",
+			target: []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			live:   []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			// no change in wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				ns.GetName():   0, // 0
+				pod1.GetName(): 1, // 1
+				pod2.GetName(): 2, // 2
+				pod3.GetName(): 3, // 3
+				pod4.GetName(): 4, // 4
+			},
+		},
+		{
+			name:   "differentWave_allPruneTasks",
+			target: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				ns.GetName():   4, // 0
+				pod1.GetName(): 3, // 1
+				pod2.GetName(): 2, // 2
+				pod3.GetName(): 1, // 3
+				pod4.GetName(): 0, // 4
+			},
+		},
+		{
+			name:   "differentWave_mixedTasks",
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 4, // 1
+				pod3.GetName(): 3, // 3
+				pod4.GetName(): 1, // 4
+
+				// no change since these are non-prune tasks
+				ns.GetName():   0, // 0
+				pod2.GetName(): 2, // 2
+			},
+		},
+	}
+
+	for _, test := range differentWaveTests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+
+		runTest(test)
+	}
+
+	// prune last
+	pruneLastTests := []Test{
+		{
+			name:      "pruneLast",
+			pruneLast: true,
+			live:      []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target:    []*unstructured.Unstructured{ns, nil, nil, nil, nil},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 5, // 1
+				pod2.GetName(): 5, // 2
+				pod3.GetName(): 5, // 3
+				pod4.GetName(): 5, // 4
+
+				// no change since this is a non-prune task
+				ns.GetName(): 0, // 0
+			},
+		},
+		{
+			name:      "pruneLastIndividualResources",
+			pruneLast: false,
+			live:      []*unstructured.Unstructured{ns, pod1, pod2, pod3, pod4},
+			target:    []*unstructured.Unstructured{ns, nil, nil, nil, nil},
+			// change in wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 4, // 1
+				pod2.GetName(): 5, // 2
+				pod3.GetName(): 2, // 3
+				pod4.GetName(): 1, // 4
+
+				// no change since this is a non-prune task
+				ns.GetName(): 0, // 0
+			},
+		},
+	}
+
+	for _, test := range pruneLastTests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2", synccommon.AnnotationSyncOptions: synccommon.SyncOptionPruneLast})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+
+		runTest(test)
+	}
+
+	// additional test
+	tests := []Test{
+		{
+			name:   "mixedTasks",
+			target: []*unstructured.Unstructured{ns, nil, pod2, nil, nil, nil, pod6, nil},
+			live:   []*unstructured.Unstructured{ns, pod1, nil, pod3, pod4, pod5, pod6, pod7},
+			// change in prune wave order
+			expectedWaveOrder: map[string]int{
+				// new wave        // original wave
+				pod1.GetName(): 5, // 1
+				pod3.GetName(): 4, // 3
+				pod4.GetName(): 4, // 3
+				pod5.GetName(): 3, // 4
+				pod7.GetName(): 1, // 5
+
+				// no change since these are non-prune tasks
+				ns.GetName():   -1, // -1
+				pod2.GetName(): 3,  // 3
+				pod6.GetName(): 5,  // 5
+			},
+		},
+	}
+	for _, test := range tests {
+		ns.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "-1"})
+		pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+		pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3"})
+		pod5.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "4"})
+		pod6.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})
+		pod7.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "5"})
+
+		runTest(test)
+	}
+
+}
+
+func TestWaitForCleanUpBeforeNextWave(t *testing.T) {
+
+	pod1 := NewPod()
+	pod1.SetName("pod-1")
+	pod2 := NewPod()
+	pod2.SetName("pod-2")
+
+	pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1"})
+	pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "2"})
+
+	syncCtx := newTestSyncCtx(nil)
+	syncCtx.prune = true
+
+	// prune order: pod2 -> pod1
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Target: []*unstructured.Unstructured{nil, nil},
+		Live:   []*unstructured.Unstructured{pod1, pod2},
+	})
+
+	var phase synccommon.OperationPhase
+	var msg string
+	var result []synccommon.ResourceSyncResult
+
+	// first sync should prune only pod2
+	syncCtx.Sync()
+	phase, _, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationRunning, phase)
+	assert.Equal(t, 1, len(result))
+	assert.Equal(t, "pod-2", result[0].ResourceKey.Name)
+	assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)
+
+	// add a deletion timestamp on pod2 to simulate a pending delete
+	pod2.SetDeletionTimestamp(&metav1.Time{Time: time.Now()})
+
+	// the next sync should wait for pod2 to be deleted from the cluster;
+	// it should not move on to the next wave and prune pod1
+	syncCtx.Sync()
+	phase, msg, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationRunning, phase)
+	assert.Equal(t, "waiting for deletion of /Pod/pod-2", msg)
+	assert.Equal(t, 1, len(result))
+
+	// simulate successful deletion of pod2
+	pod2.SetDeletionTimestamp(nil)
+	pod2 = nil
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Target: []*unstructured.Unstructured{nil},
+		Live:   []*unstructured.Unstructured{pod1},
+	})
+
+	// the next sync should proceed with the next wave,
+	// i.e. the deletion of pod1
+	syncCtx.Sync()
+	phase, _, result = syncCtx.GetState()
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Equal(t, 2, len(result))
+	assert.Equal(t, "pod-2", result[0].ResourceKey.Name)
+	assert.Equal(t, "pod-1", result[1].ResourceKey.Name)
+	assert.Equal(t, synccommon.ResultCodePruned, result[0].Status)
+	assert.Equal(t, synccommon.ResultCodePruned, result[1].Status)
+
+}
diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go
index f6bebb56f..01c67a98b 100644
--- a/pkg/sync/sync_task.go
+++ b/pkg/sync/sync_task.go
@@ -107,6 +107,10 @@ func (t *syncTask) successful() bool {
 	return t.operationState.Successful()
 }
 
+func (t *syncTask) pruned() bool {
+	return t.syncStatus == common.ResultCodePruned
+}
+
 func (t *syncTask) hookType() common.HookType {
 	if t.isHook() {
 		return common.HookType(t.phase)
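The pruned() helper above feeds the gate added to Sync(): before starting a new wave, the sync loop checks whether any already-pruned object still carries a deletion timestamp and, if so, stays in the running phase. A minimal sketch of that decision, using a simplified task type in place of syncTask (the type and field names here are invented for illustration):

package main

import "fmt"

// task is a simplified stand-in for syncTask: pruned reports whether the
// task's sync result was ResultCodePruned, deleting whether the live
// object still has a deletion timestamp.
type task struct {
	name     string
	pruned   bool
	deleting bool
}

// hasPendingPrunes mirrors the new gate in Sync(): while any pruned
// task's live object is still pending deletion, the next wave must wait.
func hasPendingPrunes(tasks []task) bool {
	for _, t := range tasks {
		if t.pruned && t.deleting {
			return true
		}
	}
	return false
}

func main() {
	tasks := []task{
		{name: "pod-2", pruned: true, deleting: true},   // delete issued, finalizers still running
		{name: "pod-1", pruned: false, deleting: false}, // scheduled for a later wave
	}
	fmt.Println(hasPendingPrunes(tasks)) // true: do not prune pod-1 yet
}

This is the behavior TestWaitForCleanUpBeforeNextWave exercises end to end: the second Sync() reports "waiting for deletion of /Pod/pod-2" instead of pruning pod-1.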