Skip to content

Commit

Permalink
Fix conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
andreyvelich committed Feb 21, 2019
2 parents 46221b2 + 5a1a791 commit d407be8
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 73 deletions.
7 changes: 7 additions & 0 deletions manifests/studyjobcontroller/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rules:
resources:
- configmaps
- serviceaccounts
- services
verbs:
- "*"
- apiGroups:
Expand All @@ -32,6 +33,12 @@ rules:
verbs:
- create
- get
- apiGroups:
- admissionregistration.k8s.io
resources:
- validatingwebhookconfigurations
verbs:
- '*'
- apiGroups:
- kubeflow.org
resources:
Expand Down
12 changes: 12 additions & 0 deletions manifests/studyjobcontroller/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: studyjob-controller
namespace: kubeflow
spec:
ports:
- port: 443
protocol: TCP
targetPort: 443
selector:
app: studyjob-controller
4 changes: 4 additions & 0 deletions manifests/studyjobcontroller/studyjobcontroller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ spec:
- name: studyjob-controller
image: katib/studyjob-controller
imagePullPolicy: Always
ports:
- containerPort: 443
name: validating
protocol: TCP
env:
- name: VIZIER_CORE_NAMESPACE
valueFrom:
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/studyjob/katib_api_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"google.golang.org/grpc"
)

