Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor to make it easy to extend new kinds #865

Merged
merged 2 commits into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions pkg/common/v1alpha3/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,8 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// GetSupportedJobList returns the GroupVersionKind of each supported job
// type: the batch/v1 Job and the kubeflow.org/v1 TFJob and PyTorchJob.
func GetSupportedJobList() []schema.GroupVersionKind {
	return []schema.GroupVersionKind{
		{Group: "batch", Version: "v1", Kind: "Job"},
		{Group: "kubeflow.org", Version: "v1", Kind: "TFJob"},
		{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"},
	}
}

func ConvertTime2RFC3339(t *metav1.Time) string {
if t != nil {
return t.UTC().Format(time.RFC3339)
Expand Down
40 changes: 35 additions & 5 deletions pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,59 @@ package consts
import "github.com/kubeflow/katib/pkg/util/v1alpha3/env"

const (
// ConfigExperimentSuggestionName is the config name of the
// suggestion client implementation in experiment controller.
ConfigExperimentSuggestionName = "experiment-suggestion-name"

// LabelExperimentName is the label of experiment name.
LabelExperimentName = "experiment"
// LabelSuggestionName is the label of suggestion name.
LabelSuggestionName = "suggestion"

// ContainerSuggestion is the container name in Suggestion.
ContainerSuggestion = "suggestion"

DefaultSuggestionPort = 6789
// DefaultSuggestionPort is the default port of suggestion service.
DefaultSuggestionPort = 6789
// DefaultSuggestionPortName is the default port name of suggestion service.
DefaultSuggestionPortName = "katib-api"
DefaultGRPCService = "manager.v1alpha3.Suggestion"
// DefaultGRPCService is the default service name in Suggestion,
// which is used to run healthz check using grpc probe.
DefaultGRPCService = "manager.v1alpha3.Suggestion"

// Default env name of katib namespace
// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"

// Katib config map constants
// KatibConfigMapName is the name of the config map
// which includes Katib's configuration.
KatibConfigMapName = "katib-config"

// LabelSuggestionTag is the name of suggestion config in configmap.
LabelSuggestionTag = "suggestion"

// LabelSuggestionImageTag is the name of suggestion image config in configmap.
LabelSuggestionImageTag = "image"

// ReconcileErrorReason is the reason when there is a reconcile error.
ReconcileErrorReason = "ReconcileError"

// JobKindJob is the kind of the Kubernetes Job.
JobKindJob = "Job"
// JobKindTF is the kind of TFJob.
JobKindTF = "TFJob"
// JobKindPyTorch is the kind of PyTorchJob.
JobKindPyTorch = "PyTorchJob"

// JobVersionJob is the api version of Kubernetes Job.
JobVersionJob = "v1"
// JobVersionTF is the api version of TFJob.
JobVersionTF = "v1"
// JobVersionPyTorch is the api version of PyTorchJob.
JobVersionPyTorch = "v1"

// JobGroupJob is the group name of Kubernetes Job.
JobGroupJob = "batch"
// JobGroupKubeflow is the group name of Kubeflow.
JobGroupKubeflow = "kubeflow.org"
)

var (
Expand Down
15 changes: 11 additions & 4 deletions pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
commonv1alpha3 "github.com/kubeflow/katib/pkg/common/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/trial/managerclient"
jobv1alpha3 "github.com/kubeflow/katib/pkg/job/v1alpha3"
)

const (
// ControllerName is the controller name.
ControllerName = "trial-controller"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

for _, gvk := range commonv1alpha3.GetSupportedJobList() {
for _, gvk := range jobv1alpha3.GetSupportedJobList() {
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
Expand Down Expand Up @@ -218,9 +219,15 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha3.Trial) error {
// Job already exists
// TODO Can desired Spec differ from deployedSpec?
if deployedJob != nil {
jobCondition, err := r.GetDeployedJobStatus(deployedJob)
kind := deployedJob.GetKind()
jobProvider, err := jobv1alpha3.New(kind)
if err != nil {
logger.Error(err, "Get deployed status error")
logger.Error(err, "Failed to create the provider")
return err
}
jobCondition, err := jobProvider.GetDeployedJobStatus(deployedJob)
if err != nil {
logger.Error(err, "Get deployed status error")
return err
}

Expand Down
52 changes: 0 additions & 52 deletions pkg/controller.v1alpha3/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"fmt"
"strconv"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
Expand All @@ -37,56 +35,6 @@ const (
cleanMetricsFinalizer = "clean-metrics-in-db"
)

// GetDeployedJobStatus derives a JobCondition from the "status" field of the
// deployed job.
//
// The condition type defaults to JobRunning, which is also what is returned
// when the job has no status field. For the DefaultJobKind the status is
// decoded as a batchv1.JobStatus: the first condition that is JobComplete or
// JobFailed with status True decides the result (the message is copied only
// for failures). For every other kind the status is decoded as a
// commonv1.JobStatus and the last condition in the list is used.
//
// An error is returned when the status field cannot be read, is not a map, or
// cannot be converted to the expected status type.
func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) {
	jobCondition := commonv1.JobCondition{}
	jobCondition.Type = commonv1.JobRunning
	kind := deployedJob.GetKind()

	status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
	if !ok {
		if unerr != nil {
			log.Error(unerr, "NestedFieldCopy unstructured to status error")
			return nil, unerr
		}
		// No status field on the job: keep the default running condition.
		return &jobCondition, nil
	}

	// Use the checked form of the type assertion: a status that is present
	// but not an object must yield an error, not a panic in the reconciler.
	statusMap, isMap := status.(map[string]interface{})
	if !isMap {
		err := fmt.Errorf("status has unexpected type %T, want map[string]interface{}", status)
		log.Error(err, "Convert unstructured to status error")
		return nil, err
	}

	switch kind {
	case DefaultJobKind:
		jobStatus := batchv1.JobStatus{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus); err != nil {
			log.Error(err, "Convert unstructured to status error")
			return nil, err
		}
		for _, cond := range jobStatus.Conditions {
			if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
				jobCondition.Type = commonv1.JobSucceeded
				// JobConditions message not populated when succeeded for batchv1 Job
				break
			}
			if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
				jobCondition.Type = commonv1.JobFailed
				jobCondition.Message = cond.Message
				break
			}
		}
	default:
		jobStatus := commonv1.JobStatus{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus); err != nil {
			log.Error(err, "Convert unstructured to status error")
			return nil, err
		}
		// The most recently appended condition is taken as the current one.
		if len(jobStatus.Conditions) > 0 {
			lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
			jobCondition.Type = lc.Type
			jobCondition.Message = lc.Message
		}
	}
	return &jobCondition, nil
}

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured, jobCondition *commonv1.JobCondition) {
now := metav1.Now()
jobConditionType := (*jobCondition).Type
Expand Down
48 changes: 48 additions & 0 deletions pkg/job/v1alpha3/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package v1alpha3

import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
// JobNameLabel represents the label key for the job name, the value is job name
JobNameLabel = "job-name"
// JobRoleLabel represents the label key for the job role, e.g. the value is master
JobRoleLabel = "job-role"
// TFJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible.
TFJobRoleLabel = "tf-job-role"
// PyTorchJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible.
PyTorchJobRoleLabel = "pytorch-job-role"
)

// JobRoleMap is the map which is used to determine if the replica is master.
// Katib will inject the metrics collector into the master replica.
var JobRoleMap = map[string][]string{
// Job kind does not support distributed training, thus no master.
consts.JobKindJob: {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaocegege Just wondering why don't we keep all job related constants in this file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot do it, since it would cause an import cycle: if we defined the consts in job/v1alpha3, job/v1alpha3 would use job/v1alpha3/job, and job/v1alpha3/job would use the consts defined in job/v1alpha3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work for you? @johnugeorge

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that we have defined too many constants :)

consts.JobKindTF: {JobRoleLabel, TFJobRoleLabel},
consts.JobKindPyTorch: {JobRoleLabel, PyTorchJobRoleLabel},
}

// GetSupportedJobList returns the GroupVersionKind of every supported job
// type, built from the group, version and kind constants in package consts.
func GetSupportedJobList() []schema.GroupVersionKind {
	return []schema.GroupVersionKind{
		{Group: consts.JobGroupJob, Version: consts.JobVersionJob, Kind: consts.JobKindJob},
		{Group: consts.JobGroupKubeflow, Version: consts.JobVersionTF, Kind: consts.JobKindTF},
		{Group: consts.JobGroupKubeflow, Version: consts.JobVersionPyTorch, Kind: consts.JobKindPyTorch},
	}
}
68 changes: 68 additions & 0 deletions pkg/job/v1alpha3/job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package job

import (
"fmt"

commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

var (
log = logf.Log.WithName("provider-job")
)

// Job is the provider of Job kind.
// It is an empty struct, so it carries no state of its own.
type Job struct{}

// GetDeployedJobStatus gets the deployed job status as a JobCondition.
//
// The "status" field of deployedJob is decoded as a batchv1.JobStatus. The
// condition type defaults to JobRunning; the first condition that is
// JobComplete or JobFailed with status True switches it to JobSucceeded or
// JobFailed respectively (the message is copied only for failures).
//
// An error is returned when the status field cannot be read, is missing, is
// not a map, or cannot be converted to batchv1.JobStatus.
func (j Job) GetDeployedJobStatus(
	deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) {
	jobCondition := commonv1.JobCondition{}
	// Set default type to running.
	jobCondition.Type = commonv1.JobRunning
	status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
	if !ok {
		if unerr != nil {
			log.Error(unerr, "NestedFieldCopy unstructured to status error")
			return nil, unerr
		}
		err := fmt.Errorf("value is missing")
		log.Error(err, "NestedFieldCopy unstructured to status error")
		return nil, err
	}

	// Use the checked form of the type assertion: a status that is present
	// but not an object must yield an error, not a panic in the caller.
	statusMap, isMap := status.(map[string]interface{})
	if !isMap {
		err := fmt.Errorf("status has unexpected type %T, want map[string]interface{}", status)
		log.Error(err, "Convert unstructured to status error")
		return nil, err
	}

	jobStatus := batchv1.JobStatus{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus); err != nil {
		log.Error(err, "Convert unstructured to status error")
		return nil, err
	}
	for _, cond := range jobStatus.Conditions {
		if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
			jobCondition.Type = commonv1.JobSucceeded
			// JobConditions message not populated when succeeded for batchv1 Job
			break
		}
		if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
			jobCondition.Type = commonv1.JobFailed
			jobCondition.Message = cond.Message
			break
		}
	}
	return &jobCondition, nil
}

// IsTrainingContainer reports whether c is the actual training container.
// The decision is based on index alone; c itself is not inspected.
func (j Job) IsTrainingContainer(index int, c corev1.Container) bool {
	// For a Job worker the first container is taken as the worker container;
	// the katib document should note it.
	return index == 0
}
Loading