Skip to content
This repository has been archived by the owner on Feb 1, 2022. It is now read-only.

implementation of mxnet operator API v1 #39

Merged
merged 3 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ install:
script:
- hack/verify-codegen.sh
- go build -o mxnet-operator.v1beta1 github.com/kubeflow/mxnet-operator/cmd/mxnet-operator.v1beta1
- go build -o mxnet-operator.v1 github.com/kubeflow/mxnet-operator/cmd/mxnet-operator.v1
- gometalinter --config=linter_config.json --vendor ./...
# We customize the build step because by default
# Travis runs go test -v ./... which will include the vendor
Expand Down
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ FROM golang:1.8.2

RUN mkdir -p /opt/kubeflow
COPY mxnet-operator.v1beta1 /opt/kubeflow
COPY mxnet-operator.v1 /opt/kubeflow

RUN chmod a+x /opt/kubeflow/mxnet-operator.v1beta1
RUN chmod a+x /opt/kubeflow/mxnet-operator.v1

CMD ["/opt/kubeflow/mxnet-operator.v1beta1", "--alsologtostderr", "-v=1"]
CMD ["/opt/kubeflow/mxnet-operator.v1", "--alsologtostderr", "-v=1"]
58 changes: 58 additions & 0 deletions cmd/mxnet-operator.v1/app/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"flag"

v1 "k8s.io/api/core/v1"
)

// ServerOption holds the command-line configuration of the operator binary.
// AddFlags binds every field except Kubeconfig to a flag.
type ServerOption struct {
	// Kubeconfig is a kubeconfig file path and MasterURL is the value of the
	// -master flag (the API server URL).
	Kubeconfig, MasterURL string
	// Threadiness is the value of the -threadiness flag.
	Threadiness int
	// PrintVersion (-version), JSONLogFormat (-json-log-format) and
	// EnableGangScheduling (-enable-gang-scheduling) are the boolean switches.
	PrintVersion, JSONLogFormat, EnableGangScheduling bool
	// Namespace is the namespace whose mxjobs are monitored (-namespace).
	Namespace string
}

// NewServerOption returns a pointer to a freshly allocated, zero-valued
// ServerOption.
func NewServerOption() *ServerOption {
	return new(ServerOption)
}

// AddFlags registers the operator's command-line flags on fs and binds each
// one to the matching field of s. Registering a flag also stores its default
// value in that field.
func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
	// Usage texts, kept verbatim.
	const (
		masterUsage = `The url of the Kubernetes API server,
will overrides any value in kubeconfig, only required if out-of-cluster.`
		namespaceUsage = `The namespace to monitor mxjobs. If unset, it monitors all namespaces cluster-wide.
If set, it only monitors mxjobs in the given namespace.`
		threadinessUsage = `How many threads to process the main logic`
		versionUsage     = "Show version and quit"
		jsonLogUsage     = "Set true to use json style log format. Set false to use plaintext style log format"
		gangUsage        = "Set true to enable gang scheduling by kube-arbitrator."
	)

	// Connection and scope.
	fs.StringVar(&s.MasterURL, "master", "", masterUsage)
	fs.StringVar(&s.Namespace, "namespace", v1.NamespaceAll, namespaceUsage)

	// Worker count.
	fs.IntVar(&s.Threadiness, "threadiness", 1, threadinessUsage)

	// Boolean switches.
	fs.BoolVar(&s.PrintVersion, "version", false, versionUsage)
	fs.BoolVar(&s.JSONLogFormat, "json-log-format", true, jsonLogUsage)
	fs.BoolVar(&s.EnableGangScheduling, "enable-gang-scheduling", false, gangUsage)
}
208 changes: 208 additions & 0 deletions cmd/mxnet-operator.v1/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"fmt"
"os"
"time"

log "github.com/sirupsen/logrus"

"k8s.io/api/core/v1"
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
restclientset "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
election "k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

"github.com/kubeflow/mxnet-operator/cmd/mxnet-operator.v1/app/options"
mxnetv1 "github.com/kubeflow/mxnet-operator/pkg/apis/mxnet/v1"
mxjobclientset "github.com/kubeflow/mxnet-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/mxnet-operator/pkg/client/clientset/versioned/scheme"
mxjobinformers "github.com/kubeflow/mxnet-operator/pkg/client/informers/externalversions"
controller "github.com/kubeflow/mxnet-operator/pkg/controller.v1/mxnet"
"github.com/kubeflow/mxnet-operator/pkg/version"
"github.com/kubeflow/tf-operator/pkg/util/signals"
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
)

