Improve a couple of workqueue usages (#6692)
- In multicluster tests, stop using a pointer to the Custom Resource as
  the workqueue key, and instead use a NamespacedName value, which
  matches actual usage in the controller.
- In the PodStore implementation (which keeps track of Pod deletion
  events with timestamps), use the Pod's UID as the workqueue key,
  instead of a pointer to the Pod object. It is uncommon to use a
  pointer as the workqueue key (even though it is functionally correct
  here). By using the string UID as the key, we could theoretically
  reduce memory usage, as we don't need to hold the Pod object in memory
  for a delay of 5 minutes, and the object could be garbage collected
  right after deletion. The Flow Aggregator in particular uses the
  PodStore for all Pods in the cluster (while for the Antrea Agent it is
  only local Pods), so storing the UID instead of the whole Pod object
  could make a big difference.

Signed-off-by: Antonin Bas <[email protected]>
antoninbas committed Sep 27, 2024
1 parent 85e1c67 commit 279f8f7
Showing 3 changed files with 48 additions and 40 deletions.
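The commit touches two workqueue keying patterns. The first is the multicluster test change: the queue is now parameterized on types.NamespacedName values instead of *mcv1alpha1.ResourceImport pointers, so only the object's key travels through the queue and the reconciler looks the object up by that key. Below is a minimal, self-contained sketch of that pattern (it is not code from this commit; the namespace and name values and the process helper are made up for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

// process stands in for a Reconcile call that would re-fetch the object by its key.
func process(key types.NamespacedName) error {
	fmt.Println("reconciling", key.String())
	return nil
}

func main() {
	// Typed rate-limiting queue keyed by NamespacedName values, not object pointers.
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedItemBasedRateLimiter[types.NamespacedName]())
	defer queue.ShutDown()

	// Producer side: enqueue only the key of the object that changed.
	queue.Add(types.NamespacedName{Namespace: "antrea-mcs", Name: "label-identity-1"})

	// Consumer side: dequeue a key, process it, then Forget it or requeue it.
	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)
	if err := process(key); err != nil {
		// On failure, requeue the key with rate limiting.
		queue.AddRateLimited(key)
		return
	}
	queue.Forget(key)
}
```

Using plain value keys also lets the workqueue coalesce repeated events for the same object while it is queued, which distinct pointers to equal objects would not.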
@@ -25,14 +25,14 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
k8smcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

@@ -501,9 +501,8 @@ func TestResourceImportReconciler_handleUpdateEvent(t *testing.T) {
// Reconcile loop. Once the fakeManager is run, all ResourceImports in the queue will be added
// into the fakeRemoteClient's cache, and all these events will be reconciled.
type fakeManager struct {
remoteClient client.Client
reconciler *LabelIdentityResourceImportReconciler
queue workqueue.TypedRateLimitingInterface[*mcv1alpha1.ResourceImport]
reconciler *LabelIdentityResourceImportReconciler
queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
}

func (fm *fakeManager) Run(stopCh <-chan struct{}) {
@@ -520,23 +519,18 @@ func (fm *fakeManager) worker() {
}

func (fm *fakeManager) syncNextItemInQueue() bool {
resImp, quit := fm.queue.Get()
key, quit := fm.queue.Get()
if quit {
return false
}
defer fm.queue.Done(resImp)
err := fm.remoteClient.Create(ctx, resImp)
if err != nil {
fm.queue.AddRateLimited(resImp)
return true
}
defer fm.queue.Done(key)
// Simulate ResourceImport create event and have LabelIdentityResourceImportReconciler reconcile it.
req := ctrl.Request{NamespacedName: types.NamespacedName{Namespace: resImp.Namespace, Name: resImp.Name}}
if _, err = fm.reconciler.Reconcile(ctx, req); err != nil {
fm.queue.AddRateLimited(resImp)
req := ctrl.Request{NamespacedName: key}
if _, err := fm.reconciler.Reconcile(ctx, req); err != nil {
fm.queue.AddRateLimited(key)
return true
}
fm.queue.Forget(resImp)
fm.queue.Forget(key)
return true
}

@@ -560,10 +554,11 @@ func TestStaleControllerNoRaceWithResourceImportReconciler(t *testing.T) {

stopCh := make(chan struct{})
defer close(stopCh)
q := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedItemBasedRateLimiter[*mcv1alpha1.ResourceImport]())
q := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedItemBasedRateLimiter[types.NamespacedName]())
const numInitialResImp = 50
resImps := make([]*mcv1alpha1.ResourceImport, 0, numInitialResImp)
for i := uint32(1); i <= numInitialResImp; i++ {
resImp := &mcv1alpha1.ResourceImport{
resImps = append(resImps, &mcv1alpha1.ResourceImport{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("label-identity-%d", i),
Namespace: "antrea-mcs",
@@ -574,13 +569,18 @@
ID: i,
},
},
}
q.Add(resImp)
})
}
for _, resImp := range resImps {
require.NoError(t, fakeRemoteClient.Create(ctx, resImp))
}
// Create a burst of events
for _, resImp := range resImps {
q.Add(types.NamespacedName{Namespace: resImp.Namespace, Name: resImp.Name})
}
mgr := fakeManager{
reconciler: r,
remoteClient: fakeRemoteClient,
queue: q,
reconciler: r,
queue: q,
}
// Give the fakeManager a head start. LabelIdentityResourceImportReconciler should be busy
// reconciling all new ResourceImport events.
@@ -591,7 +591,7 @@ func TestStaleControllerNoRaceWithResourceImportReconciler(t *testing.T) {
time.Sleep(1 * time.Second)
actLabelIdentities := &mcv1alpha1.LabelIdentityList{}
err := fakeClient.List(ctx, actLabelIdentities)
assert.NoError(t, err)
require.NoError(t, err)
// Verify that no LabelIdentities are deleted as part of the cleanup.
assert.Equal(t, numInitialResImp, len(actLabelIdentities.Items))
assert.Len(t, actLabelIdentities.Items, numInitialResImp)
}
33 changes: 20 additions & 13 deletions pkg/util/podstore/podstore.go
@@ -36,7 +36,8 @@ const (

type PodStore struct {
pods cache.Indexer
podsToDelete workqueue.TypedDelayingInterface[*corev1.Pod]
podsToDelete workqueue.TypedDelayingInterface[types.UID]
delayTime time.Duration
// Mapping pod.uuid to podTimestamps
timestampMap map[types.UID]*podTimestamps
clock clock.Clock
@@ -60,10 +61,11 @@ type Interface interface {
func NewPodStoreWithClock(podInformer cache.SharedIndexInformer, clock clock.WithTicker) *PodStore {
s := &PodStore{
pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}),
podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[*corev1.Pod]{
podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[types.UID]{
Name: deleteQueueName,
Clock: clock,
}),
delayTime: delayTime,
clock: clock,
timestampMap: map[types.UID]*podTimestamps{},
mutex: sync.RWMutex{},
@@ -137,7 +139,7 @@ func (s *PodStore) onPodDelete(obj interface{}) {
return
}
timestamp.DeletionTimestamp = &timeNow
s.podsToDelete.AddAfter(pod, delayTime)
s.podsToDelete.AddAfter(pod.UID, s.delayTime)
klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod))
}

@@ -189,25 +191,30 @@ func (s *PodStore) Run(stopCh <-chan struct{}) {
// worker runs a worker thread that just dequeues item from deleteQueue and
// remove the item from prevPod.
func (s *PodStore) worker() {
for s.processDeleteQueueItem() {
// Use the same object in each worker to delete from the indexer by key
// (UID), as there is no reason to allocate a new object for each call
// to processDeleteQueueItem.
podDeletionKey := &corev1.Pod{}
for s.processDeleteQueueItem(podDeletionKey) {
}
}

func (s *PodStore) processDeleteQueueItem() bool {
pod, quit := s.podsToDelete.Get()
func (s *PodStore) processDeleteQueueItem(podDeletionKey *corev1.Pod) bool {
podUID, quit := s.podsToDelete.Get()
if quit {
return false
}
defer s.podsToDelete.Done(podUID)
pod := podDeletionKey
pod.UID = podUID
s.mutex.Lock()
defer s.mutex.Unlock()
err := s.pods.Delete(pod)
if err != nil {
klog.ErrorS(err, "Error when deleting Pod from deletion workqueue", "Pod", klog.KObj(pod))
return false
if err := s.pods.Delete(pod); err != nil {
klog.ErrorS(err, "Error when deleting Pod from store", "key", podUID)
return true
}
delete(s.timestampMap, pod.UID)
s.podsToDelete.Done(pod)
klog.V(4).InfoS("Removed Pod from Pod Store", "Pod", klog.KObj(pod))
delete(s.timestampMap, podUID)
klog.V(4).InfoS("Removed Pod from Pod Store", "UID", podUID)
return true
}

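The second pattern is the PodStore change above: the delaying queue is keyed by the Pod's UID, so the Pod object itself does not have to stay referenced for the 5-minute expiration window. The following sketch illustrates the idea in isolation (it is not the PodStore code; the queue name, the onPodDelete helper, and the shortened demo delay are assumptions made for the example):

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

// The PodStore waits 5 minutes before expiring a deleted Pod's state; a much
// shorter delay is used here so the demo finishes quickly.
const expirationDelay = 1 * time.Second

func main() {
	// Typed delaying queue keyed by the Pod UID (a string type), not by *corev1.Pod.
	queue := workqueue.NewTypedDelayingQueueWithConfig(
		workqueue.TypedDelayingQueueConfig[types.UID]{Name: "podsToDelete"})
	defer queue.ShutDown()

	onPodDelete := func(pod *corev1.Pod) {
		// Only the UID is retained; nothing references the Pod object after this
		// returns, so it can be garbage collected right away.
		queue.AddAfter(pod.UID, expirationDelay)
	}
	onPodDelete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-a", UID: types.UID("uid-1")}})

	// Consumer side (a worker goroutine in the real code): Get blocks until the
	// delay has elapsed, and the item popped is just the UID, which is enough to
	// clean up any per-UID bookkeeping.
	uid, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(uid)
	fmt.Println("expiring state for Pod UID", uid)
}
```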
7 changes: 4 additions & 3 deletions pkg/util/podstore/podstore_test.go
@@ -330,15 +330,16 @@ func Test_processDeleteQueueItem(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podStore := &PodStore{
pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}),
podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[*v1.Pod]{
podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[types.UID]{
Name: deleteQueueName,
Clock: fakeClock,
}),
clock: fakeClock,
timestampMap: map[types.UID]*podTimestamps{"pod1": {}},
}
require.NoError(t, podStore.pods.Add(pod1))
podStore.podsToDelete.Add(pod1)
result := podStore.processDeleteQueueItem()
podStore.podsToDelete.Add(pod1.UID)
result := podStore.processDeleteQueueItem(&v1.Pod{})
require.Equal(t, true, result)
assert.Equal(t, 0, podStore.podsToDelete.Len())
assert.Len(t, podStore.pods.List(), 0)
