Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ var (

startOpts struct {
kubeconfig string
nodeName string
}
)

func init() {
rootCmd.AddCommand(startCmd)
startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.")
}

func runStartCmd(cmd *cobra.Command, args []string) {
Expand All @@ -60,6 +62,14 @@ func runStartCmd(cmd *cobra.Command, args []string) {
// To help debugging, immediately log version
glog.Infof("%s", version.String)

if startOpts.nodeName == "" {
name, ok := os.LookupEnv("NODE_NAME")
if !ok || name == "" {
glog.Fatalf("node-name is required")
}
startOpts.nodeName = name
}

cb, err := newClientBuilder(startOpts.kubeconfig)
if err != nil {
glog.Fatalf("error creating clients: %v", err)
Expand Down Expand Up @@ -137,6 +147,11 @@ type clientBuilder struct {
config *rest.Config
}

func (cb *clientBuilder) RestConfig() *rest.Config {
c := rest.CopyConfig(cb.config)
return c
}

func (cb *clientBuilder) ClientOrDie(name string) clientset.Interface {
return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
}
Expand Down Expand Up @@ -205,11 +220,13 @@ func createControllerContext(cb *clientBuilder, stop <-chan struct{}) *controlle

func startControllers(ctx *controllerContext) error {
go cvo.New(
startOpts.nodeName,
componentNamespace, componentName,
ctx.InformerFactory.Clusterversion().V1().CVOConfigs(),
ctx.InformerFactory.Clusterversion().V1().OperatorStatuses(),
ctx.APIExtInformerFactory.Apiextensions().V1beta1().CustomResourceDefinitions(),
ctx.KubeInformerFactory.Apps().V1().Deployments(),
ctx.ClientBuilder.RestConfig(),
ctx.ClientBuilder.ClientOrDie(componentName),
ctx.ClientBuilder.KubeClientOrDie(componentName),
ctx.ClientBuilder.APIExtClientOrDie(componentName),
Expand Down
198 changes: 198 additions & 0 deletions lib/manifest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package lib

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes/scheme"
)

// Manifest stores Kubernetes object in Raw from a file.
// It stores the GroupVersionKind for the manifest.
type Manifest struct {
Raw []byte
GVK schema.GroupVersionKind

obj *unstructured.Unstructured
}

// UnmarshalJSON unmarshals bytes of single kubernetes object to Manifest.
func (m *Manifest) UnmarshalJSON(in []byte) error {
if m == nil {
return errors.New("Manifest: UnmarshalJSON on nil pointer")
}

// This happens when marshalling
// <yaml>
// --- (this between two `---`)
// ---
// <yaml>
if bytes.Equal(in, []byte("null")) {
m.Raw = nil
return nil
}

m.Raw = append(m.Raw[0:0], in...)
udi, _, err := scheme.Codecs.UniversalDecoder().Decode(in, nil, &unstructured.Unstructured{})
if err != nil {
return fmt.Errorf("unable to decode manifest: %v", err)
}
ud, ok := udi.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("expected manifest to decode into *unstructured.Unstructured, got %T", ud)
}

m.GVK = ud.GroupVersionKind()
m.obj = ud.DeepCopy()
return nil
}

// Object returns underlying metav1.Object
func (m *Manifest) Object() metav1.Object { return m.obj }

const (
// rootDirKey is used as key for the manifest files in root dir
// passed to LoadManifests
// It is set to `000` to give it more priority if the actor sorts
// based on keys.
rootDirKey = "000"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively we could just force the root dir to be a real dir and ignore files in the root (treat as not manifests). The only content we had in there in the payload right now was the image mapping and the Cincinnati file.

Copy link
Contributor Author

@abhinavdahiya abhinavdahiya Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the update payload is

/cincinnati.json
/images.json
/manifests/
   ....
   ....

The root here would mean manifests dir.

making manifests in /manifests (or root in code) gives us a way to add things like job-migrations with highest priority above any operators.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on keeping the manifests dir, seems pretty clean to me and not much overhead.

)

// LoadManifests loads manifest from disk.
//
// root/
// manifest0
// manifest1
// 00_subdir0/
// manifest0
// manifest1
// 01_subdir1/
// manifest0
// manifest1
// LoadManifests(<abs path to>/root):
// returns map
// 000: [manifest0, manifest1]
// 00_subdir0: [manifest0, manifest1]
// 01_subdir1: [manifest0, manifest1]
//
// It skips dirs that have not files.
// It only reads dir `p` and its direct subdirs.
func LoadManifests(p string) (map[string][]Manifest, error) {
var out = make(map[string][]Manifest)

fs, err := ioutil.ReadDir(p)
if err != nil {
return nil, err
}

// We want to accumulate all the errors, not returning at the
// first error encountered when reading subdirs.
var errs []error

// load manifest files in p to rootDirKey
ms, err := loadManifestsFromDir(p)
if err != nil {
errs = append(errs, fmt.Errorf("error loading from dir %s: %v", p, err))
}
if len(ms) > 0 {
out[rootDirKey] = ms
}

// load manifests from subdirs to subdir-name
for _, f := range fs {
if !f.IsDir() {
continue
}
path := filepath.Join(p, f.Name())
ms, err := loadManifestsFromDir(path)
if err != nil {
errs = append(errs, fmt.Errorf("error loading from dir %s: %v", path, err))
continue
}
if len(ms) > 0 {
out[f.Name()] = ms
}
}

agg := utilerrors.NewAggregate(errs)
if agg != nil {
return nil, errors.New(agg.Error())
}
return out, nil
}

// loadManifestsFromDir only returns files. not subdirs are traversed.
// returns manifests in increasing order of their filename.
func loadManifestsFromDir(dir string) ([]Manifest, error) {
var manifests []Manifest
fs, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}

// ensure sorted.
sort.Slice(fs, func(i, j int) bool {
return fs[i].Name() < fs[j].Name()
})

var errs []error
for _, f := range fs {
if f.IsDir() {
continue
}

path := filepath.Join(dir, f.Name())
file, err := os.Open(path)
if err != nil {
errs = append(errs, fmt.Errorf("error opening %s: %v", path, err))
continue
}
defer file.Close()

ms, err := parseManifests(file)
if err != nil {
errs = append(errs, fmt.Errorf("error parsing %s: %v", path, err))
continue
}
manifests = append(manifests, ms...)
}

agg := utilerrors.NewAggregate(errs)
if agg != nil {
return nil, fmt.Errorf("error loading manifests from %q: %v", dir, agg.Error())
}

return manifests, nil
}

// parseManifests parses a YAML or JSON document that may contain one or more
// kubernetes resources.
func parseManifests(r io.Reader) ([]Manifest, error) {
d := yaml.NewYAMLOrJSONDecoder(r, 1024)
var manifests []Manifest
for {
m := Manifest{}
if err := d.Decode(&m); err != nil {
if err == io.EOF {
return manifests, nil
}
return manifests, fmt.Errorf("error parsing: %v", err)
}
m.Raw = bytes.TrimSpace(m.Raw)
if len(m.Raw) == 0 || bytes.Equal(m.Raw, []byte("null")) {
continue
}
manifests = append(manifests, m)
}
}
Loading