Skip to content

Commit

Permalink
feat: Refactor to make it easy to extend new kinds (#865)
Browse files Browse the repository at this point in the history
* feat: Refactor to make it easy to extend new kinds

Signed-off-by: Ce Gao <[email protected]>

* fix: Remove hard coded name

Signed-off-by: Ce Gao <[email protected]>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Oct 12, 2019
1 parent fb6739c commit 198a63a
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 137 deletions.
22 changes: 0 additions & 22 deletions pkg/common/v1alpha3/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,8 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func GetSupportedJobList() []schema.GroupVersionKind {
supportedJobList := []schema.GroupVersionKind{
{
Group: "batch",
Version: "v1",
Kind: "Job",
},
{
Group: "kubeflow.org",
Version: "v1",
Kind: "TFJob",
},
{
Group: "kubeflow.org",
Version: "v1",
Kind: "PyTorchJob",
},
}
return supportedJobList
}

func ConvertTime2RFC3339(t *metav1.Time) string {
if t != nil {
return t.UTC().Format(time.RFC3339)
Expand Down
40 changes: 35 additions & 5 deletions pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,59 @@ package consts
import "github.com/kubeflow/katib/pkg/util/v1alpha3/env"

const (
// ConfigExperimentSuggestionName is the config name of the
// suggestion client implementation in experiment controller.
ConfigExperimentSuggestionName = "experiment-suggestion-name"

// LabelExperimentName is the label of experiment name.
LabelExperimentName = "experiment"
// LabelSuggestionName is the label of suggestion name.
LabelSuggestionName = "suggestion"

// ContainerSuggestion is the container name in Suggestion.
ContainerSuggestion = "suggestion"

DefaultSuggestionPort = 6789
// DefaultSuggestionPort is the default port of suggestion service.
DefaultSuggestionPort = 6789
// DefaultSuggestionPortName is the default port name of suggestion service.
DefaultSuggestionPortName = "katib-api"
DefaultGRPCService = "manager.v1alpha3.Suggestion"
// DefaultGRPCService is the default service name in Suggestion,
// which is used to run healthz check using grpc probe.
DefaultGRPCService = "manager.v1alpha3.Suggestion"

// Default env name of katib namespace
// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"

// Katib config map constants
// KatibConfigMapName is the config map constants
// Configmap name which includes Katib's configuration
KatibConfigMapName = "katib-config"

// LabelSuggestionTag is the name of suggestion config in configmap.
LabelSuggestionTag = "suggestion"

// LabelSuggestionImageTag is the name of suggestion image config in configmap.
LabelSuggestionImageTag = "image"

// ReconcileErrorReason is the reason when there is a reconcile error.
ReconcileErrorReason = "ReconcileError"

// JobKindJob is the kind of the Kubernetes Job.
JobKindJob = "Job"
// JobKindTF is the kind of TFJob.
JobKindTF = "TFJob"
// JobKindPyTorch is the kind of PyTorchJob.
JobKindPyTorch = "PyTorchJob"

// JobVersionJob is the api version of Kubernetes Job.
JobVersionJob = "v1"
// JobVersionTF is the api version of TFJob.
JobVersionTF = "v1"
// JobVersionPyTorch is the api version of PyTorchJob.
JobVersionPyTorch = "v1"

// JobGroupJob is the group name of Kubernetes Job.
JobGroupJob = "batch"
// JobGroupKubeflow is the group name of Kubeflow.
JobGroupKubeflow = "kubeflow.org"
)

var (
Expand Down
15 changes: 11 additions & 4 deletions pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
commonv1alpha3 "github.com/kubeflow/katib/pkg/common/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/trial/managerclient"
jobv1alpha3 "github.com/kubeflow/katib/pkg/job/v1alpha3"
)

const (
// ControllerName is the controller name.
ControllerName = "trial-controller"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

for _, gvk := range commonv1alpha3.GetSupportedJobList() {
for _, gvk := range jobv1alpha3.GetSupportedJobList() {
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
Expand Down Expand Up @@ -218,9 +219,15 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha3.Trial) error {
// Job already exists
// TODO Can desired Spec differ from deployedSpec?
if deployedJob != nil {
jobCondition, err := r.GetDeployedJobStatus(deployedJob)
kind := deployedJob.GetKind()
jobProvider, err := jobv1alpha3.New(kind)
if err != nil {
logger.Error(err, "Get deployed status error")
logger.Error(err, "Failed to create the provider")
return err
}
jobCondition, err := jobProvider.GetDeployedJobStatus(deployedJob)
if err != nil {
logger.Error(err, "Get deployed status error")
return err
}

Expand Down
52 changes: 0 additions & 52 deletions pkg/controller.v1alpha3/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"fmt"
"strconv"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
Expand All @@ -37,56 +35,6 @@ const (
cleanMetricsFinalizer = "clean-metrics-in-db"
)

func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) {
jobCondition := commonv1.JobCondition{}
jobCondition.Type = commonv1.JobRunning
kind := deployedJob.GetKind()
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")

if ok {
statusMap := status.(map[string]interface{})
switch kind {

case DefaultJobKind:
jobStatus := batchv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
return nil, err
}
for _, cond := range jobStatus.Conditions {
if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobSucceeded
// JobConditions message not populated when succeeded for batchv1 Job
break
}
if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobFailed
jobCondition.Message = cond.Message
break
}
}
default:
jobStatus := commonv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)

if err != nil {
log.Error(err, "Convert unstructured to status error")
return nil, err
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
jobCondition.Type = lc.Type
jobCondition.Message = lc.Message
}
}
} else if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
return &jobCondition, nil
}

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured, jobCondition *commonv1.JobCondition) {
now := metav1.Now()
jobConditionType := (*jobCondition).Type
Expand Down
48 changes: 48 additions & 0 deletions pkg/job/v1alpha3/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package v1alpha3

import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
// JobNameLabel represents the label key for the job name, the value is job name
JobNameLabel = "job-name"
// JobRoleLabel represents the label key for the job role, e.g. the value is master
JobRoleLabel = "job-role"
// TFJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible.
TFJobRoleLabel = "tf-job-role"
// PyTorchJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible.
PyTorchJobRoleLabel = "pytorch-job-role"
)

// JobRoleMap is the map which is used to determin if the replica is master.
// Katib will inject metrics collector into master replica.
var JobRoleMap = map[string][]string{
// Job kind does not support distributed training, thus no master.
consts.JobKindJob: {},
consts.JobKindTF: {JobRoleLabel, TFJobRoleLabel},
consts.JobKindPyTorch: {JobRoleLabel, PyTorchJobRoleLabel},
}

// GetSupportedJobList returns the list of the supported jobs' GVK.
func GetSupportedJobList() []schema.GroupVersionKind {
supportedJobList := []schema.GroupVersionKind{
{
Group: consts.JobGroupJob,
Version: consts.JobVersionJob,
Kind: consts.JobKindJob,
},
{
Group: consts.JobGroupKubeflow,
Version: consts.JobVersionTF,
Kind: consts.JobKindTF,
},
{
Group: consts.JobGroupKubeflow,
Version: consts.JobVersionPyTorch,
Kind: consts.JobKindPyTorch,
},
}
return supportedJobList
}
68 changes: 68 additions & 0 deletions pkg/job/v1alpha3/job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package job

import (
"fmt"

commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

var (
log = logf.Log.WithName("provider-job")
)

// Job is the provider of Job kind.
type Job struct{}

// GetDeployedJobStatus get the deployed job status.
func (j Job) GetDeployedJobStatus(
deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) {
jobCondition := commonv1.JobCondition{}
// Set default type to running.
jobCondition.Type = commonv1.JobRunning
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
if !ok {
if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
err := fmt.Errorf("value is missing")
log.Error(err, "NestedFieldCopy unstructured to status error")
return nil, err
}

statusMap := status.(map[string]interface{})
jobStatus := batchv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
return nil, err
}
for _, cond := range jobStatus.Conditions {
if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobSucceeded
// JobConditions message not populated when succeeded for batchv1 Job
break
}
if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobFailed
jobCondition.Message = cond.Message
break
}
}
return &jobCondition, nil
}

// IsTrainingContainer returns if the c is the actual training container.
func (j Job) IsTrainingContainer(index int, c corev1.Container) bool {
if index == 0 {
// for Job worker, the first container will be taken as worker container,
// katib document should note it
return true
}
return false
}
Loading

0 comments on commit 198a63a

Please sign in to comment.