Skip to content

Commit 57c2548

Browse files
authored
Merge pull request #252 from haoranleo/fetch-SA-from-apiserver
Fetch SA from apiserver
2 parents 9f13e26 + c249cc6 commit 57c2548

File tree

8 files changed

+310
-30
lines changed

8 files changed

+310
-30
lines changed

main.go

+1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func main() {
181181
saInformer,
182182
cmInformer,
183183
composeRoleArnCache,
184+
clientset.CoreV1(),
184185
)
185186
stop := make(chan struct{})
186187
informerFactory.Start(stop)

pkg/cache/cache.go

+66-20
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,27 @@
1616
package cache
1717

1818
import (
19+
"context"
1920
"encoding/json"
2021
"fmt"
2122
"regexp"
2223
"strconv"
2324
"strings"
2425
"sync"
26+
"time"
2527

2628
"github.com/aws/amazon-eks-pod-identity-webhook/pkg"
2729
"github.com/prometheus/client_golang/prometheus"
30+
"golang.org/x/time/rate"
2831
v1 "k8s.io/api/core/v1"
32+
"k8s.io/apimachinery/pkg/api/errors"
33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2934
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3035
coreinformers "k8s.io/client-go/informers/core/v1"
3136
"k8s.io/client-go/kubernetes"
37+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
3238
"k8s.io/client-go/tools/cache"
39+
"k8s.io/client-go/util/retry"
3340
"k8s.io/klog/v2"
3441
)
3542

@@ -80,8 +87,7 @@ type serviceAccountCache struct {
8087
composeRoleArn ComposeRoleArn
8188
defaultTokenExpiration int64
8289
webhookUsage prometheus.Gauge
83-
notificationHandlers map[string]chan struct{}
84-
handlerMu sync.Mutex
90+
notifications *notifications
8591
}
8692