const (
	// apiVersion is the version string handed to the version package when
	// printing or logging build information.
	apiVersion = "v1"

	// RecommendedKubeConfigPathEnv is the name of the environment variable
	// that, when non-empty, supplies the kubeconfig path used by Run.
	RecommendedKubeConfigPathEnv = "KUBECONFIG"
)

// Timings used for the leader election configuration.
var (
	leaseDuration = 15 * time.Second
	renewDuration = 5 * time.Second
	retryPeriod   = 3 * time.Second
)

// resyncPeriod is the resync interval passed to the informer factories.
var resyncPeriod = 30 * time.Second

// Run wires up and starts the v1 MXNet operator described by opt.
//
// It builds the Kubernetes client configuration, creates the client sets,
// informers and the MXJob controller, and then enters leader election; the
// controller itself is started from the OnStartedLeading callback.
//
// An error is returned when the kubeconfig cannot be built, a client set
// cannot be created, the hostname cannot be determined, or the core/v1 types
// cannot be added to the scheme. Losing the leader lease terminates the
// process through log.Fatalf. If opt.PrintVersion is set, the version is
// printed and the process exits before anything else happens.
func Run(opt *options.ServerOption) error {
	// Check if the -version flag was passed and, if so, print the version and exit.
	if opt.PrintVersion {
		version.PrintVersionAndExit(apiVersion)
	}

	// This namespace comes from the environment and is only used for the
	// leader election lock below; the watched namespace is opt.Namespace.
	namespace := os.Getenv(mxnetv1.EnvKubeflowNamespace)
	if len(namespace) == 0 {
		log.Infof("EnvKubeflowNamespace not set, use default namespace")
		namespace = metav1.NamespaceDefault
	}
	if opt.Namespace == v1.NamespaceAll {
		log.Info("Using cluster scoped operator")
	} else {
		log.Infof("Scoping operator to namespace %s", opt.Namespace)
	}

	// To help debugging, immediately log version.
	log.Infof("%+v", version.Info(apiVersion))

	// Set up signals so we handle the first shutdown signal gracefully.
	stopCh := signals.SetupSignalHandler()

	log.Infof("RecommendedKubeConfigPathEnv : %+v", RecommendedKubeConfigPathEnv)
	log.Infof("KUBECONFIG : %+v", os.Getenv("KUBECONFIG"))

	// Note: ENV KUBECONFIG will overwrite user defined Kubeconfig option.
	if len(os.Getenv(RecommendedKubeConfigPathEnv)) > 0 {
		// use the current context in kubeconfig
		// This is very useful for running locally.
		opt.Kubeconfig = os.Getenv(RecommendedKubeConfigPathEnv)
	}

	// Get kubernetes config. Failures are returned to the caller, like every
	// other setup error in this function, instead of exiting from here.
	kcfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig)
	if err != nil {
		return fmt.Errorf("error building kubeconfig: %v", err)
	}

	// Create clients.
	kubeClientSet, leaderElectionClientSet, mxJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
	if err != nil {
		return err
	}

	// Create informer factory.
	kubeInformerFactory := kubeinformers.NewFilteredSharedInformerFactory(kubeClientSet, resyncPeriod, opt.Namespace, nil)
	mxJobInformerFactory := mxjobinformers.NewSharedInformerFactory(mxJobClientSet, resyncPeriod)

	unstructuredInformer := controller.NewUnstructuredMXJobInformer(kcfg, opt.Namespace)

	// Create mx controller.
	tc := controller.NewMXController(unstructuredInformer, kubeClientSet, mxJobClientSet, kubeBatchClientSet, kubeInformerFactory, mxJobInformerFactory, *opt)

	// Start informer goroutines.
	go kubeInformerFactory.Start(stopCh)

	// The generated MXJob informer factory is deliberately not started
	// (no mxJobInformerFactory.Start call); the unstructured informer is run
	// in its place.
	go unstructuredInformer.Informer().Run(stopCh)

	// Set leader election start function. The channel supplied by the leader
	// elector is ignored; the controller stops on the signal-driven stopCh.
	run := func(<-chan struct{}) {
		if err := tc.Run(opt.Threadiness, stopCh); err != nil {
			log.Errorf("Failed to run the controller: %v", err)
		}
	}

	// The hostname is used as this instance's identity in the election.
	id, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to get hostname: %v", err)
	}

	// Prepare event clients.
	eventBroadcaster := record.NewBroadcaster()
	if err = v1.AddToScheme(scheme.Scheme); err != nil {
		return fmt.Errorf("CoreV1 Add Scheme failed: %v", err)
	}
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "mxnet-operator"})

	rl := &resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      "mxnet-operator",
		},
		Client: leaderElectionClientSet.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		},
	}

	// Start leader election.
	election.RunOrDie(election.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: leaseDuration,
		RenewDeadline: renewDuration,
		RetryPeriod:   retryPeriod,
		Callbacks: election.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				log.Fatalf("leader election lost")
			},
		},
	})

	return nil
}

