Skip to content

Commit

Permalink
Address feedback and rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Pedraza <[email protected]>
  • Loading branch information
alpeb committed Mar 25, 2019
1 parent 570f1bc commit 43edcab
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 244 deletions.
11 changes: 7 additions & 4 deletions chart/templates/proxy_injector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ rules:
verbs: ["create", "get", "delete"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get"]
- apiGroups: ["", "extensions"]
resources: ["replicationcontrollers", "replicasets"]
verbs: ["get"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["list"]
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
6 changes: 4 additions & 2 deletions cli/cmd/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (rt resourceTransformerInject) transform(bytes []byte) ([]byte, []inject.Re
if len(rt.proxyOutboundCapacity) > 0 {
conf = conf.WithProxyOutboundCapacity(rt.proxyOutboundCapacity)
}
nonEmpty, err := conf.ParseMeta(bytes)
nonEmpty, err := conf.ParseMeta(bytes, "")
if err != nil {
return nil, nil, err
}
Expand All @@ -142,7 +142,9 @@ func (rt resourceTransformerInject) transform(bytes []byte) ([]byte, []inject.Re
if patchJSON == nil {
return bytes, reports, nil
}
log.Infof("patch generated for: %s", conf)
// TODO: refactor GetPatch() for it to return just one report item
r := reports[0]
log.Infof("patch generated for: %s", r.ResName())
log.Debugf("patch: %s", patchJSON)
patch, err := jsonpatch.DecodePatch(patchJSON)
if err != nil {
Expand Down
11 changes: 7 additions & 4 deletions cli/cmd/testdata/install_no_init_container_auto_inject.golden
Original file line number Diff line number Diff line change
Expand Up @@ -1491,10 +1491,13 @@ rules:
verbs: ["create", "get", "delete"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get"]
- apiGroups: ["", "extensions"]
resources: ["replicationcontrollers", "replicasets"]
verbs: ["get"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["list"]
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
11 changes: 7 additions & 4 deletions cli/cmd/testdata/install_output.golden
Original file line number Diff line number Diff line change
Expand Up @@ -1424,10 +1424,13 @@ rules:
verbs: ["create", "get", "delete"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get"]
- apiGroups: ["", "extensions"]
resources: ["replicationcontrollers", "replicasets"]
verbs: ["get"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["list"]
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
10 changes: 6 additions & 4 deletions controller/cmd/proxy-injector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ func main() {
defer close(stop)
signal.Notify(stop, os.Interrupt, os.Kill)

k8sClient, err := k8s.NewClientSet(*kubeconfig)
k8sAPI, err := k8s.InitializeAPI(*kubeconfig, k8s.NS, k8s.RS)
if err != nil {
log.Fatalf("failed to initialize Kubernetes client: %s", err)
log.Fatalf("failed to initialize Kubernetes API: %s", err)
}

rootCA, err := tls.GenerateRootCAWithDefaults("Proxy Injector Mutating Webhook Admission Controller CA")
if err != nil {
log.Fatalf("failed to create root CA: %s", err)
}

webhookConfig, err := injector.NewWebhookConfig(k8sClient, *controllerNamespace, *webhookServiceName, rootCA)
webhookConfig, err := injector.NewWebhookConfig(k8sAPI, *controllerNamespace, *webhookServiceName, rootCA)
if err != nil {
log.Fatalf("failed to read the trust anchor file: %s", err)
}
Expand All @@ -48,11 +48,13 @@ func main() {
}
log.Infof("created mutating webhook configuration: %s", mwc.ObjectMeta.SelfLink)

s, err := injector.NewWebhookServer(k8sClient, *addr, *controllerNamespace, *noInitContainer, rootCA)
s, err := injector.NewWebhookServer(k8sAPI, *addr, *controllerNamespace, *noInitContainer, rootCA)
if err != nil {
log.Fatalf("failed to initialize the webhook server: %s", err)
}

k8sAPI.Sync()

go func() {
log.Infof("listening at %s", *addr)
if err := s.ListenAndServeTLS("", ""); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions controller/proxy-injector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"io/ioutil"
"net/http"

"github.com/linkerd/linkerd2/controller/k8s"
pkgTls "github.com/linkerd/linkerd2/pkg/tls"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)

// WebhookServer is the webhook's HTTP server. It has an embedded webhook which
Expand All @@ -21,7 +21,7 @@ type WebhookServer struct {
}

// NewWebhookServer returns a new instance of the WebhookServer.
func NewWebhookServer(client kubernetes.Interface, addr, controllerNamespace string, noInitContainer bool, rootCA *pkgTls.CA) (*WebhookServer, error) {
func NewWebhookServer(api *k8s.API, addr, controllerNamespace string, noInitContainer bool, rootCA *pkgTls.CA) (*WebhookServer, error) {
c, err := tlsConfig(rootCA, controllerNamespace)
if err != nil {
return nil, err
Expand All @@ -32,7 +32,7 @@ func NewWebhookServer(client kubernetes.Interface, addr, controllerNamespace str
TLSConfig: c,
}

webhook, err := NewWebhook(client, controllerNamespace, noInitContainer)
webhook, err := NewWebhook(api, controllerNamespace, noInitContainer)
if err != nil {
return nil, err
}
Expand Down
21 changes: 12 additions & 9 deletions controller/proxy-injector/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"testing"

"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/controller/proxy-injector/fake"
"github.com/linkerd/linkerd2/pkg/tls"
log "github.com/sirupsen/logrus"
Expand All @@ -20,9 +21,12 @@ var (

func init() {
// create a webhook which uses its fake client to seed the sidecar configmap
fakeClient := fake.NewClient("")
k8sAPI, err := k8s.NewFakeAPI()
if err != nil {
panic(err)
}

webhook, err := NewWebhook(fakeClient, fake.DefaultControllerNamespace, false)
webhook, err := NewWebhook(k8sAPI, fake.DefaultControllerNamespace, false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -73,13 +77,12 @@ func TestNewWebhookServer(t *testing.T) {
log.Fatalf("failed to create root CA: %s", err)
}

var (
addr = ":7070"
kubeconfig = ""
)
fakeClient := fake.NewClient(kubeconfig)

server, err := NewWebhookServer(fakeClient, addr, fake.DefaultControllerNamespace, false, rootCA)
addr := ":7070"
k8sAPI, err := k8s.NewFakeAPI()
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
server, err := NewWebhookServer(k8sAPI, addr, fake.DefaultControllerNamespace, false, rootCA)
if err != nil {
t.Fatal("Unexpected error: ", err)
}
Expand Down
103 changes: 46 additions & 57 deletions controller/proxy-injector/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,41 @@ package injector

import (
"fmt"
"strings"

pb "github.com/linkerd/linkerd2/controller/gen/config"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/config"
"github.com/linkerd/linkerd2/pkg/inject"
"github.com/linkerd/linkerd2/pkg/k8s"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/yaml"
)

// Webhook is a Kubernetes mutating admission webhook that mutates pods admission
// requests by injecting sidecar container spec into the pod spec during pod
// creation.
type Webhook struct {
client kubernetes.Interface
k8sAPI *k8s.API
deserializer runtime.Decoder
controllerNamespace string
noInitContainer bool
}

// NewWebhook returns a new instance of Webhook.
func NewWebhook(client kubernetes.Interface, controllerNamespace string, noInitContainer bool) (*Webhook, error) {
func NewWebhook(api *k8s.API, controllerNamespace string, noInitContainer bool) (*Webhook, error) {
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)

return &Webhook{
client: client,
k8sAPI: api,
deserializer: codecs.UniversalDeserializer(),
controllerNamespace: controllerNamespace,
noInitContainer: noInitContainer,
Expand Down Expand Up @@ -89,27 +88,30 @@ func (w *Webhook) decode(data []byte) (*admissionv1beta1.AdmissionReview, error)
func (w *Webhook) inject(request *admissionv1beta1.AdmissionRequest) (*admissionv1beta1.AdmissionResponse, error) {
log.Debugf("request object bytes: %s", request.Object.Raw)

globalConfig, err := config.Global(k8s.MountPathGlobalConfig)
globalConfig, err := config.Global(pkgK8s.MountPathGlobalConfig)
if err != nil {
return nil, err
}

proxyConfig, err := config.Proxy(k8s.MountPathProxyConfig)
proxyConfig, err := config.Proxy(pkgK8s.MountPathProxyConfig)
if err != nil {
return nil, err
}

namespace, err := w.client.CoreV1().Namespaces().Get(request.Namespace, metav1.GetOptions{})
namespaces, err := w.k8sAPI.GetObjects("", pkgK8s.Namespace, request.Namespace)
if err != nil {
return nil, err
}
nsAnnotations := namespace.GetAnnotations()
if len(namespaces) == 0 {
return nil, fmt.Errorf("namespace \"%s\" not found", request.Namespace)
}
nsAnnotations := namespaces[0].(*v1.Namespace).GetAnnotations()

configs := &pb.All{Global: globalConfig, Proxy: proxyConfig}
conf := inject.NewResourceConfig(configs).
WithNsAnnotations(nsAnnotations).
WithKind(request.Kind.Kind)
nonEmpty, err := conf.ParseMeta(request.Object.Raw)
nonEmpty, err := conf.ParseMeta(request.Object.Raw, request.Namespace)
if err != nil {
return nil, err
}
Expand All @@ -122,7 +124,7 @@ func (w *Webhook) inject(request *admissionv1beta1.AdmissionRequest) (*admission
return admissionResponse, nil
}

p, _, err := conf.GetPatch(request.Object.Raw, inject.ShouldInjectWebhook)
p, reports, err := conf.GetPatch(request.Object.Raw, inject.ShouldInjectWebhook)
if err != nil {
return nil, err
}
Expand All @@ -133,26 +135,21 @@ func (w *Webhook) inject(request *admissionv1beta1.AdmissionRequest) (*admission

p.AddCreatedByPodAnnotation(fmt.Sprintf("linkerd/proxy-injector %s", version.Version))

if refs := conf.GetOwnerReferences(); len(refs) > 0 {
// assuming just one owner reference
k, v, err := w.parentRefLabel(request.Namespace, refs[0])
if err != nil {
return nil, err
}
p.AddPodLabel(k, v)
} else {
// When adding workloads through `kubectl apply` the spec template labels are
// automatically copied to the workload's main metadata section.
// This doesn't happen when adding labels through the webhook. So we manually
// add them to remain consistent.
conf.AddRootLabels(p)
key, name, err := w.getLabelForParent(conf)
if err != nil {
return nil, err
}
if key != "" && name != "" {
p.AddPodLabel(key, name)
}

patchJSON, err := p.Marshal()
if err != nil {
return nil, err
}
log.Infof("patch generated for: %s", conf)
// TODO: refactor GetPatch() so it only returns one report item
r := reports[0]
log.Infof("patch generated for: %s", r.ResName())
log.Debugf("patch: %s", patchJSON)

patchType := admissionv1beta1.PatchTypeJSONPatch
Expand All @@ -162,35 +159,27 @@ func (w *Webhook) inject(request *admissionv1beta1.AdmissionRequest) (*admission
return admissionResponse, nil
}

func (w *Webhook) parentRefLabel(ns string, ref metav1.OwnerReference) (string, string, error) {
var key string
switch strings.ToLower(ref.Kind) {
case k8s.Deployment:
key = k8s.ProxyDeploymentLabel
case k8s.ReplicationController:
key = k8s.ProxyReplicationControllerLabel
rs, err := w.client.CoreV1().ReplicationControllers(ns).Get(ref.Name, v1.GetOptions{})
if err != nil {
return "", "", err
}
for _, ref := range rs.OwnerReferences {
return w.parentRefLabel(ns, ref)
}
case k8s.ReplicaSet:
key = k8s.ProxyReplicaSetLabel
rs, err := w.client.ExtensionsV1beta1().ReplicaSets(ns).Get(ref.Name, v1.GetOptions{})
if err != nil {
return "", "", err
}
for _, ref := range rs.OwnerReferences {
return w.parentRefLabel(ns, ref)
func (w *Webhook) getLabelForParent(conf *inject.ResourceConfig) (string, string, error) {
pod, err := conf.GetPod()
if err != nil {
return "", "", err
}
if kind, name := w.k8sAPI.GetOwnerKindAndName(pod); kind != pkgK8s.Pod {
switch kind {
case pkgK8s.Deployment:
return pkgK8s.ProxyDeploymentLabel, name, nil
case pkgK8s.ReplicationController:
return pkgK8s.ProxyReplicationControllerLabel, name, nil
case pkgK8s.ReplicaSet:
return pkgK8s.ProxyReplicaSetLabel, name, nil
case pkgK8s.Job:
return pkgK8s.ProxyJobLabel, name, nil
case pkgK8s.DaemonSet:
return pkgK8s.ProxyDaemonSetLabel, name, nil
case pkgK8s.StatefulSet:
return pkgK8s.ProxyStatefulSetLabel, name, nil
}
case k8s.Job:
key = k8s.ProxyJobLabel
case k8s.DaemonSet:
key = k8s.ProxyDaemonSetLabel
case k8s.StatefulSet:
key = k8s.ProxyStatefulSetLabel
}
return key, ref.Name, nil
return "", "", fmt.Errorf("unsupported parent kind \"%s\"", kind)
}
return "", "", nil
}
Loading

0 comments on commit 43edcab

Please sign in to comment.