diff --git a/Gopkg.lock b/Gopkg.lock index f080482308..9a19be5511 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -346,7 +346,7 @@ version = "kubernetes-1.11.1" [[projects]] - digest = "1:c6d38303d9da335700612bb53b1bb1f6fd18e8335b76049a798a2303eb9231d3" + digest = "1:159c095dfb7597e624380203dc6151fe870f7c52b4950a3459ecf1890f6500e2" name = "k8s.io/apimachinery" packages = [ "pkg/api/equality", @@ -380,6 +380,7 @@ "pkg/util/json", "pkg/util/mergepatch", "pkg/util/net", + "pkg/util/rand", "pkg/util/runtime", "pkg/util/sets", "pkg/util/strategicpatch", @@ -584,11 +585,14 @@ analyzer-version = 1 input-imports = [ "github.com/blang/semver", + "github.com/davecgh/go-spew/spew", "github.com/golang/glog", "github.com/google/uuid", "github.com/spf13/cobra", "k8s.io/api/apps/v1", + "k8s.io/api/batch/v1", "k8s.io/api/core/v1", + "k8s.io/api/rbac/v1", "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1", "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset", "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1", @@ -598,14 +602,18 @@ "k8s.io/apimachinery/pkg/api/equality", "k8s.io/apimachinery/pkg/api/errors", "k8s.io/apimachinery/pkg/apis/meta/v1", + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured", "k8s.io/apimachinery/pkg/labels", "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/runtime/schema", "k8s.io/apimachinery/pkg/runtime/serializer", "k8s.io/apimachinery/pkg/types", + "k8s.io/apimachinery/pkg/util/errors", "k8s.io/apimachinery/pkg/util/intstr", + "k8s.io/apimachinery/pkg/util/rand", "k8s.io/apimachinery/pkg/util/runtime", "k8s.io/apimachinery/pkg/util/wait", + "k8s.io/apimachinery/pkg/util/yaml", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery", "k8s.io/client-go/discovery/fake", @@ -614,7 +622,9 @@ "k8s.io/client-go/kubernetes", "k8s.io/client-go/kubernetes/scheme", "k8s.io/client-go/kubernetes/typed/apps/v1", + "k8s.io/client-go/kubernetes/typed/batch/v1", "k8s.io/client-go/kubernetes/typed/core/v1", + "k8s.io/client-go/kubernetes/typed/rbac/v1", "k8s.io/client-go/listers/apps/v1", "k8s.io/client-go/rest", "k8s.io/client-go/testing", diff --git a/cmd/start.go b/cmd/start.go index afcb9ef877..120ba299d4 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -45,12 +45,14 @@ var ( startOpts struct { kubeconfig string + nodeName string } ) func init() { rootCmd.AddCommand(startCmd) startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)") + startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.") } func runStartCmd(cmd *cobra.Command, args []string) { @@ -60,6 +62,14 @@ func runStartCmd(cmd *cobra.Command, args []string) { // To help debugging, immediately log version glog.Infof("%s", version.String) + if startOpts.nodeName == "" { + name, ok := os.LookupEnv("NODE_NAME") + if !ok || name == "" { + glog.Fatalf("node-name is required") + } + startOpts.nodeName = name + } + cb, err := newClientBuilder(startOpts.kubeconfig) if err != nil { glog.Fatalf("error creating clients: %v", err) @@ -137,6 +147,11 @@ type clientBuilder struct { config *rest.Config } +func (cb *clientBuilder) RestConfig() *rest.Config { + c := rest.CopyConfig(cb.config) + return c +} + func (cb *clientBuilder) ClientOrDie(name string) clientset.Interface { return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name)) } @@ -205,11 +220,13 @@ func createControllerContext(cb *clientBuilder, stop <-chan 
struct{}) *controlle
 
 func startControllers(ctx *controllerContext) error {
 	go cvo.New(
+		startOpts.nodeName,
 		componentNamespace, componentName,
 		ctx.InformerFactory.Clusterversion().V1().CVOConfigs(),
 		ctx.InformerFactory.Clusterversion().V1().OperatorStatuses(),
 		ctx.APIExtInformerFactory.Apiextensions().V1beta1().CustomResourceDefinitions(),
 		ctx.KubeInformerFactory.Apps().V1().Deployments(),
+		ctx.ClientBuilder.RestConfig(),
 		ctx.ClientBuilder.ClientOrDie(componentName),
 		ctx.ClientBuilder.KubeClientOrDie(componentName),
 		ctx.ClientBuilder.APIExtClientOrDie(componentName),
diff --git a/lib/manifest.go b/lib/manifest.go
new file mode 100644
index 0000000000..cf12c864de
--- /dev/null
+++ b/lib/manifest.go
@@ -0,0 +1,198 @@
+package lib
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sort"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/kubernetes/scheme"
+)
+
+// Manifest stores a Kubernetes object in Raw form, as read from a file.
+// It also stores the GroupVersionKind for the manifest.
+type Manifest struct {
+	Raw []byte
+	GVK schema.GroupVersionKind
+
+	obj *unstructured.Unstructured
+}
+
+// UnmarshalJSON unmarshals the bytes of a single Kubernetes object into the Manifest.
+func (m *Manifest) UnmarshalJSON(in []byte) error {
+	if m == nil {
+		return errors.New("Manifest: UnmarshalJSON on nil pointer")
+	}
+
+	// This happens when unmarshalling an empty document, i.e.
+	// nothing between two `---` separators:
+	//
+	// ---
+	// ---
+	if bytes.Equal(in, []byte("null")) {
+		m.Raw = nil
+		return nil
+	}
+
+	m.Raw = append(m.Raw[0:0], in...)
+	udi, _, err := scheme.Codecs.UniversalDecoder().Decode(in, nil, &unstructured.Unstructured{})
+	if err != nil {
+		return fmt.Errorf("unable to decode manifest: %v", err)
+	}
+	ud, ok := udi.(*unstructured.Unstructured)
+	if !ok {
+		return fmt.Errorf("expected manifest to decode into *unstructured.Unstructured, got %T", udi)
+	}
+
+	m.GVK = ud.GroupVersionKind()
+	m.obj = ud.DeepCopy()
+	return nil
+}
+
+// Object returns the underlying metav1.Object.
+func (m *Manifest) Object() metav1.Object { return m.obj }
+
+const (
+	// rootDirKey is the key used for manifest files found directly in the
+	// root dir passed to LoadManifests.
+	// It is set to `000` so it sorts ahead of the subdir keys when the
+	// caller orders the returned map by key.
+	rootDirKey = "000"
+)
+
+// LoadManifests loads manifests from disk.
+//
+// root/
+//   manifest0
+//   manifest1
+//   00_subdir0/
+//     manifest0
+//     manifest1
+//   01_subdir1/
+//     manifest0
+//     manifest1
+// LoadManifests(/root):
+// returns map
+//   000: [manifest0, manifest1]
+//   00_subdir0: [manifest0, manifest1]
+//   01_subdir1: [manifest0, manifest1]
+//
+// It skips dirs that have no files.
+// It only reads dir `p` and its direct subdirs.
+func LoadManifests(p string) (map[string][]Manifest, error) {
+	var out = make(map[string][]Manifest)
+
+	fs, err := ioutil.ReadDir(p)
+	if err != nil {
+		return nil, err
+	}
+
+	// We want to accumulate all the errors, not returning at the
+	// first error encountered when reading subdirs.
+	var errs []error
+
+	// load manifest files in p to rootDirKey
+	ms, err := loadManifestsFromDir(p)
+	if err != nil {
+		errs = append(errs, fmt.Errorf("error loading from dir %s: %v", p, err))
+	}
+	if len(ms) > 0 {
+		out[rootDirKey] = ms
+	}
+
+	// load manifests from subdirs to subdir-name
+	for _, f := range fs {
+		if !f.IsDir() {
+			continue
+		}
+		path := filepath.Join(p, f.Name())
+		ms, err := loadManifestsFromDir(path)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("error loading from dir %s: %v", path, err))
+			continue
+		}
+		if len(ms) > 0 {
+			out[f.Name()] = ms
+		}
+	}
+
+	agg := utilerrors.NewAggregate(errs)
+	if agg != nil {
+		return nil, errors.New(agg.Error())
+	}
+	return out, nil
+}
+
+// loadManifestsFromDir reads only the files directly in dir; subdirs are not traversed.
+// It returns manifests in increasing order of their filenames.
+func loadManifestsFromDir(dir string) ([]Manifest, error) {
+	var manifests []Manifest
+	fs, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+
+	// ensure sorted.
+	sort.Slice(fs, func(i, j int) bool {
+		return fs[i].Name() < fs[j].Name()
+	})
+
+	var errs []error
+	for _, f := range fs {
+		if f.IsDir() {
+			continue
+		}
+
+		path := filepath.Join(dir, f.Name())
+		file, err := os.Open(path)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("error opening %s: %v", path, err))
+			continue
+		}
+		defer file.Close()
+
+		ms, err := parseManifests(file)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("error parsing %s: %v", path, err))
+			continue
+		}
+		manifests = append(manifests, ms...)
+	}
+
+	agg := utilerrors.NewAggregate(errs)
+	if agg != nil {
+		return nil, fmt.Errorf("error loading manifests from %q: %v", dir, agg.Error())
+	}
+
+	return manifests, nil
+}
+
+// parseManifests parses a YAML or JSON document that may contain one or more
+// Kubernetes resources.
+func parseManifests(r io.Reader) ([]Manifest, error) { + d := yaml.NewYAMLOrJSONDecoder(r, 1024) + var manifests []Manifest + for { + m := Manifest{} + if err := d.Decode(&m); err != nil { + if err == io.EOF { + return manifests, nil + } + return manifests, fmt.Errorf("error parsing: %v", err) + } + m.Raw = bytes.TrimSpace(m.Raw) + if len(m.Raw) == 0 || bytes.Equal(m.Raw, []byte("null")) { + continue + } + manifests = append(manifests, m) + } +} diff --git a/lib/manifest_test.go b/lib/manifest_test.go new file mode 100644 index 0000000000..1cb22f6310 --- /dev/null +++ b/lib/manifest_test.go @@ -0,0 +1,558 @@ +package lib + +import ( + "io/ioutil" + "os" + "path/filepath" + "reflect" + "sort" + "strings" + "testing" + + "github.com/davecgh/go-spew/spew" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestParseManifests(t *testing.T) { + tests := []struct { + name string + raw string + want []Manifest + }{{ + name: "ingress", + raw: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +`, + want: []Manifest{{ + Raw: []byte(`{"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"name":"test-ingress","namespace":"test-namespace"},"spec":{"rules":[{"http":{"paths":[{"backend":{"serviceName":"test","servicePort":80},"path":"/testpath"}]}}]}}`), + GVK: schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, + }}, + }, { + name: "configmap", + raw: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + want: []Manifest{{ + Raw: []byte(`{"apiVersion":"v1","data":{"color":"red","multi-line":"hello world\nhow are you?\n"},"kind":"ConfigMap","metadata":{"name":"a-config","namespace":"default"}}`), + GVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, + }}, + }, { + name: "two-resources", + raw: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + want: []Manifest{{ + Raw: []byte(`{"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"name":"test-ingress","namespace":"test-namespace"},"spec":{"rules":[{"http":{"paths":[{"backend":{"serviceName":"test","servicePort":80},"path":"/testpath"}]}}]}}`), + GVK: schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, + }, { + Raw: []byte(`{"apiVersion":"v1","data":{"color":"red","multi-line":"hello world\nhow are you?\n"},"kind":"ConfigMap","metadata":{"name":"a-config","namespace":"default"}}`), + GVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, + }}, + }, { + name: "two-resources-with-empty", + raw: ` +--- +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +--- +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? 
+--- +`, + want: []Manifest{{ + Raw: []byte(`{"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"name":"test-ingress","namespace":"test-namespace"},"spec":{"rules":[{"http":{"paths":[{"backend":{"serviceName":"test","servicePort":80},"path":"/testpath"}]}}]}}`), + GVK: schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, + }, { + Raw: []byte(`{"apiVersion":"v1","data":{"color":"red","multi-line":"hello world\nhow are you?\n"},"kind":"ConfigMap","metadata":{"name":"a-config","namespace":"default"}}`), + GVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, + }}, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := parseManifests(strings.NewReader(test.raw)) + if err != nil { + t.Fatalf("failed to parse manifest: %v", err) + } + + for i := range got { + got[i].obj = nil + } + + if !reflect.DeepEqual(got, test.want) { + t.Fatalf("mismatch found") + } + }) + } + +} + +func TestLoadManifestsFromDir(t *testing.T) { + tests := []struct { + name string + fs []dir + want []Manifest + }{{ + name: "no-files", + fs: []dir{{ + name: "a", + }, { + name: "a/00_dir", + }, { + name: "a/01_dir", + }}, + want: nil, + }, { + name: "all-files", + fs: []dir{{ + name: "a", + files: []file{{ + name: "f0", + contents: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +`, + }, { + name: "f1", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }}, + want: []Manifest{{ + GVK: schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, + }, { + GVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, + }}, + }, { + name: "files-and-subdirs", + fs: []dir{{ + name: "a", + files: []file{{ + name: "f0", + contents: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +`, + }, { + name: "f1", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/00_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +`, + }, { + name: "f1", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/01_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-ingress + namespace: test-namespace +spec: + rules: + - http: + paths: + - path: /testpath + backend: + serviceName: test + servicePort: 80 +`, + }, { + name: "f1", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? 
+`, + }}, + }}, + want: []Manifest{{ + GVK: schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, + }, { + GVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}, + }}, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tmpdir, cleanup := setupTestFS(t, test.fs) + defer func() { + if err := cleanup(); err != nil { + t.Logf("error cleaning %q", tmpdir) + } + }() + + got, err := loadManifestsFromDir(filepath.Join(tmpdir, "a")) + if err != nil { + t.Fatal(err) + } + for i := range got { + got[i].Raw = nil + got[i].obj = nil + } + if !reflect.DeepEqual(got, test.want) { + t.Fatalf("mismatch \ngot: %s \nwant: %s", spew.Sdump(got), spew.Sdump(test.want)) + } + }) + } +} + +func TestLoadManifests(t *testing.T) { + tests := []struct { + name string + fs []dir + + wantkeys []string + wantlen []int + }{{ + name: "empty-root", + fs: []dir{{ + name: "a", + }, { + name: "a/00_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/01_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }}, + wantkeys: []string{"00_dir", "01_dir"}, + wantlen: []int{1, 1}, + }, { + name: "non-empty-root", + fs: []dir{{ + name: "a", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/00_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/01_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }}, + wantkeys: []string{"000", "00_dir", "01_dir"}, + wantlen: []int{1, 1, 1}, + }, { + name: "empty-sub-dir", + fs: []dir{{ + name: "a", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? +`, + }}, + }, { + name: "a/00_dir", + files: []file{{ + name: "f0", + contents: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: a-config + namespace: default +data: + color: "red" + multi-line: | + hello world + how are you? 
+`, + }}, + }, { + name: "a/01_dir", + }}, + wantkeys: []string{"000", "00_dir"}, + wantlen: []int{1, 1}, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tmpdir, cleanup := setupTestFS(t, test.fs) + defer func() { + if err := cleanup(); err != nil { + t.Logf("error cleaning %q", tmpdir) + } + }() + + got, err := LoadManifests(filepath.Join(tmpdir, "a")) + if err != nil { + t.Fatal(err) + } + + gotkeys := []string{} + for k := range got { + gotkeys = append(gotkeys, k) + for j := range got[k] { + (got[k])[j].Raw = nil + } + } + sort.Strings(gotkeys) + gotlen := []int{} + for _, k := range gotkeys { + gotlen = append(gotlen, len(got[k])) + } + if !reflect.DeepEqual(test.wantkeys, gotkeys) { + t.Fatalf("mismatch \ngot: %s \nwant: %s", spew.Sdump(gotkeys), spew.Sdump(test.wantkeys)) + } + if !reflect.DeepEqual(test.wantlen, gotlen) { + t.Fatalf("mismatch \ngot: %s \nwant: %s", spew.Sdump(gotlen), spew.Sdump(test.wantlen)) + } + }) + } +} + +type file struct { + name string + contents string +} + +type dir struct { + name string + files []file +} + +// setupTestFS returns path of the tmp dir created and cleanup function. +func setupTestFS(t *testing.T, dirs []dir) (string, func() error) { + root, err := ioutil.TempDir("", "test") + if err != nil { + t.Fatal(err) + } + for _, dir := range dirs { + dpath := filepath.Join(root, dir.name) + if err := os.MkdirAll(dpath, 0755); err != nil { + t.Fatal(err) + } + for _, file := range dir.files { + path := filepath.Join(dpath, file.name) + ioutil.WriteFile(path, []byte(file.contents), 0755) + } + } + cleanup := func() error { + return os.RemoveAll(root) + } + return root, cleanup +} diff --git a/lib/resourceapply/batch.go b/lib/resourceapply/batch.go new file mode 100644 index 0000000000..95adba2a7c --- /dev/null +++ b/lib/resourceapply/batch.go @@ -0,0 +1,31 @@ +package resourceapply + +import ( + "github.com/openshift/cluster-version-operator/lib/resourcemerge" + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1" + "k8s.io/utils/pointer" +) + +// ApplyJob applies the required Job to the cluster. +func ApplyJob(client batchclientv1.JobsGetter, required *batchv1.Job) (*batchv1.Job, bool, error) { + existing, err := client.Jobs(required.Namespace).Get(required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + actual, err := client.Jobs(required.Namespace).Create(required) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + modified := pointer.BoolPtr(false) + resourcemerge.EnsureJob(modified, existing, *required) + if !*modified { + return existing, false, nil + } + + actual, err := client.Jobs(required.Namespace).Update(existing) + return actual, true, err +} diff --git a/lib/resourceapply/core.go b/lib/resourceapply/core.go new file mode 100644 index 0000000000..eb8edab3d9 --- /dev/null +++ b/lib/resourceapply/core.go @@ -0,0 +1,52 @@ +package resourceapply + +import ( + "github.com/openshift/cluster-version-operator/lib/resourcemerge" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/pointer" +) + +// ApplyServiceAccount applies the required serviceaccount to the cluster. 
+func ApplyServiceAccount(client coreclientv1.ServiceAccountsGetter, required *corev1.ServiceAccount) (*corev1.ServiceAccount, bool, error) {
+	existing, err := client.ServiceAccounts(required.Namespace).Get(required.Name, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		actual, err := client.ServiceAccounts(required.Namespace).Create(required)
+		return actual, true, err
+	}
+	if err != nil {
+		return nil, false, err
+	}
+
+	modified := pointer.BoolPtr(false)
+	resourcemerge.EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta)
+	if !*modified {
+		return existing, false, nil
+	}
+
+	actual, err := client.ServiceAccounts(required.Namespace).Update(existing)
+	return actual, true, err
+}
+
+// ApplyConfigMap applies the required configmap to the cluster.
+func ApplyConfigMap(client coreclientv1.ConfigMapsGetter, required *corev1.ConfigMap) (*corev1.ConfigMap, bool, error) {
+	existing, err := client.ConfigMaps(required.Namespace).Get(required.Name, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		actual, err := client.ConfigMaps(required.Namespace).Create(required)
+		return actual, true, err
+	}
+	if err != nil {
+		return nil, false, err
+	}
+
+	modified := pointer.BoolPtr(false)
+	resourcemerge.EnsureConfigMap(modified, existing, *required)
+	if !*modified {
+		return existing, false, nil
+	}
+
+	actual, err := client.ConfigMaps(required.Namespace).Update(existing)
+	return actual, true, err
+}
diff --git a/lib/resourceapply/rbac.go b/lib/resourceapply/rbac.go
new file mode 100644
index 0000000000..a5f46bdd3e
--- /dev/null
+++ b/lib/resourceapply/rbac.go
@@ -0,0 +1,52 @@
+package resourceapply
+
+import (
+	"github.com/openshift/cluster-version-operator/lib/resourcemerge"
+	rbacv1 "k8s.io/api/rbac/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	rbacclientv1 "k8s.io/client-go/kubernetes/typed/rbac/v1"
+	"k8s.io/utils/pointer"
+)
+
+// ApplyClusterRoleBinding applies the required clusterrolebinding to the cluster.
+func ApplyClusterRoleBinding(client rbacclientv1.ClusterRoleBindingsGetter, required *rbacv1.ClusterRoleBinding) (*rbacv1.ClusterRoleBinding, bool, error) {
+	existing, err := client.ClusterRoleBindings().Get(required.Name, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		actual, err := client.ClusterRoleBindings().Create(required)
+		return actual, true, err
+	}
+	if err != nil {
+		return nil, false, err
+	}
+
+	modified := pointer.BoolPtr(false)
+	resourcemerge.EnsureClusterRoleBinding(modified, existing, *required)
+	if !*modified {
+		return existing, false, nil
+	}
+
+	actual, err := client.ClusterRoleBindings().Update(existing)
+	return actual, true, err
+}
+
+// ApplyClusterRole applies the required clusterrole to the cluster.
+func ApplyClusterRole(client rbacclientv1.ClusterRolesGetter, required *rbacv1.ClusterRole) (*rbacv1.ClusterRole, bool, error) { + existing, err := client.ClusterRoles().Get(required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + actual, err := client.ClusterRoles().Create(required) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + modified := pointer.BoolPtr(false) + resourcemerge.EnsureClusterRole(modified, existing, *required) + if !*modified { + return existing, false, nil + } + + actual, err := client.ClusterRoles().Update(existing) + return actual, true, err +} diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go new file mode 100644 index 0000000000..7ff6ae70b2 --- /dev/null +++ b/lib/resourcebuilder/apiext.go @@ -0,0 +1,87 @@ +package resourcebuilder + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "github.com/openshift/cluster-version-operator/lib" + "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourceread" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" +) + +func newAPIExtBuilder(config *rest.Config, m lib.Manifest) (Interface, error) { + kind := m.GVK.Kind + switch kind { + case CRDKind: + return newCRDBuilder(config, m), nil + default: + return nil, fmt.Errorf("no APIExt builder found for %s", kind) + } +} + +const ( + // CRDKind is kind for CustomResourceDefinitions + CRDKind = "CustomResourceDefinition" +) + +type crdBuilder struct { + client *apiextclientv1beta1.ApiextensionsV1beta1Client + raw []byte +} + +func newCRDBuilder(config *rest.Config, m lib.Manifest) *crdBuilder { + return &crdBuilder{ + client: apiextclientv1beta1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *crdBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + crd := resourceread.ReadCustomResourceDefinitionV1Beta1OrDie(b.raw) + + modifier(crd) + + _, updated, err := resourceapply.ApplyCustomResourceDefinition(b.client, crd) + if err != nil { + return err + } + if updated { + return waitForCustomResourceDefinitionCompletion(b.client, crd) + } + return nil +} + +const ( + crdPollInterval = 1 * time.Second + crdPollTimeout = 1 * time.Minute +) + +func waitForCustomResourceDefinitionCompletion(client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd *apiextv1beta1.CustomResourceDefinition) error { + return wait.Poll(crdPollInterval, crdPollTimeout, func() (bool, error) { + c, err := client.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{}) + if errors.IsNotFound(err) { + // exit early to recreate the crd. + return false, err + } + if err != nil { + glog.Errorf("error getting CustomResourceDefinition %s: %v", crd.Name, err) + return false, nil + } + + for _, condition := range c.Status.Conditions { + if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue { + return true, nil + } + } + glog.V(4).Infof("CustomResourceDefinition %s is not ready. 
conditions: %v", c.Name, c.Status.Conditions) + return false, nil + }) +} diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go new file mode 100644 index 0000000000..fd9d0d8a5d --- /dev/null +++ b/lib/resourcebuilder/apps.go @@ -0,0 +1,91 @@ +package resourcebuilder + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "github.com/openshift/cluster-version-operator/lib" + "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourceread" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1" + "k8s.io/client-go/rest" +) + +func newAppsBuilder(config *rest.Config, m lib.Manifest) (Interface, error) { + kind := m.GVK.Kind + switch kind { + case DeploymentKind: + return newDeploymentBuilder(config, m), nil + default: + return nil, fmt.Errorf("no Apps builder found for %s", kind) + } +} + +const ( + // DeploymentKind is kind for Deployment. + DeploymentKind = "Deployment" +) + +type deploymentBuilder struct { + client *appsclientv1.AppsV1Client + raw []byte +} + +func newDeploymentBuilder(config *rest.Config, m lib.Manifest) *deploymentBuilder { + return &deploymentBuilder{ + client: appsclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *deploymentBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + deployment := resourceread.ReadDeploymentV1OrDie(b.raw) + + modifier(deployment) + + _, updated, err := resourceapply.ApplyDeployment(b.client, deployment) + if err != nil { + return err + } + if updated { + return waitForDeploymentCompletion(b.client, deployment) + } + return nil +} + +const ( + deploymentPollInterval = 1 * time.Second + deploymentPollTimeout = 5 * time.Minute +) + +func waitForDeploymentCompletion(client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error { + return wait.Poll(deploymentPollInterval, deploymentPollTimeout, func() (bool, error) { + d, err := client.Deployments(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{}) + if errors.IsNotFound(err) { + // exit early to recreate the deployment. + return false, err + } + if err != nil { + // Do not return error here, as we could be updating the API Server itself, in which case we + // want to continue waiting. + glog.Errorf("error getting Deployment %s during rollout: %v", deployment.Name, err) + return false, nil + } + + if d.DeletionTimestamp != nil { + return false, fmt.Errorf("Deployment %s is being deleted", deployment.Name) + } + + if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 { + return true, nil + } + glog.V(4).Infof("Deployment %s is not ready. 
status: (replicas: %d, updated: %d, ready: %d, unavailable: %d)", d.Name, d.Status.Replicas, d.Status.UpdatedReplicas, d.Status.ReadyReplicas, d.Status.UnavailableReplicas) + return false, nil + }) +} diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go new file mode 100644 index 0000000000..6e910344c1 --- /dev/null +++ b/lib/resourcebuilder/batch.go @@ -0,0 +1,91 @@ +package resourcebuilder + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "github.com/openshift/cluster-version-operator/lib" + "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourceread" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1" + "k8s.io/client-go/rest" +) + +func newBatchBuilder(config *rest.Config, m lib.Manifest) (Interface, error) { + kind := m.GVK.Kind + switch kind { + case jobKind: + return newJobBuilder(config, m), nil + default: + return nil, fmt.Errorf("no Batch builder found for %s", kind) + } +} + +const ( + jobKind = "Job" +) + +type jobBuilder struct { + client *batchclientv1.BatchV1Client + raw []byte +} + +func newJobBuilder(config *rest.Config, m lib.Manifest) *jobBuilder { + return &jobBuilder{ + client: batchclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *jobBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + job := resourceread.ReadJobV1OrDie(b.raw) + + modifier(job) + + _, updated, err := resourceapply.ApplyJob(b.client, job) + if err != nil { + return err + } + if updated { + return WaitForJobCompletion(b.client, job) + } + return nil +} + +const ( + jobPollInterval = 1 * time.Second + jobPollTimeout = 5 * time.Minute +) + +// WaitForJobCompletion waits for job to complete. +func WaitForJobCompletion(client batchclientv1.JobsGetter, job *batchv1.Job) error { + return wait.Poll(jobPollInterval, jobPollTimeout, func() (bool, error) { + j, err := client.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) + if err != nil { + glog.Errorf("error getting Job %s: %v", job.Name, err) + return false, nil + } + + if j.Status.Succeeded > 0 { + return true, nil + } + + // Since we have filled in "activeDeadlineSeconds", + // the Job will 'Active == 0' iff it exceeds the deadline. + // Failed jobs will be recreated in the next run. 
+ if j.Status.Active == 0 && j.Status.Failed > 0 { + reason := "DeadlineExceeded" + message := "Job was active longer than specified deadline" + if len(j.Status.Conditions) > 0 { + reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message + } + return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message) + } + return false, nil + }) +} diff --git a/lib/resourcebuilder/core.go b/lib/resourcebuilder/core.go new file mode 100644 index 0000000000..5579de8f74 --- /dev/null +++ b/lib/resourcebuilder/core.go @@ -0,0 +1,69 @@ +package resourcebuilder + +import ( + "fmt" + + "github.com/openshift/cluster-version-operator/lib" + "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourceread" + coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" +) + +func newCoreBuilder(config *rest.Config, m lib.Manifest) (Interface, error) { + kind := m.GVK.Kind + switch kind { + case serviceAccountKind: + return newServiceAccountBuilder(config, m), nil + case configMapKind: + return newConfigMapBuilder(config, m), nil + default: + return nil, fmt.Errorf("no Core builder found for %s", kind) + } +} + +const ( + serviceAccountKind = "ServiceAccount" +) + +type serviceAccountBuilder struct { + client *coreclientv1.CoreV1Client + raw []byte +} + +func newServiceAccountBuilder(config *rest.Config, m lib.Manifest) *serviceAccountBuilder { + return &serviceAccountBuilder{ + client: coreclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *serviceAccountBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + serviceAccount := resourceread.ReadServiceAccountV1OrDie(b.raw) + modifier(serviceAccount) + _, _, err := resourceapply.ApplyServiceAccount(b.client, serviceAccount) + return err +} + +const ( + configMapKind = "ConfigMap" +) + +type configMapBuilder struct { + client *coreclientv1.CoreV1Client + raw []byte +} + +func newConfigMapBuilder(config *rest.Config, m lib.Manifest) *configMapBuilder { + return &configMapBuilder{ + client: coreclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *configMapBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + configMap := resourceread.ReadConfigMapV1OrDie(b.raw) + modifier(configMap) + _, _, err := resourceapply.ApplyConfigMap(b.client, configMap) + return err +} diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go new file mode 100644 index 0000000000..942b4062fc --- /dev/null +++ b/lib/resourcebuilder/interface.go @@ -0,0 +1,38 @@ +package resourcebuilder + +import ( + "fmt" + + "github.com/openshift/cluster-version-operator/lib" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" +) + +type MetaV1ObjectModifierFunc func(metav1.Object) + +type Interface interface { + Do(MetaV1ObjectModifierFunc) error +} + +func New(rest *rest.Config, m lib.Manifest) (Interface, error) { + group, version := m.GVK.Group, m.GVK.Version + switch { + case group == batchv1.SchemeGroupVersion.Group && version == batchv1.SchemeGroupVersion.Version: + return newBatchBuilder(rest, m) + case group == corev1.SchemeGroupVersion.Group && version == corev1.SchemeGroupVersion.Version: + return newCoreBuilder(rest, m) + case group == appsv1.SchemeGroupVersion.Group && version == 
appsv1.SchemeGroupVersion.Version: + return newAppsBuilder(rest, m) + case group == rbacv1.SchemeGroupVersion.Group && version == rbacv1.SchemeGroupVersion.Version: + return newRbacBuilder(rest, m) + case group == apiextv1beta1.SchemeGroupVersion.Group && version == apiextv1beta1.SchemeGroupVersion.Version: + return newAPIExtBuilder(rest, m) + default: + return nil, fmt.Errorf("no builder found for %s,%s", group, version) + } +} diff --git a/lib/resourcebuilder/rbac.go b/lib/resourcebuilder/rbac.go new file mode 100644 index 0000000000..65c7e3f1ac --- /dev/null +++ b/lib/resourcebuilder/rbac.go @@ -0,0 +1,69 @@ +package resourcebuilder + +import ( + "fmt" + + "github.com/openshift/cluster-version-operator/lib" + "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourceread" + rbacclientv1 "k8s.io/client-go/kubernetes/typed/rbac/v1" + "k8s.io/client-go/rest" +) + +func newRbacBuilder(config *rest.Config, m lib.Manifest) (Interface, error) { + kind := m.GVK.Kind + switch kind { + case clusterRoleKind: + return newClusterRoleBuilder(config, m), nil + case clusterRoleBindingKind: + return newClusterRoleBindingBuilder(config, m), nil + default: + return nil, fmt.Errorf("no Rbac builder found for %s", kind) + } +} + +const ( + clusterRoleKind = "ClusterRole" +) + +type clusterRoleBuilder struct { + client *rbacclientv1.RbacV1Client + raw []byte +} + +func newClusterRoleBuilder(config *rest.Config, m lib.Manifest) *clusterRoleBuilder { + return &clusterRoleBuilder{ + client: rbacclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *clusterRoleBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + clusterRole := resourceread.ReadClusterRoleV1OrDie(b.raw) + modifier(clusterRole) + _, _, err := resourceapply.ApplyClusterRole(b.client, clusterRole) + return err +} + +const ( + clusterRoleBindingKind = "ClusterRoleBinding" +) + +type clusterRoleBindingBuilder struct { + client *rbacclientv1.RbacV1Client + raw []byte +} + +func newClusterRoleBindingBuilder(config *rest.Config, m lib.Manifest) *clusterRoleBindingBuilder { + return &clusterRoleBindingBuilder{ + client: rbacclientv1.NewForConfigOrDie(config), + raw: m.Raw, + } +} + +func (b *clusterRoleBindingBuilder) Do(modifier MetaV1ObjectModifierFunc) error { + clusterRoleBinding := resourceread.ReadClusterRoleBindingV1OrDie(b.raw) + modifier(clusterRoleBinding) + _, _, err := resourceapply.ApplyClusterRoleBinding(b.client, clusterRoleBinding) + return err +} diff --git a/lib/resourcemerge/batch.go b/lib/resourcemerge/batch.go new file mode 100644 index 0000000000..81ab7bb98a --- /dev/null +++ b/lib/resourcemerge/batch.go @@ -0,0 +1,28 @@ +package resourcemerge + +import ( + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/api/equality" +) + +// EnsureJob ensures that the existing matches the required. +// modified is set to true when existing had to be updated with required. 
+func EnsureJob(modified *bool, existing *batchv1.Job, required batchv1.Job) { + EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta) + + if existing.Spec.Selector == nil { + *modified = true + existing.Spec.Selector = required.Spec.Selector + } + if !equality.Semantic.DeepEqual(existing.Spec.Selector, required.Spec.Selector) { + *modified = true + existing.Spec.Selector = required.Spec.Selector + } + setInt32Ptr(modified, &existing.Spec.Parallelism, required.Spec.Parallelism) + setInt32Ptr(modified, &existing.Spec.Completions, required.Spec.Completions) + setInt64Ptr(modified, &existing.Spec.ActiveDeadlineSeconds, required.Spec.ActiveDeadlineSeconds) + setInt32Ptr(modified, &existing.Spec.BackoffLimit, required.Spec.BackoffLimit) + setBoolPtr(modified, &existing.Spec.ManualSelector, required.Spec.ManualSelector) + + ensurePodTemplateSpec(modified, &existing.Spec.Template, required.Spec.Template) +} diff --git a/lib/resourcemerge/core.go b/lib/resourcemerge/core.go index 1f4c8b3d8c..1b0aa21aaa 100644 --- a/lib/resourcemerge/core.go +++ b/lib/resourcemerge/core.go @@ -5,6 +5,14 @@ import ( "k8s.io/apimachinery/pkg/api/equality" ) +// EnsureConfigMap ensures that the existing matches the required. +// modified is set to true when existing had to be updated with required. +func EnsureConfigMap(modified *bool, existing *corev1.ConfigMap, required corev1.ConfigMap) { + EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta) + + mergeMap(modified, &existing.Data, required.Data) +} + // ensurePodTemplateSpec ensures that the existing matches the required. // modified is set to true when existing had to be updated with required. func ensurePodTemplateSpec(modified *bool, existing *corev1.PodTemplateSpec, required corev1.PodTemplateSpec) { diff --git a/lib/resourcemerge/cv.go b/lib/resourcemerge/cv.go index 44bf92f12f..881b889f92 100644 --- a/lib/resourcemerge/cv.go +++ b/lib/resourcemerge/cv.go @@ -43,11 +43,14 @@ func EnsureCVOConfig(modified *bool, existing *v1.CVOConfig, required v1.CVOConf *modified = true existing.ClusterID = required.ClusterID } - if existing.DesiredUpdate.Payload != required.DesiredUpdate.Payload { + + if required.DesiredUpdate.Payload != "" && + existing.DesiredUpdate.Payload != required.DesiredUpdate.Payload { *modified = true existing.DesiredUpdate.Payload = required.DesiredUpdate.Payload } - if existing.DesiredUpdate.Version != required.DesiredUpdate.Version { + if required.DesiredUpdate.Version != "" && + existing.DesiredUpdate.Version != required.DesiredUpdate.Version { *modified = true existing.DesiredUpdate.Version = required.DesiredUpdate.Version } diff --git a/lib/resourcemerge/rbac.go b/lib/resourcemerge/rbac.go new file mode 100644 index 0000000000..07e2ae3728 --- /dev/null +++ b/lib/resourcemerge/rbac.go @@ -0,0 +1,30 @@ +package resourcemerge + +import ( + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/equality" +) + +// EnsureClusterRoleBinding ensures that the existing matches the required. +// modified is set to true when existing had to be updated with required. 
+func EnsureClusterRoleBinding(modified *bool, existing *rbacv1.ClusterRoleBinding, required rbacv1.ClusterRoleBinding) { + EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta) + if !equality.Semantic.DeepEqual(existing.Subjects, required.Subjects) { + *modified = true + existing.Subjects = required.Subjects + } + if !equality.Semantic.DeepEqual(existing.RoleRef, required.RoleRef) { + *modified = true + existing.RoleRef = required.RoleRef + } +} + +// EnsureClusterRole ensures that the existing matches the required. +// modified is set to true when existing had to be updated with required. +func EnsureClusterRole(modified *bool, existing *rbacv1.ClusterRole, required rbacv1.ClusterRole) { + EnsureObjectMeta(modified, &existing.ObjectMeta, required.ObjectMeta) + if !equality.Semantic.DeepEqual(existing.Rules, required.Rules) { + *modified = true + existing.Rules = required.Rules + } +} diff --git a/lib/resourceread/apiext.go b/lib/resourceread/apiext.go new file mode 100644 index 0000000000..ed6fbeb169 --- /dev/null +++ b/lib/resourceread/apiext.go @@ -0,0 +1,27 @@ +package resourceread + +import ( + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + apiExtensionsScheme = runtime.NewScheme() + apiExtensionsCodecs = serializer.NewCodecFactory(apiExtensionsScheme) +) + +func init() { + if err := apiextv1beta1.AddToScheme(apiExtensionsScheme); err != nil { + panic(err) + } +} + +// ReadCustomResourceDefinitionV1Beta1OrDie reads crd object from bytes. Panics on error. +func ReadCustomResourceDefinitionV1Beta1OrDie(objBytes []byte) *apiextv1beta1.CustomResourceDefinition { + requiredObj, err := runtime.Decode(apiExtensionsCodecs.UniversalDecoder(apiextv1beta1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*apiextv1beta1.CustomResourceDefinition) +} diff --git a/lib/resourceread/apps.go b/lib/resourceread/apps.go new file mode 100644 index 0000000000..e62c12e317 --- /dev/null +++ b/lib/resourceread/apps.go @@ -0,0 +1,36 @@ +package resourceread + +import ( + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + appsScheme = runtime.NewScheme() + appsCodecs = serializer.NewCodecFactory(appsScheme) +) + +func init() { + if err := appsv1.AddToScheme(appsScheme); err != nil { + panic(err) + } +} + +// ReadDeploymentV1OrDie reads deployment object from bytes. Panics on error. +func ReadDeploymentV1OrDie(objBytes []byte) *appsv1.Deployment { + requiredObj, err := runtime.Decode(appsCodecs.UniversalDecoder(appsv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*appsv1.Deployment) +} + +// ReadDaemonSetV1OrDie reads daemonset object from bytes. Panics on error. 
+func ReadDaemonSetV1OrDie(objBytes []byte) *appsv1.DaemonSet { + requiredObj, err := runtime.Decode(appsCodecs.UniversalDecoder(appsv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*appsv1.DaemonSet) +} diff --git a/lib/resourceread/batch.go b/lib/resourceread/batch.go new file mode 100644 index 0000000000..e5f5a4a581 --- /dev/null +++ b/lib/resourceread/batch.go @@ -0,0 +1,27 @@ +package resourceread + +import ( + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + batchScheme = runtime.NewScheme() + batchCodecs = serializer.NewCodecFactory(batchScheme) +) + +func init() { + if err := batchv1.AddToScheme(batchScheme); err != nil { + panic(err) + } +} + +// ReadJobV1OrDie reads Job object from bytes. Panics on error. +func ReadJobV1OrDie(objBytes []byte) *batchv1.Job { + requiredObj, err := runtime.Decode(batchCodecs.UniversalDecoder(batchv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*batchv1.Job) +} diff --git a/lib/resourceread/core.go b/lib/resourceread/core.go new file mode 100644 index 0000000000..53bfd96932 --- /dev/null +++ b/lib/resourceread/core.go @@ -0,0 +1,36 @@ +package resourceread + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + coreScheme = runtime.NewScheme() + coreCodecs = serializer.NewCodecFactory(coreScheme) +) + +func init() { + if err := corev1.AddToScheme(coreScheme); err != nil { + panic(err) + } +} + +// ReadConfigMapV1OrDie reads configmap object from bytes. Panics on error. +func ReadConfigMapV1OrDie(objBytes []byte) *corev1.ConfigMap { + requiredObj, err := runtime.Decode(coreCodecs.UniversalDecoder(corev1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*corev1.ConfigMap) +} + +// ReadServiceAccountV1OrDie reads serviceaccount object from bytes. Panics on error. +func ReadServiceAccountV1OrDie(objBytes []byte) *corev1.ServiceAccount { + requiredObj, err := runtime.Decode(coreCodecs.UniversalDecoder(corev1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*corev1.ServiceAccount) +} diff --git a/lib/resourceread/rbac.go b/lib/resourceread/rbac.go new file mode 100644 index 0000000000..343161e59f --- /dev/null +++ b/lib/resourceread/rbac.go @@ -0,0 +1,36 @@ +package resourceread + +import ( + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var ( + rbacScheme = runtime.NewScheme() + rbacCodecs = serializer.NewCodecFactory(rbacScheme) +) + +func init() { + if err := rbacv1.AddToScheme(rbacScheme); err != nil { + panic(err) + } +} + +// ReadClusterRoleBindingV1OrDie reads clusterrolebinding object from bytes. Panics on error. +func ReadClusterRoleBindingV1OrDie(objBytes []byte) *rbacv1.ClusterRoleBinding { + requiredObj, err := runtime.Decode(rbacCodecs.UniversalDecoder(rbacv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*rbacv1.ClusterRoleBinding) +} + +// ReadClusterRoleV1OrDie reads clusterole object from bytes. Panics on error. 
+func ReadClusterRoleV1OrDie(objBytes []byte) *rbacv1.ClusterRole { + requiredObj, err := runtime.Decode(rbacCodecs.UniversalDecoder(rbacv1.SchemeGroupVersion), objBytes) + if err != nil { + panic(err) + } + return requiredObj.(*rbacv1.ClusterRole) +} diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 03af188185..f4c6dd96c8 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisterv1 "k8s.io/client-go/listers/apps/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -42,11 +43,19 @@ const ( workQueueKey = "kube-system/installconfig" ) +// ownerKind contains the schema.GroupVersionKind for type that owns objects managed by CVO. +var ownerKind = v1.SchemeGroupVersion.WithKind("CVOConfig") + // Operator defines cluster version operator. type Operator struct { + // nodename allows CVO to sync fetchPayload to same node as itself. + nodename string // namespace and name are used to find the CVOConfig, OperatorStatus. namespace, name string + // restConfig is used to create resourcebuilder. + restConfig *rest.Config + client clientset.Interface kubeClient kubernetes.Interface apiExtClient apiextclientset.Interface @@ -68,11 +77,13 @@ type Operator struct { // New returns a new cluster version operator. func New( + nodename string, namespace, name string, cvoConfigInformer informersv1.CVOConfigInformer, operatorStatusInformer informersv1.OperatorStatusInformer, crdInformer apiextinformersv1beta1.CustomResourceDefinitionInformer, deployInformer appsinformersv1.DeploymentInformer, + restConfig *rest.Config, client clientset.Interface, kubeClient kubernetes.Interface, apiExtClient apiextclientset.Interface, @@ -82,8 +93,10 @@ func New( eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) optr := &Operator{ + nodename: nodename, namespace: namespace, name: name, + restConfig: restConfig, client: client, kubeClient: kubeClient, apiExtClient: apiExtClient, @@ -180,6 +193,7 @@ func (optr *Operator) sync(key string) error { glog.V(4).Infof("Finished syncing operator %q (%v)", key, time.Since(startTime)) }() + // We always run this to make sure CVOConfig can be synced. 
if err := optr.syncCVOCRDs(); err != nil { return err } @@ -193,7 +207,12 @@ func (optr *Operator) sync(key string) error { return err } - if err := optr.syncCVODeploy(config); err != nil { + payload, err := optr.syncUpdatePayloadContents(updatePayloadsPathPrefix, config) + if err != nil { + return err + } + + if err := optr.syncUpdatePayload(config, payload); err != nil { return err } diff --git a/pkg/cvo/sync.go b/pkg/cvo/sync.go index 199d1b6386..163774dda7 100644 --- a/pkg/cvo/sync.go +++ b/pkg/cvo/sync.go @@ -2,83 +2,187 @@ package cvo import ( "fmt" + "os" + "path/filepath" + "sort" "time" "github.com/golang/glog" + "github.com/openshift/cluster-version-operator/lib" "github.com/openshift/cluster-version-operator/lib/resourceapply" + "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/pkg/apis" "github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1" - "github.com/openshift/cluster-version-operator/pkg/version" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + randutil "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/pointer" ) -func (optr *Operator) syncCVODeploy(config *v1.CVOConfig) error { - if config.DesiredUpdate.Payload == "" || config.DesiredUpdate.Version == "" || config.DesiredUpdate.Version == version.Version.String() { - glog.V(4).Info("no update to be applied.") - return nil +func (optr *Operator) syncUpdatePayload(config *v1.CVOConfig, payload [][]lib.Manifest) error { + for _, manifests := range payload { + for _, manifest := range manifests { + glog.V(4).Infof("Running sync for (%s) %s/%s", manifest.GVK.String(), manifest.Object().GetNamespace(), manifest.Object().GetName()) + b, err := resourcebuilder.New(optr.restConfig, manifest) + if err != nil { + return err + } + if err := b.Do(ownerRefModifier(config)); err != nil { + return err + } + glog.V(4).Infof("Done syncing for (%s) %s/%s", manifest.GVK.String(), manifest.Object().GetNamespace(), manifest.Object().GetName()) + } + } + return nil +} + +func ownerRefModifier(config *v1.CVOConfig) resourcebuilder.MetaV1ObjectModifierFunc { + oref := metav1.NewControllerRef(config, ownerKind) + return func(obj metav1.Object) { + obj.SetOwnerReferences([]metav1.OwnerReference{*oref}) + } +} + +const ( + updatePayloadsPathPrefix = "/etc/cvo/update-payloads" +) + +func (optr *Operator) syncUpdatePayloadContents(pathprefix string, config *v1.CVOConfig) ([][]lib.Manifest, error) { + if !updatedesired(config.DesiredUpdate) { + return nil, nil + } + dv := config.DesiredUpdate.Version + dp := config.DesiredUpdate.Payload + + dirpath := filepath.Join(pathprefix, dv) + _, err := os.Stat(dirpath) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + if os.IsNotExist(err) { + if err := optr.fetchUpdatePayloadToPath(pathprefix, config); err != nil { + return nil, err + } + } + + // read dirpath to manifests: + // For each operator in payload the manifests should be ordered as: + // 1. CRDs + // 2. 
others + mmap, err := lib.LoadManifests(dirpath) + if err != nil { + return nil, err + } + + if len(mmap) == 0 { + return nil, fmt.Errorf("empty update payload %s", dp) } - cvo := &appsv1.Deployment{ + + sortedkeys := []string{} + for k := range mmap { + sortedkeys = append(sortedkeys, k) + } + sort.Strings(sortedkeys) + + payload := [][]lib.Manifest{} + for _, k := range sortedkeys { + ordered := orderManifests(mmap[k]) + payload = append(payload, ordered) + } + return payload, nil +} + +func (optr *Operator) fetchUpdatePayloadToPath(pathprefix string, config *v1.CVOConfig) error { + if !updatedesired(config.DesiredUpdate) { + return fmt.Errorf("error DesiredUpdate is empty") + } + var ( + version = config.DesiredUpdate.Version + payload = config.DesiredUpdate.Payload + name = fmt.Sprintf("%s-%s-%s", optr.name, version, randutil.String(5)) + namespace = optr.namespace + deadline = pointer.Int64Ptr(2 * 60) + nodeSelectorKey = "node-role.kubernetes.io/master" + nodename = optr.nodename + vpath = filepath.Join(pathprefix, version) + cmd = []string{"/bin/sh"} + args = []string{"-c", fmt.Sprintf("mv /manifests %s", vpath)} + ) + + job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: optr.name, - Namespace: optr.namespace, - Labels: map[string]string{ - "k8s-app": optr.name, - }, + Name: name, + Namespace: namespace, }, - Spec: appsv1.DeploymentSpec{ - Replicas: pointer.Int32Ptr(1), - Strategy: appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, - RollingUpdate: &appsv1.RollingUpdateDeployment{ - MaxUnavailable: intOrStringPtr(intstr.FromInt(0)), - MaxSurge: intOrStringPtr(intstr.FromInt(1)), - }, - }, - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "k8s-app": optr.name, - }, - }, + Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: deadline, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "k8s-app": optr.name, - }, - }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: optr.name, - Image: config.DesiredUpdate.Payload, + Name: "payload", + Image: payload, + Command: cmd, + Args: args, + VolumeMounts: []corev1.VolumeMount{{ + MountPath: pathprefix, + Name: "payload", + }}, }}, + Volumes: []corev1.Volume{{ + Name: "payload", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: pathprefix, + }, + }, + }}, + NodeName: nodename, NodeSelector: map[string]string{ - "node-role.kubernetes.io/master": "", + nodeSelectorKey: "", }, Tolerations: []corev1.Toleration{{ - Key: "node-role.kubernetes.io/master", + Key: nodeSelectorKey, }}, + RestartPolicy: corev1.RestartPolicyOnFailure, }, }, }, } - glog.V(4).Infof("Updating CVO to %s...", config.DesiredUpdate.Version) - _, updated, err := resourceapply.ApplyDeploymentFromCache(optr.deployLister, optr.kubeClient.AppsV1(), cvo) + _, err := optr.kubeClient.BatchV1().Jobs(job.Namespace).Create(job) if err != nil { return err } - if updated { - if err := optr.waitForDeploymentRollout(cvo); err != nil { - return err + return resourcebuilder.WaitForJobCompletion(optr.kubeClient.BatchV1(), job) +} + +func orderManifests(manifests []lib.Manifest) []lib.Manifest { + var crds, others []lib.Manifest + for _, m := range manifests { + group, version, kind := m.GVK.Group, m.GVK.Version, m.GVK.Kind + switch { + case group == apiextv1beta1.SchemeGroupVersion.Group && version == apiextv1beta1.SchemeGroupVersion.Version && kind == resourcebuilder.CRDKind: + crds = append(crds, m) + default: + others = append(others, m) } } - 
return nil
+
+	out := []lib.Manifest{}
+	out = append(out, crds...)
+	out = append(out, others...)
+	return out
+}
+
+func updatedesired(desired v1.Update) bool {
+	return desired.Payload != "" &&
+		desired.Version != ""
 }
 
 func (optr *Operator) syncCVOCRDs() error {
diff --git a/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go b/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go
new file mode 100644
index 0000000000..9421edae86
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/util/rand/rand.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package rand provides utilities related to randomization.
+package rand
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+var rng = struct {
+	sync.Mutex
+	rand *rand.Rand
+}{
+	rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+}
+
+// Intn generates an integer in range [0,max).
+// By design this should panic if input is invalid, <= 0.
+func Intn(max int) int {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Intn(max)
+}
+
+// IntnRange generates an integer in range [min,max).
+// By design this should panic if input is invalid, <= 0.
+func IntnRange(min, max int) int {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Intn(max-min) + min
+}
+
+// Int63nRange generates an int64 integer in range [min,max).
+// By design this should panic if input is invalid, <= 0.
+func Int63nRange(min, max int64) int64 {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Int63n(max-min) + min
+}
+
+// Seed seeds the rng with the provided seed.
+func Seed(seed int64) {
+	rng.Lock()
+	defer rng.Unlock()
+
+	rng.rand = rand.New(rand.NewSource(seed))
+}
+
+// Perm returns, as a slice of n ints, a pseudo-random permutation of the integers [0,n)
+// from the default Source.
+func Perm(n int) []int {
+	rng.Lock()
+	defer rng.Unlock()
+	return rng.rand.Perm(n)
+}
+
+const (
+	// We omit vowels from the set of available characters to reduce the chances
+	// of "bad words" being formed.
+	alphanums = "bcdfghjklmnpqrstvwxz2456789"
+	// No. of bits required to index into alphanums string.
+	alphanumsIdxBits = 5
+	// Mask used to extract last alphanumsIdxBits of an int.
+	alphanumsIdxMask = 1<<alphanumsIdxBits - 1
+	// No. of random letters we can extract from a single int63.
+	maxAlphanumsPerInt = 63 / alphanumsIdxBits
+)
+
+// String generates a random alphanumeric string, without vowels, which is n
+// characters long.  This will panic if n is zero.
+func String(n int) string {
+	b := make([]byte, n)
+	rng.Lock()
+	defer rng.Unlock()
+
+	randomInt63 := rng.rand.Int63()
+	remaining := maxAlphanumsPerInt
+	for i := 0; i < n; {
+		if remaining == 0 {
+			randomInt63, remaining = rng.rand.Int63(), maxAlphanumsPerInt
+		}
+		if idx := int(randomInt63 & alphanumsIdxMask); idx < len(alphanums) {
+			b[i] = alphanums[idx]
+			i++
+		}
+		randomInt63 >>= alphanumsIdxBits
+		remaining--
+	}
+	return string(b)
+}
+
+// SafeEncodeString encodes s using the same characters as rand.String. This reduces the chances of bad words and
+// ensures that strings generated from hash functions appear consistent throughout the API.
+func SafeEncodeString(s string) string {
+	r := make([]byte, len(s))
+	for i, b := range []rune(s) {
+		r[i] = alphanums[(int(b) % len(alphanums))]
+	}
+	return string(r)
+}
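Reviewer note: a minimal, hypothetical sketch (not part of this PR) of how the new lib.LoadManifests and resourcebuilder.New pieces compose when driven outside the operator. The kubeconfig path, payload directory, and label key below are illustrative assumptions; the operator itself attaches a CVOConfig owner reference via ownerRefModifier, which is swapped here for a simple label modifier.

package main

import (
	"log"
	"sort"

	"github.com/openshift/cluster-version-operator/lib"
	"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed inputs: a kubeconfig for a test cluster and an already
	// unpacked update payload directory (both paths are hypothetical).
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		log.Fatalf("error loading kubeconfig: %v", err)
	}

	// LoadManifests returns a map keyed by subdir name ("000" for files
	// directly in the root dir).
	byDir, err := lib.LoadManifests("/etc/cvo/update-payloads/v0.0.1")
	if err != nil {
		log.Fatalf("error loading manifests: %v", err)
	}

	// Apply directories in lexical key order, mirroring syncUpdatePayloadContents.
	keys := make([]string, 0, len(byDir))
	for k := range byDir {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Example MetaV1ObjectModifierFunc: stamp a label on every object
	// instead of the owner reference the operator would normally set.
	addLabel := func(obj metav1.Object) {
		labels := obj.GetLabels()
		if labels == nil {
			labels = map[string]string{}
		}
		labels["example.com/applied-by"] = "cvo-sketch"
		obj.SetLabels(labels)
	}

	for _, k := range keys {
		for _, m := range byDir[k] {
			b, err := resourcebuilder.New(config, m)
			if err != nil {
				log.Fatalf("no builder for %v: %v", m.GVK, err)
			}
			if err := b.Do(addLabel); err != nil {
				log.Fatalf("error applying %s/%s: %v", m.Object().GetNamespace(), m.Object().GetName(), err)
			}
		}
	}
}

Ordering CRDs ahead of other kinds within each directory is handled by the unexported orderManifests in pkg/cvo/sync.go and is not reproduced in this sketch.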