Skip to content


Merge pull request #1159 from s1061123/per-node-cert
Browse files Browse the repository at this point in the history
Add per-node-certification support
  • Loading branch information
dougbtv authored Sep 18, 2023
2 parents acfdc64 + e5d19ff commit 857d070
Show file tree
Hide file tree
Showing 20 changed files with 2,853 additions and 101 deletions.
363 changes: 363 additions & 0 deletions cmd/cert-approver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,363 @@
// Copyright (c) 2023 Network Plumbing Working Group
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// This is Kubernetes controller which approves CSR submitted by multus.
// This command is required only if multus runs with per-node certificate.
package main

// Note: cert-approver should be simple, just approve multus' CSR, hence
// this go code should not have any dependencies from pkg/, if possible,
// to keep its code simplicity.
import (

metav1 ""
utilruntime ""

certificatesv1 ""
corev1 ""

typedcorev1 ""

// CertController object
type CertController struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
broadcaster record.EventBroadcaster
recorder record.EventRecorder
commonNamePrefixes string

const (
maxDuration = time.Hour * 24 * 365
resyncPeriod time.Duration = time.Second * 3600 // resync every one hour, default is 10 hour
maxRetries = 5

var (
ControllerName = "csr-approver"
NamePrefix = "system:multus"
Organization = []string{"system:multus"}
Groups = sets.New[string]("system:nodes", "system:multus", "system:authenticated")
UserPrefixes = sets.New[string]("system:node", NamePrefix)
Usages = sets.New[certificatesv1.KeyUsage](

func NewCertController() (*CertController, error) {
var clientset kubernetes.Interface
/* setup Kubernetes API client */
config, err := rest.InClusterConfig()
if err != nil {
return nil, err

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err

informer := cache.NewSharedIndexInformer(
"certificatesigningrequests", corev1.NamespaceAll, fields.Everything()),

broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cert-approver"})
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c := &CertController{
clientset: clientset,
informer: informer,
queue: queue,
commonNamePrefixes: NamePrefix,
broadcaster: broadcaster,
recorder: recorder,

AddFunc: func(obj interface{}) {
if csr, ok := obj.(*certificatesv1.CertificateSigningRequest); ok {
if c.filterCSR(csr) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {

return c, nil

func (c *CertController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Info("Starting cert approver")

go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))

klog.Info("cert approver synced and ready")
wait.Until(c.runWorker, time.Second, stopCh)

// HasSynced is required for the cache.Controller interface.
func (c *CertController) HasSynced() bool {
return c.informer.HasSynced()

// LastSyncResourceVersion is required for the cache.Controller interface.
func (c *CertController) LastSyncResourceVersion() string {
return c.informer.LastSyncResourceVersion()

func (c *CertController) runWorker() {
for c.processNextItem() {
// continue looping

func (c *CertController) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer c.queue.Done(key)

// Invoke the method containing the business logic
err := c.processItem(key.(string))
// Handle the error if something went wrong during the execution of the business logic
c.handleErr(err, key)
return true


// handleErr checks if an error happened and makes sure we will retry later.
func (c *CertController) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < maxRetries {
klog.Infof("Error syncing csr %s: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.

// Report to an external entity that, even after several retries, we could not successfully process this key
klog.Infof("Dropping csr %q out of the queue: %v", key, err)

func (c *CertController) processItem(key string) error {
startTime := time.Now()

obj, _, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)

req, _ := obj.(*certificatesv1.CertificateSigningRequest)

nodeName := "unknown"
defer func() {
klog.Infof("Finished syncing CSR %s for %s node in %v", req.Name, nodeName, time.Since(startTime))

if len(req.Status.Certificate) > 0 {
klog.V(5).Infof("CSR %s is already signed", req.Name)
return nil

if isApprovedOrDenied(&req.Status) {
klog.V(5).Infof("CSR %s is already approved/denied", req.Name)
return nil

csrPEM, _ := pem.Decode(req.Spec.Request)
if csrPEM == nil {
return fmt.Errorf("failed to PEM-parse the CSR block in .spec.request: no CSRs were found")

x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
if err != nil {
return fmt.Errorf("failed to parse the CSR bytes: %v", err)

i := strings.LastIndex(req.Spec.Username, ":")
if i == -1 || i == len(req.Spec.Username)-1 {
return fmt.Errorf("failed to parse the username: %s", req.Spec.Username)

ctx := context.Background()
prefix := req.Spec.Username[:i]
nodeName = req.Spec.Username[i+1:]
if !UserPrefixes.Has(prefix) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by an unexpected user: %q", req.Name, req.Spec.Username))

if errs := validation.IsDNS1123Subdomain(nodeName); len(errs) != 0 {
return c.denyCSR(ctx, req, fmt.Sprintf("extracted node name %q is not a valid DNS subdomain %v", nodeName, errs))

if usages := sets.New[certificatesv1.KeyUsage](req.Spec.Usages...); !usages.Equal(Usages) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with unexpected usages: %v", req.Name, usages.UnsortedList()))

if !Groups.HasAll(req.Spec.Groups...) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by a user with unexpected groups: %v", req.Name, req.Spec.Groups))

expectedSubject := fmt.Sprintf("%s:%s", c.commonNamePrefixes, nodeName)
if x509CSR.Subject.CommonName != expectedSubject {
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's commonName to be %q, but it is %q", expectedSubject, x509CSR.Subject.CommonName))

if !reflect.DeepEqual(x509CSR.Subject.Organization, Organization) {
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's organization to be %v, but it is %v", Organization, x509CSR.Subject.Organization))

if req.Spec.ExpirationSeconds == nil {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created without specyfying the expirationSeconds", req.Name))

if csr.ExpirationSecondsToDuration(*req.Spec.ExpirationSeconds) > maxDuration {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with invalid expirationSeconds value: %d", req.Name, *req.Spec.ExpirationSeconds))

return c.approveCSR(ctx, req)

// CSR specific functions

func (c *CertController) filterCSR(csr *certificatesv1.CertificateSigningRequest) bool {
nsName := types.NamespacedName{Namespace: csr.Namespace, Name: csr.Name}
csrPEM, _ := pem.Decode(csr.Spec.Request)
if csrPEM == nil {
klog.Errorf("Failed to PEM-parse the CSR block in .spec.request: no CSRs were found in %s", nsName)
return false

x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
if err != nil {
klog.Errorf("Failed to parse the CSR .spec.request of %q: %v", nsName, err)
return false

return strings.HasPrefix(x509CSR.Subject.CommonName, c.commonNamePrefixes) &&
csr.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName

func (c *CertController) approveCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions,
Type: certificatesv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApproved",
Message: fmt.Sprintf("Auto-approved CSR %q", csr.Name),

c.recorder.Eventf(csr, corev1.EventTypeNormal, "CSRApproved", "CSR %q has been approved by %s", csr.Name, ControllerName)
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
return err

func (c *CertController) denyCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, message string) error {
csr.Status.Conditions = append(csr.Status.Conditions,
Type: certificatesv1.CertificateDenied,
Status: corev1.ConditionTrue,
Reason: "CSRDenied",
Message: message,

c.recorder.Eventf(csr, corev1.EventTypeWarning, "CSRDenied", "The CSR %q has been denied by: %s", csr.Name, ControllerName, message)
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().Update(ctx, csr, metav1.UpdateOptions{})
return err

func isApprovedOrDenied(status *certificatesv1.CertificateSigningRequestStatus) bool {
for _, c := range status.Conditions {
if c.Type == certificatesv1.CertificateApproved || c.Type == certificatesv1.CertificateDenied {
return true
return false

func main() {
klog.Infof("starting cert-approver")

//Start watching for pod creations
certController, err := NewCertController()
if err != nil {

stopCh := make(chan struct{})
defer close(stopCh)
go certController.Run(stopCh)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)

0 comments on commit 857d070

Please sign in to comment.