// createClientSets builds every API client the operator needs from config.
//
// The results are, in order: the core Kubernetes client, a second core
// Kubernetes client intended for leader election, the MXJob client and the
// kube-batch client. If any client cannot be constructed, all four results are
// nil and the construction error is returned.
//
// Before the clients are built, checkCRDExists is called for the MXJob CRD;
// that helper exits the process when the CRD cannot be fetched.
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, mxjobclientset.Interface, kubebatchclient.Interface, error) {
	apiExtensions, err := crdclient.NewForConfig(config)
	if err != nil {
		return nil, nil, nil, nil, err
	}
	checkCRDExists(apiExtensions, mxnetv1.MXCRD)

	// NOTE(review): AddUserAgent presumably updates config in place as well as
	// returning it, in which case the MXJob client below inherits the
	// "leader-election" user agent — confirm against client-go.
	operatorCfg := restclientset.AddUserAgent(config, "mxnet-operator")
	kube, err := kubeclientset.NewForConfig(operatorCfg)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	electionCfg := restclientset.AddUserAgent(config, "leader-election")
	elector, err := kubeclientset.NewForConfig(electionCfg)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	mxJobs, err := mxjobclientset.NewForConfig(config)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	batchCfg := restclientset.AddUserAgent(config, "kube-batch")
	batch, err := kubebatchclient.NewForConfig(batchCfg)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	return kube, elector, mxJobs, batch, nil
}

func checkCRDExists(clientset crdclient.Interface, crdName string) {
_, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(crdName, metav1.GetOptions{})
gaocegege marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Error(err)
os.Exit(1)
}
}
49 changes: 49 additions & 0 deletions cmd/mxnet-operator.v1/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"

"github.com/onrik/logrus/filename"
log "github.com/sirupsen/logrus"

"github.com/kubeflow/mxnet-operator/cmd/mxnet-operator.v1/app"
"github.com/kubeflow/mxnet-operator/cmd/mxnet-operator.v1/app/options"
)

// init installs a logrus hook that attaches the originating source location
// to every log entry under the "filename" field.
func init() {
	hook := filename.NewHook()
	hook.Field = "filename"
	log.AddHook(hook)
}

// main registers and parses the command-line flags, selects the log format,
// and runs the operator. An error returned by app.Run is logged with
// log.Fatalf.
func main() {
	opts := options.NewServerOption()
	opts.AddFlags(flag.CommandLine)
	flag.Parse()

	// JSON-formatted logs can be parsed by services like Stackdriver.
	if opts.JSONLogFormat {
		log.SetFormatter(&log.JSONFormatter{})
	}

	err := app.Run(opts)
	if err != nil {
		log.Fatalf("%v\n", err)
	}
}
9 changes: 8 additions & 1 deletion hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-ge
cd ${SCRIPT_ROOT}
${CODEGEN_PKG}/generate-groups.sh "defaulter,deepcopy,client,informer,lister" \
github.com/kubeflow/mxnet-operator/pkg/client github.com/kubeflow/mxnet-operator/pkg/apis \
mxnet:v1beta1 \
mxnet:v1beta1,v1 \
--go-header-file hack/boilerplate/boilerplate.go.txt

# Notice: The code in code-generator does not generate defaulter by default.
Expand All @@ -40,3 +40,10 @@ ${GOPATH}/bin/defaulter-gen --input-dirs github.com/kubeflow/mxnet-operator/pkg
-O zz_generated.defaults \
--go-header-file hack/boilerplate/boilerplate.go.txt \
--output-package github.com/kubeflow/mxnet-operator/pkg/apis/mxnet/v1beta1

# Notice: The code in code-generator does not generate defaulter by default.
echo "Generating defaulters for v1"
${GOPATH}/bin/defaulter-gen --input-dirs github.com/kubeflow/mxnet-operator/pkg/apis/mxnet/v1 \
-O zz_generated.defaults \
--go-header-file hack/boilerplate/boilerplate.go.txt \
--output-package github.com/kubeflow/mxnet-operator/pkg/apis/mxnet/v1
Loading