func initializeStudy(instance *katibv1alpha1.StudyJob, ns string) error {
if validErr := validateStudy(instance, ns); validErr != nil {
func initializeStudy(instance *katibv1alpha1.StudyJob) error {
if validErr := validateStudy(instance); validErr != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
return validErr
}
Expand Down
130 changes: 90 additions & 40 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -45,6 +48,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/builder"
)

const (
Expand Down Expand Up @@ -120,7 +125,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
if isFatalWatchError(err, TFJobWorker) {
return err
}

err = c.Watch(
&source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}},
&handler.EnqueueRequestForOwner{
Expand All @@ -131,6 +135,37 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

validatingWebhook, err := builder.NewWebhookBuilder().
Name("validating.studyjob.kubeflow.org").
Validating().
Operations(admissionregistrationv1beta1.Create, admissionregistrationv1beta1.Update).
WithManager(mgr).
ForType(&katibv1alpha1.StudyJob{}).
Handlers(&studyJobValidator{}).
Build()
if err != nil {
return err
}
as, err := webhook.NewServer("studyjob-admission-server", mgr, webhook.ServerOptions{
BootstrapOptions: &webhook.BootstrapOptions{
Service: &webhook.Service{
Namespace: getMyNamespace(),
Name: "studyjob-controller",
Selectors: map[string]string{
"app": "studyjob-controller",
},
},
ValidatingWebhookConfigName: "studyjob-validating-webhook-config",
},
})
if err != nil {
return err
}
err = as.Register(validatingWebhook)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -213,7 +248,7 @@ func (r *ReconcileStudyJobController) Reconcile(request reconcile.Request) (reco
default:
now := metav1.Now()
instance.Status.StartTime = &now
err = initializeStudy(instance, request.Namespace)
err = initializeStudy(instance)
if err != nil {
r.Update(context.TODO(), instance)
log.Printf("Fail to initialize %v", err)
Expand Down Expand Up @@ -296,15 +331,15 @@ func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob
return goal, nil
}

func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, w *katibv1alpha1.WorkerCondition) error {
wid := w.WorkerID
obj := createWorkerJobObj(w.Kind)
func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, wid string, wkind *schema.GroupVersionKind) error {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var wretain, mcretain bool = false, false
if instance.Spec.WorkerSpec != nil {
wretain = instance.Spec.WorkerSpec.Retain
}
if !wretain {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(*wkind)
joberr := r.Client.Get(context.TODO(), nname, obj)
if joberr == nil {
if err := r.Delete(context.TODO(), obj, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
Expand Down Expand Up @@ -393,46 +428,56 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins
return update, nil
}

func (r *ReconcileStudyJobController) getJobWorkerStatus(w *katibv1alpha1.WorkerCondition, ns string) WorkerStatus {
runtimejob := createWorkerJobObj(w.Kind)
nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID}
joberr := r.Client.Get(context.TODO(), nname, runtimejob)
if joberr != nil {
return WorkerStatus{}
}
func (r *ReconcileStudyJobController) getJobWorkerStatus(ns string, wid string, wkind *schema.GroupVersionKind) WorkerStatus {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var state katibapi.State = katibapi.State_RUNNING
var cpTime *metav1.Time
switch w.Kind {
switch wkind.Kind {

case DefaultJobWorker:
job := runtimejob.(*batchv1.Job)
var job batchv1.Job
if err := r.Client.Get(context.TODO(), nname, &job); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
if job.Status.Active == 0 && job.Status.Succeeded > 0 {
state = katibapi.State_COMPLETED
} else if job.Status.Failed > 0 {
state = katibapi.State_ERROR
}
cpTime = job.Status.CompletionTime
case TFJobWorker:
job := runtimejob.(*tfjobv1beta1.TFJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}

default:
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(*wkind)
if err := r.Client.Get(context.TODO(), nname, u); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
case PyTorchJobWorker:
job := runtimejob.(*pytorchjobv1beta1.PyTorchJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
status, ok, unerr := unstructured.NestedFieldCopy(u.Object, "status")

if ok {
statusMap := status.(map[string]interface{})
jobStatus := commonv1beta1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Printf("Error in converting unstructured to status: %v ", err)
return WorkerStatus{}
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}
}
cpTime = jobStatus.CompletionTime

} else if unerr != nil {
log.Printf("Error in getting Job Status from unstructured: %v", unerr)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
}
return WorkerStatus{
CompletionTime: cpTime,
Expand All @@ -459,19 +504,24 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
}
defer conn.Close()
c := katibapi.NewManagerClient(conn)
wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
return false, err
}
for i, t := range instance.Status.Trials {
for j, w := range t.WorkerList {
if w.Condition == katibv1alpha1.ConditionCompleted || w.Condition == katibv1alpha1.ConditionFailed {
if w.ObjectiveValue == nil && w.Condition == katibv1alpha1.ConditionCompleted {
cwids = append(cwids, w.WorkerID)
}
if err := r.deleteWorkerResources(instance, ns, &w); err != nil {
if err := r.deleteWorkerResources(instance, ns, w.WorkerID, wkind); err != nil {
return false, err
}
continue
}
nextSuggestionSchedule = false
js := r.getJobWorkerStatus(&w, ns)
js := r.getJobWorkerStatus(ns, w.WorkerID, wkind)
update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j)
}
}
Expand Down Expand Up @@ -545,7 +595,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
return true, err
}
for _, t := range trials {
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind, false)
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind.Kind, false)
if err != nil {
log.Printf("Spawn worker error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
Expand All @@ -558,7 +608,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
WorkerList: []katibv1alpha1.WorkerCondition{
katibv1alpha1.WorkerCondition{
WorkerID: wid,
Kind: wkind,
Kind: wkind.Kind,
Condition: katibv1alpha1.ConditionCreated,
StartTime: metav1.Now(),
},
Expand Down Expand Up @@ -596,12 +646,12 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
return "", err
}
BUFSIZE := 1024
job := createWorkerJobObj(wkind)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
log.Printf("Yaml decode error %v", err)
return "", err
}
if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil {
if err := controllerutil.SetControllerReference(instance, job, r.scheme); err != nil {
log.Printf("SetControllerReference error %v", err)
return "", err
}
Expand All @@ -621,7 +671,7 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
log.Printf("getWorkerKind error %v", err)
return err
}
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, mcs)
if err != nil {
log.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down
Loading

0 comments on commit d407be8

Please sign in to comment.