8793
type ComposeRoleArn struct {
@@ -159,20 +165,13 @@ func (c *serviceAccountCache) GetCommonConfigurations(name, namespace string) (u
159165
return false, pkg.DefaultTokenExpiration
160166
}
161167

162-
func (c *serviceAccountCache) getSA(req Request) (*Entry, chan struct{}) {
168+
func (c *serviceAccountCache) getSA(req Request) (*Entry, <-chan struct{}) {
163169
c.mu.RLock()
164170
defer c.mu.RUnlock()
165171
entry, ok := c.saCache[req.CacheKey()]
166172
if !ok && req.RequestNotification {
167173
klog.V(5).Infof("Service Account %s not found in cache, adding notification handler", req.CacheKey())
168-
c.handlerMu.Lock()
169-
defer c.handlerMu.Unlock()
170-
notifier, found := c.notificationHandlers[req.CacheKey()]
171-
if !found {
172-
notifier = make(chan struct{})
173-
c.notificationHandlers[req.CacheKey()] = notifier
174-
}
175-
return nil, notifier
174+
return nil, c.notifications.create(req)
176175
}
177176
return entry, nil
178177
}
@@ -267,13 +266,7 @@ func (c *serviceAccountCache) setSA(name, namespace string, entry *Entry) {
267266
klog.V(5).Infof("Adding SA %q to SA cache: %+v", key, entry)
268267
c.saCache[key] = entry
269268

270-
c.handlerMu.Lock()
271-
defer c.handlerMu.Unlock()
272-
if handler, found := c.notificationHandlers[key]; found {
273-
klog.V(5).Infof("Notifying handlers for %q", key)
274-
close(handler)
275-
delete(c.notificationHandlers, key)
276-
}
269+
c.notifications.broadcast(key)
277270
}
278271

279272
func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
@@ -283,7 +276,15 @@ func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
283276
c.cmCache[namespace+"/"+name] = entry
284277
}
285278

286-
func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenExpiration int64, saInformer coreinformers.ServiceAccountInformer, cmInformer coreinformers.ConfigMapInformer, composeRoleArn ComposeRoleArn) ServiceAccountCache {
279+
func New(defaultAudience,
280+
prefix string,
281+
defaultRegionalSTS bool,
282+
defaultTokenExpiration int64,
283+
saInformer coreinformers.ServiceAccountInformer,
284+
cmInformer coreinformers.ConfigMapInformer,
285+
composeRoleArn ComposeRoleArn,
286+
SAGetter corev1.ServiceAccountsGetter,
287+
) ServiceAccountCache {
287288
hasSynced := func() bool {
288289
if cmInformer != nil {
289290
return saInformer.Informer().HasSynced() && cmInformer.Informer().HasSynced()
@@ -292,6 +293,9 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
292293
}
293294
}
294295

296+
// Allocate capacity large enough to not block writers (sync path in pod mutation).
297+
// Rate limiting is done in the consumer side below.
298+
saFetchRequests := make(chan *Request, 1000)
295299
c := &serviceAccountCache{
296300
saCache: map[string]*Entry{},
297301
cmCache: map[string]*Entry{},
@@ -302,9 +306,30 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
302306
defaultTokenExpiration: defaultTokenExpiration,
303307
hasSynced: hasSynced,
304308
webhookUsage: webhookUsage,
305-
notificationHandlers: map[string]chan struct{}{},
309+
notifications: newNotifications(saFetchRequests),
306310
}
307311

312+
// Rate limiting at 10 requests per second with burst to 20.
313+
// In case the requests are queued in the channel for period longer than the service-account-lookup-grace-period,
314+
// the pod will not be mutated if the service account is also not synced by informer cache before service-account-lookup-grace-period.
315+
// This is to avoid adding unlimited latency to the pod mutation time. The maximum latency would be service-account-lookup-grace-period.
316+
rl := rate.NewLimiter(rate.Every(100*time.Millisecond), 20)
317+
go func() {
318+
for req := range saFetchRequests {
319+
go func() {
320+
// Do rate limiting inside go routine, the goal is to consume the channel as fast as possible to
321+
// avoid writer being blocked but still rate limit the requests sent to the API server.
322+
_ = rl.Wait(context.Background())
323+
sa, err := fetchFromAPI(SAGetter, req)
324+
if err != nil {
325+
klog.Errorf("fetching SA: %s, but got error from API: %v", req.CacheKey(), err)
326+
return
327+
}
328+
c.addSA(sa)
329+
}()
330+
}
331+
}()
332+
308333
saInformer.Informer().AddEventHandler(
309334
cache.ResourceEventHandlerFuncs{
310335
AddFunc: func(obj interface{}) {
@@ -354,6 +379,27 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
354379
return c
355380
}
356381

382+
func fetchFromAPI(getter corev1.ServiceAccountsGetter, req *Request) (*v1.ServiceAccount, error) {
383+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
384+
defer cancel()
385+
386+
klog.V(5).Infof("fetching SA: %s", req.CacheKey())
387+
388+
var sa *v1.ServiceAccount
389+
err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
390+
return errors.IsServerTimeout(err)
391+
}, func() error {
392+
res, err := getter.ServiceAccounts(req.Namespace).Get(ctx, req.Name, metav1.GetOptions{})
393+
if err != nil {
394+
return err
395+
}
396+
sa = res
397+
return nil
398+
})
399+
400+
return sa, err
401+
}
402+
357403
func (c *serviceAccountCache) populateCacheFromCM(oldCM, newCM *v1.ConfigMap) error {
358404
if newCM.Name != "pod-identity-webhook" {
359405
return nil

pkg/cache/cache_test.go

+88-9
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func TestSaCache(t *testing.T) {
3535
defaultAudience: "sts.amazonaws.com",
3636
annotationPrefix: "eks.amazonaws.com",
3737
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
38+
notifications: newNotifications(make(chan *Request, 10)),
3839
}
3940

4041
resp := cache.Get(Request{Name: "default", Namespace: "default"})
@@ -69,9 +70,9 @@ func TestNotification(t *testing.T) {
6970

7071
t.Run("with one notification handler", func(t *testing.T) {
7172
cache := &serviceAccountCache{
72-
saCache: map[string]*Entry{},
73-
notificationHandlers: map[string]chan struct{}{},
74-
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
73+
saCache: map[string]*Entry{},
74+
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
75+
notifications: newNotifications(make(chan *Request, 10)),
7576
}
7677

7778
// test that the requested SA is not in the cache
@@ -106,9 +107,9 @@ func TestNotification(t *testing.T) {
106107

107108
t.Run("with 10 notification handlers", func(t *testing.T) {
108109
cache := &serviceAccountCache{
109-
saCache: map[string]*Entry{},
110-
notificationHandlers: map[string]chan struct{}{},
111-
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
110+
saCache: map[string]*Entry{},
111+
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
112+
notifications: newNotifications(make(chan *Request, 5)),
112113
}
113114

114115
// test that the requested SA is not in the cache
@@ -153,6 +154,63 @@ func TestNotification(t *testing.T) {
153154
})
154155
}
155156

157+
func TestFetchFromAPIServer(t *testing.T) {
158+
testSA := &v1.ServiceAccount{
159+
ObjectMeta: metav1.ObjectMeta{
160+
Name: "my-sa",
161+
Namespace: "default",
162+
Annotations: map[string]string{
163+
"eks.amazonaws.com/role-arn": "arn:aws:iam::111122223333:role/s3-reader",
164+
"eks.amazonaws.com/token-expiration": "3600",
165+
},
166+
},
167+
}
168+
fakeSAClient := fake.NewSimpleClientset(testSA)
169+
170+
// use an empty informer to simulate the need to fetch SA from api server:
171+
fakeEmptyClient := fake.NewSimpleClientset()
172+
emptyInformerFactory := informers.NewSharedInformerFactory(fakeEmptyClient, 0)
173+
emptyInformer := emptyInformerFactory.Core().V1().ServiceAccounts()
174+
175+
cache := New(
176+
"sts.amazonaws.com",
177+
"eks.amazonaws.com",
178+
true,
179+
86400,
180+
emptyInformer,
181+
nil,
182+
ComposeRoleArn{},
183+
fakeSAClient.CoreV1(),
184+
)
185+
186+
stop := make(chan struct{})
187+
emptyInformerFactory.Start(stop)
188+
emptyInformerFactory.WaitForCacheSync(stop)
189+
cache.Start(stop)
190+
defer close(stop)
191+
192+
err := wait.ExponentialBackoff(wait.Backoff{Duration: 10 * time.Millisecond, Factor: 1.0, Steps: 3}, func() (bool, error) {
193+
return len(fakeEmptyClient.Actions()) != 0, nil
194+
})
195+
if err != nil {
196+
t.Fatalf("informer never called client: %v", err)
197+
}
198+
199+
resp := cache.Get(Request{Name: "my-sa", Namespace: "default", RequestNotification: true})
200+
assert.False(t, resp.FoundInCache, "Expected cache entry to not be found")
201+
202+
// wait for the notification while we fetch the SA from the API server:
203+
select {
204+
case <-resp.Notifier:
205+
// expected
206+
// test that the requested SA is now in the cache
207+
resp := cache.Get(Request{Name: "my-sa", Namespace: "default", RequestNotification: false})
208+
assert.True(t, resp.FoundInCache, "Expected cache entry to be found in cache")
209+
case <-time.After(1 * time.Second):
210+
t.Fatal("timeout waiting for notification")
211+
}
212+
}
213+
156214
func TestNonRegionalSTS(t *testing.T) {
157215
trueStr := "true"
158216
falseStr := "false"
@@ -237,7 +295,16 @@ func TestNonRegionalSTS(t *testing.T) {
237295

238296
testComposeRoleArn := ComposeRoleArn{}
239297

240-
cache := New(audience, "eks.amazonaws.com", tc.defaultRegionalSTS, 86400, informer, nil, testComposeRoleArn)
298+
cache := New(
299+
audience,
300+
"eks.amazonaws.com",
301+
tc.defaultRegionalSTS,
302+
86400,
303+
informer,
304+
nil,
305+
testComposeRoleArn,
306+
fakeClient.CoreV1(),
307+
)
241308
stop := make(chan struct{})
242309
informerFactory.Start(stop)
243310
informerFactory.WaitForCacheSync(stop)
@@ -295,7 +362,8 @@ func TestPopulateCacheFromCM(t *testing.T) {
295362
}
296363

297364
c := serviceAccountCache{
298-
cmCache: make(map[string]*Entry),
365+
cmCache: make(map[string]*Entry),
366+
notifications: newNotifications(make(chan *Request, 10)),
299367
}
300368

301369
{
@@ -413,6 +481,7 @@ func TestSAAnnotationRemoval(t *testing.T) {
413481
saCache: make(map[string]*Entry),
414482
annotationPrefix: "eks.amazonaws.com",
415483
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
484+
notifications: newNotifications(make(chan *Request, 10)),
416485
}
417486

418487
c.addSA(oldSA)
@@ -476,6 +545,7 @@ func TestCachePrecedence(t *testing.T) {
476545
defaultTokenExpiration: pkg.DefaultTokenExpiration,
477546
annotationPrefix: "eks.amazonaws.com",
478547
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
548+
notifications: newNotifications(make(chan *Request, 10)),
479549
}
480550

481551
{
@@ -574,7 +644,15 @@ func TestRoleArnComposition(t *testing.T) {
574644
informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
575645
informer := informerFactory.Core().V1().ServiceAccounts()
576646

577-
cache := New(audience, "eks.amazonaws.com", true, 86400, informer, nil, testComposeRoleArn)
647+
cache := New(audience,
648+
"eks.amazonaws.com",
649+
true,
650+
86400,
651+
informer,
652+
nil,
653+
testComposeRoleArn,
654+
fakeClient.CoreV1(),
655+
)
578656
stop := make(chan struct{})
579657
informerFactory.Start(stop)
580658
informerFactory.WaitForCacheSync(stop)
@@ -673,6 +751,7 @@ func TestGetCommonConfigurations(t *testing.T) {
673751
defaultAudience: "sts.amazonaws.com",
674752
annotationPrefix: "eks.amazonaws.com",
675753
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
754+
notifications: newNotifications(make(chan *Request, 10)),
676755
}
677756

678757
if tc.serviceAccount != nil {

pkg/cache/notifications.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package cache
2+
3+
import (
4+
"sync"
5+
6+
"k8s.io/klog/v2"
7+
)
8+
9+
type notifications struct {
10+
handlers map[string]chan struct{}
11+
mu sync.Mutex
12+
fetchRequests chan<- *Request
13+
}
14+
15+
func newNotifications(saFetchRequests chan<- *Request) *notifications {
16+
return &notifications{
17+
handlers: map[string]chan struct{}{},
18+
fetchRequests: saFetchRequests,
19+
}
20+
}
21+
22+
func (n *notifications) create(req Request) <-chan struct{} {
23+
n.mu.Lock()
24+
defer n.mu.Unlock()
25+
26+
// deduplicate requests to SA with same namespace/name to single request
27+
notifier, found := n.handlers[req.CacheKey()]
28+
if !found {
29+
notifier = make(chan struct{})
30+
n.handlers[req.CacheKey()] = notifier
31+
n.fetchRequests <- &req
32+
}
33+
return notifier
34+
}
35+
36+
func (n *notifications) broadcast(key string) {
37+
n.mu.Lock()
38+
defer n.mu.Unlock()
39+
if handler, found := n.handlers[key]; found {
40+
klog.V(5).Infof("Notifying handlers for %q", key)
41+
close(handler)
42+
delete(n.handlers, key)
43+
}
44+
}

pkg/handler/handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ func (m *Modifier) buildPodPatchConfig(pod *corev1.Pod) *podPatchConfig {
434434

435435
// Use the STS WebIdentity method if set
436436
gracePeriodEnabled := m.saLookupGraceTime > 0
437-
request := cache.Request{Namespace: pod.Namespace, Name: pod.Spec.ServiceAccountName, RequestNotification: true}
437+
request := cache.Request{Namespace: pod.Namespace, Name: pod.Spec.ServiceAccountName, RequestNotification: gracePeriodEnabled}
438438
response := m.Cache.Get(request)
439439
if !response.FoundInCache && !gracePeriodEnabled {
440440
missingSACounter.WithLabelValues().Inc()

vendor/k8s.io/client-go/util/retry/OWNERS

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)