diff --git a/pkg/reconciler.v1/common/README.md b/pkg/reconciler.v1/common/README.md index 15a0fe92..5a1b0a68 100644 --- a/pkg/reconciler.v1/common/README.md +++ b/pkg/reconciler.v1/common/README.md @@ -6,19 +6,19 @@ To use the reconciler, following methods must be overridden according to the API ```go // GetJob returns the job that matches the request -func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) +func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) // ExtractReplicasSpec extracts the ReplicasSpec map from this job -func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) +func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) // ExtractRunPolicy extracts the RunPolicy from this job -func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) +func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) // ExtractJobStatus extracts the JobStatus from this job -func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) +func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) // IsMasterRole checks if Pod is the master Pod -func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool +func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool ``` -A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`. \ No newline at end of file +A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`. diff --git a/pkg/reconciler.v1/common/gang_volcano.go b/pkg/reconciler.v1/common/gang_volcano.go index f9cd6ab1..5ef2a74c 100644 --- a/pkg/reconciler.v1/common/gang_volcano.go +++ b/pkg/reconciler.v1/common/gang_volcano.go @@ -69,6 +69,16 @@ func (r *VolcanoReconciler) GetGangSchedulerName() string { return "volcano" } +// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs +func (r *VolcanoReconciler) GangSchedulingEnabled() bool { + return r.BaseGangReconciler.GangSchedulingEnabled() +} + +// GetPodGroupName returns the name of PodGroup for this job +func (r *VolcanoReconciler) GetPodGroupName(job client.Object) string { + return r.BaseGangReconciler.GetPodGroupName(job) +} + // GetPodGroupForJob returns the PodGroup associated with this job func (r *VolcanoReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) { var pg *volcano.PodGroup = nil diff --git a/pkg/reconciler.v1/common/interface.go b/pkg/reconciler.v1/common/interface.go index 6942d16b..d5fbde2f 100644 --- a/pkg/reconciler.v1/common/interface.go +++ b/pkg/reconciler.v1/common/interface.go @@ -32,7 +32,7 @@ import ( // recorder or logger type ReconcilerUtilInterface interface { // GetReconcilerName SHOULD be overridden if a new Reconciler is defined. The default implementation returns - // "Kubeflow Reconciler" + // "common-reconciler" GetReconcilerName() string // GetRecorder CAN be overridden to customize EventRecorder @@ -48,7 +48,7 @@ type ReconcilerUtilInterface interface { // GangSchedulingInterface defines the abstract interface for gang-scheduling related actions, such like get, create or // delete PodGroup type GangSchedulingInterface interface { - // OverrideForGangSchedulingInterface MUST NOT be overridden as it reset ReconcilerUtilInterface + // OverrideForGangSchedulingInterface MUST NOT be overridden as it resets ReconcilerUtilInterface OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface) // GangSchedulingEnabled CAN be overridden if definition of gang-scheduling enabling changes. @@ -239,21 +239,3 @@ type JobInterface interface { // PastActiveDeadline CAN be overridden to customize how to determine if this job has past activate deadline. PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool } - -// KubeflowReconcilerInterface defines the abstract interface for a base reconciler for kubeflow jobs. -type KubeflowReconcilerInterface interface { - JobInterface - PodInterface - ServiceInterface - GangSchedulingInterface - ReconcilerUtilInterface - - // OverrideForKubeflowReconcilerInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, PodInterface, ServiceInterface, JobInterface, GangSchedulingInterface - OverrideForKubeflowReconcilerInterface(ji JobInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface, ui ReconcilerUtilInterface) - - // Reconcile CAN be overridden to customize how to handle a request. - Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) - - // SetupWithManager CAN be overridden to customize how to set up the reconciler with the manager. - SetupWithManager(mgr ctrl.Manager, obj client.Object) error -} diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go index 45b1bc43..063bc693 100644 --- a/pkg/reconciler.v1/common/job.go +++ b/pkg/reconciler.v1/common/job.go @@ -60,12 +60,12 @@ const ( ErrReconcileGangTemplate = "ReconcilePodGroups error %v" ErrGetReplicasStatusFromStatusFailedTemplate = "failed to get ReplicasStatus for %s from status" - WarnDefaultImplementationTemplate = "Warning: executing default implementation for KubeflowReconciler.%s" + WarnDefaultImplementationTemplate = "Warning: executing default implementation for JobReconciler.%s" WarnNotCountedInBackoffLimit = "The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit." ) -// KubeflowJobReconciler defines a Reconciler dealing with KubeflowJob -type KubeflowJobReconciler struct { +// JobReconciler defines a Reconciler dealing with generic training job +type JobReconciler struct { client.Client ReconcilerUtilInterface PodInterface @@ -74,16 +74,16 @@ type KubeflowJobReconciler struct { counter *commonutil.Counter } -// BareKubeflowJobReconciler returns the pointer of a KubeflowJobReconciler with minimal implementation -func BareKubeflowJobReconciler(client client.Client) *KubeflowJobReconciler { - return &KubeflowJobReconciler{ +// BareJobReconciler returns the pointer of a JobReconciler with minimal implementation +func BareJobReconciler(client client.Client) *JobReconciler { + return &JobReconciler{ Client: client, counter: commonutil.NewCounter(), } } -// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in KubeflowJobReconciler -func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) { +// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in JobReconciler +func (r *JobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) { if ui != nil { r.ReconcilerUtilInterface = ui } @@ -98,8 +98,8 @@ func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterfa } } -// GenLabels returns labels used for this job (based on the name of this KubeflowJob) -func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string { +// GenLabels returns labels used for this job (based on the name of this generic training job) +func (r *JobReconciler) GenLabels(jobName string) map[string]string { jobName = strings.Replace(jobName, "/", "-", -1) return map[string]string{ // TODO(#149): Remove deprecated labels. @@ -110,13 +110,13 @@ func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string { } } -// GetGroupNameLabelValue returns the Group Name for the KubeflowJob, which is "kubeflow.org" -func (r *KubeflowJobReconciler) GetGroupNameLabelValue() string { +// GetGroupNameLabelValue returns the Group Name for the generic training job, which is "kubeflow.org" +func (r *JobReconciler) GetGroupNameLabelValue() string { return GroupName } -// ReconcileJob reconciles KubeflowJob -func (r *KubeflowJobReconciler) ReconcileJob( +// ReconcileJob reconciles generic training job +func (r *JobReconciler) ReconcileJob( ctx context.Context, job client.Object, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, @@ -262,26 +262,26 @@ func (r *KubeflowJobReconciler) ReconcileJob( return nil } -// DeleteJob deletes this KubeflowJob -func (r *KubeflowJobReconciler) DeleteJob(job client.Object) error { +// DeleteJob deletes this generic training job +func (r *JobReconciler) DeleteJob(job client.Object) error { return r.Delete(context.Background(), job) } // RecordAbnormalPods records abnormal pods during the reconciliation of jobs -func (r *KubeflowJobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) { +func (r *JobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) { core.RecordAbnormalPods(activePods, object, r.GetRecorder()) } // SetStatusForSuccessJob sets the status for job that succeed -func (r *KubeflowJobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) { +func (r *JobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) { for rytpe := range status.ReplicaStatuses { status.ReplicaStatuses[rytpe].Succeeded += status.ReplicaStatuses[rytpe].Active status.ReplicaStatuses[rytpe].Active = 0 } } -// UpdateJobStatus updates the status of this KubeflowJob WITHOUT pushing the updated status to the APIServer -func (r *KubeflowJobReconciler) UpdateJobStatus( +// UpdateJobStatus updates the status of this generic training job WITHOUT pushing the updated status to the APIServer +func (r *JobReconciler) UpdateJobStatus( job client.Object, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, jobStatus *commonv1.JobStatus) error { @@ -371,13 +371,13 @@ func (r *KubeflowJobReconciler) UpdateJobStatus( return nil } -// UpdateJobStatusInAPIServer updates the status of this KubeflowJob in APIServer -func (r *KubeflowJobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error { +// UpdateJobStatusInAPIServer updates the status of this generic training job in APIServer +func (r *JobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error { return r.Status().Update(ctx, job) } -// CleanupResources cleans up all resources associated with this KubeflowJob -func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error { +// CleanupResources cleans up all resources associated with this generic training job +func (r *JobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error { if *runPolicy.CleanPodPolicy == commonv1.CleanPodPolicyNone { return nil } @@ -418,8 +418,8 @@ func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, return nil } -// CleanupJob cleans up all resources associated with this KubeflowJob as well as the job itself -func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error { +// CleanupJob cleans up all resources associated with this generic training job as well as the job itself +func (r *JobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error { currentTime := time.Now() ttl := runPolicy.TTLSecondsAfterFinished @@ -447,54 +447,54 @@ func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status return nil } -// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of KubeflowJob -func (r *KubeflowJobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool { +// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of generic training job +func (r *JobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool { logrus.Warnf(WarnDefaultImplementationTemplate, "IsFlagReplicaTypeForJobStatus") return true } -// IsJobSucceeded checks if this KubeflowJob succeeded -func (r *KubeflowJobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool { +// IsJobSucceeded checks if this generic training job succeeded +func (r *JobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool { return commonutil.IsSucceeded(status) } -// IsJobFailed checks if this KubeflowJob failed -func (r *KubeflowJobReconciler) IsJobFailed(status commonv1.JobStatus) bool { +// IsJobFailed checks if this generic training job failed +func (r *JobReconciler) IsJobFailed(status commonv1.JobStatus) bool { return commonutil.IsFailed(status) } -// ShouldCleanUp checks if resources associated with this KubeflowJob should be cleaned up -func (r *KubeflowJobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool { +// ShouldCleanUp checks if resources associated with this generic training job should be cleaned up +func (r *JobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool { return r.IsJobSucceeded(status) || r.IsJobFailed(status) } -// PastBackoffLimit checks if this KubeflowJob has past backoff limit -func (r *KubeflowJobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy, +// PastBackoffLimit checks if this generic training job has past backoff limit +func (r *JobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, pods []*corev1.Pod) (bool, error) { return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, r.FilterPodsForReplicaType) } -// PastActiveDeadline checks if this KubeflowJob has ActiveDeadlineSeconds field set and if it is exceeded. -func (r *KubeflowJobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool { +// PastActiveDeadline checks if this generic training job has ActiveDeadlineSeconds field set and if it is exceeded. +func (r *JobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool { return core.PastActiveDeadline(runPolicy, *jobStatus) } -func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) { +func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) { panic("implement me") } -func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) { +func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) { panic("implement me") } -func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) { +func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) { panic("implement me") } -func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) { +func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) { panic("implement me") } -func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool { +func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool { panic("implement me") } diff --git a/pkg/reconciler.v1/common/pod.go b/pkg/reconciler.v1/common/pod.go index 65ed538d..3cbd035a 100644 --- a/pkg/reconciler.v1/common/pod.go +++ b/pkg/reconciler.v1/common/pod.go @@ -52,16 +52,16 @@ var ( }) ) -// KubeflowPodReconciler defines a Pod Reconciler for KubeflowJob -type KubeflowPodReconciler struct { +// PodReconciler defines a Pod Reconciler for generic training job +type PodReconciler struct { client.Client ReconcilerUtilInterface GangSchedulingInterface JobInterface } -// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for KubeflowPodReconciler -func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) { +// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for PodReconciler +func (r *PodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) { if ui != nil { r.ReconcilerUtilInterface = ui } @@ -73,32 +73,32 @@ func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterfa } } -// BareKubeflowPodReconciler returns a pointer of BareKubeflowPodReconciler with minimal implementation -func BareKubeflowPodReconciler(client client.Client) *KubeflowPodReconciler { - return &KubeflowPodReconciler{Client: client} +// BarePodReconciler returns a pointer of BarePodReconciler with minimal implementation +func BarePodReconciler(client client.Client) *PodReconciler { + return &PodReconciler{Client: client} } // GenPodName returns the name of the Pod based on jobName, replicaType and its index -func (r *KubeflowPodReconciler) GenPodName(jobName string, rtype string, index string) string { +func (r *PodReconciler) GenPodName(jobName string, rtype string, index string) string { return core.GenGeneralName(jobName, rtype, index) } // GetDefaultContainerName returns the default name of the container -func (r *KubeflowPodReconciler) GetDefaultContainerName() string { +func (r *PodReconciler) GetDefaultContainerName() string { return DefaultContainerName } // GetPodsForJob returns all Pods associated with this job -func (r *KubeflowPodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) { +func (r *PodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) { podList := &corev1.PodList{} err := r.List(ctx, podList, client.MatchingLabels(r.GenLabels(job.GetName()))) if err != nil { return nil, err } - var pods []*corev1.Pod = nil - for _, pod := range podList.Items { - pods = append(pods, &pod) + var pods []*corev1.Pod + for idx := range podList.Items { + pods = append(pods, &podList.Items[idx]) } return pods, nil @@ -106,17 +106,17 @@ func (r *KubeflowPodReconciler) GetPodsForJob(ctx context.Context, job client.Ob } // GetPodSlices generates podSlice from all Pods listed for this job -func (r *KubeflowPodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod { +func (r *PodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod { return core.GetPodSlices(pods, replicas, logger) } // FilterPodsForReplicaType filters out Pods for this replicaType -func (r *KubeflowPodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) { +func (r *PodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) { return core.FilterPodsForReplicaType(pods, replicaType) } // ReconcilePods reconciles Pods for this job -func (r *KubeflowPodReconciler) ReconcilePods( +func (r *PodReconciler) ReconcilePods( ctx context.Context, job client.Object, jobStatus *commonv1.JobStatus, @@ -198,7 +198,7 @@ func (r *KubeflowPodReconciler) ReconcilePods( } // CreateNewPod generate Pods for this job and submits creation request to APIServer -func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index string, +func (r *PodReconciler) CreateNewPod(job client.Object, rt string, index string, spec *commonv1.ReplicaSpec, masterRole bool, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error { logger := commonutil.LoggerForReplica(job, rt) @@ -260,7 +260,7 @@ func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index } // DeletePod delete a Pod specified by name and namespace -func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name string) error { +func (r *PodReconciler) DeletePod(ctx context.Context, ns string, name string) error { pod := &corev1.Pod{} pod.Name = name pod.Namespace = ns @@ -272,7 +272,7 @@ func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name s } // DecoratePod decorates podTemplate before a Pod is submitted to the APIServer -func (r *KubeflowPodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) { +func (r *PodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) { // Default implementation applies nothing to podTemplate return } diff --git a/pkg/reconciler.v1/common/pod_test.go b/pkg/reconciler.v1/common/pod_test.go index bf985410..cef88651 100644 --- a/pkg/reconciler.v1/common/pod_test.go +++ b/pkg/reconciler.v1/common/pod_test.go @@ -18,7 +18,6 @@ import ( "testing" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" - "github.com/kubeflow/common/pkg/reconciler.v1/common" testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1" "github.com/kubeflow/common/test_job/reconciler.v1/test_job" testutilv1 "github.com/kubeflow/common/test_job/test_util/v1" @@ -47,8 +46,7 @@ func TestGenPodName(t *testing.T) { }(), } - actualReconciler := test_job.NewTestReconciler() - var testReconciler common.KubeflowReconcilerInterface = actualReconciler + testReconciler := test_job.NewTestReconciler() for _, c := range testCase { na := testReconciler.GenPodName(c.testJob.GetName(), c.testRType, c.testIndex) @@ -125,8 +123,7 @@ func TestFilterPodsForReplicaType(t *testing.T) { }(), } - actualReconciler := test_job.NewTestReconciler() - var testReconciler common.KubeflowReconcilerInterface = actualReconciler + testReconciler := test_job.NewTestReconciler() for _, c := range testCase { filtered, err := testReconciler.FilterPodsForReplicaType(c.testPods, c.testRType) diff --git a/pkg/reconciler.v1/common/reconciler.go b/pkg/reconciler.v1/common/reconciler.go deleted file mode 100644 index fb1a9b0f..00000000 --- a/pkg/reconciler.v1/common/reconciler.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/manager" -) - -// KubeflowReconciler reconciles a KubeflowJob object -type KubeflowReconciler struct { - JobInterface - PodInterface - ServiceInterface - GangSchedulingInterface - ReconcilerUtilInterface -} - -// BareKubeflowReconciler returns a pointer of KubeflowReconciler with minimal implementation -func BareKubeflowReconciler() *KubeflowReconciler { - return &KubeflowReconciler{} -} - -// DefaultKubeflowReconciler generates the default KubeflowReconciler with default sub-reconcilers fully setup -func DefaultKubeflowReconciler(mgr manager.Manager, gangEnable bool) *KubeflowReconciler { - kubeflowReconciler := BareKubeflowReconciler() - - // Generate Bare Components - jobInter := BareKubeflowJobReconciler(mgr.GetClient()) - podInter := BareKubeflowPodReconciler(mgr.GetClient()) - svcInter := BareKubeflowServiceReconciler(mgr.GetClient()) - gangInter := BareVolcanoReconciler(mgr.GetClient(), nil, gangEnable) - utilInter := BareUtilReconciler(mgr.GetEventRecorderFor(kubeflowReconciler.GetReconcilerName()), mgr.GetLogger(), mgr.GetScheme()) - - // Assign interfaces for jobInterface - jobInter.PodInterface = podInter - jobInter.ServiceInterface = svcInter - jobInter.GangSchedulingInterface = gangInter - jobInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for podInterface - podInter.JobInterface = jobInter - podInter.GangSchedulingInterface = gangInter - podInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for svcInterface - svcInter.PodInterface = podInter - svcInter.JobInterface = jobInter - svcInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for gangInterface - gangInter.ReconcilerUtilInterface = utilInter - - // Prepare KubeflowReconciler - kubeflowReconciler.JobInterface = jobInter - kubeflowReconciler.PodInterface = podInter - kubeflowReconciler.ServiceInterface = svcInter - kubeflowReconciler.GangSchedulingInterface = gangInter - kubeflowReconciler.ReconcilerUtilInterface = utilInter - - return kubeflowReconciler -} - -// OverrideForKubeflowReconcilerInterface resets JobInterface, PodInterface, ServiceInterface, GangSchedulingInterface, -// ReconcilerUtilInterface for KubeflowReconciler and its sub-reconcilers -func (r *KubeflowReconciler) OverrideForKubeflowReconcilerInterface( - ji JobInterface, - pi PodInterface, - si ServiceInterface, - gi GangSchedulingInterface, - ui ReconcilerUtilInterface) { - r.JobInterface.OverrideForJobInterface(ui, pi, si, gi) - r.PodInterface.OverrideForPodInterface(ui, gi, ji) - r.ServiceInterface.OverrideForServiceInterface(ui, pi, ji) - r.GangSchedulingInterface.OverrideForGangSchedulingInterface(ui) -} - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -func (r *KubeflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) - - job, err := r.GetJob(ctx, req) - if err != nil { - return ctrl.Result{}, err - } - - logger := r.GetLogger(job) - - if job.GetDeletionTimestamp() != nil { - logger.Info(MsgReconcileCancelled, ReasonKey, ReasonJobDeleted) - return ctrl.Result{}, nil - } - - scheme.Scheme.Default(job) - - // Get rid of SatisfiedExpectation - replicasSpec, err := r.ExtractReplicasSpec(job) - if err != nil { - return ctrl.Result{}, err - } - - runPolicy, err := r.ExtractRunPolicy(job) - if err != nil { - return ctrl.Result{}, err - } - - status, err := r.ExtractJobStatus(job) - if err != nil { - return ctrl.Result{}, err - } - - err = r.ReconcileJob(ctx, job, replicasSpec, status, runPolicy) - if err != nil { - logger.Info("Reconcile PyTorch Job error %v", err) - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *KubeflowReconciler) SetupWithManager(mgr ctrl.Manager, obj client.Object) error { - return ctrl.NewControllerManagedBy(mgr). - For(obj). - Owns(&corev1.Pod{}). - Owns(&corev1.Service{}). - Complete(r) -} diff --git a/pkg/reconciler.v1/common/service.go b/pkg/reconciler.v1/common/service.go index d92f71ce..7576f1dd 100644 --- a/pkg/reconciler.v1/common/service.go +++ b/pkg/reconciler.v1/common/service.go @@ -43,23 +43,23 @@ var ( }) ) -// KubeflowServiceReconciler defines a Service Reconciler for KubeflowJob -type KubeflowServiceReconciler struct { +// ServiceReconciler defines a Service Reconciler for generic training job +type ServiceReconciler struct { client.Client ReconcilerUtilInterface PodInterface JobInterface } -// BareKubeflowServiceReconciler returns a pointer of KubeflowServiceReconciler with minimal implementation -func BareKubeflowServiceReconciler(client client.Client) *KubeflowServiceReconciler { - return &KubeflowServiceReconciler{ +// BareServiceReconciler returns a pointer of ServiceReconciler with minimal implementation +func BareServiceReconciler(client client.Client) *ServiceReconciler { + return &ServiceReconciler{ Client: client, } } -// OverrideForServiceInterface resets ReconcilerUtilInterface, PodInterface, JobInterface for KubeflowServiceReconciler -func (r *KubeflowServiceReconciler) OverrideForServiceInterface(ui ReconcilerUtilInterface, pi PodInterface, ji JobInterface) { +// OverrideForServiceInterface resets ReconcilerUtilInterface, PodInterface, JobInterface for ServiceReconciler +func (r *ServiceReconciler) OverrideForServiceInterface(ui ReconcilerUtilInterface, pi PodInterface, ji JobInterface) { if ui != nil { r.ReconcilerUtilInterface = ui } @@ -72,40 +72,40 @@ func (r *KubeflowServiceReconciler) OverrideForServiceInterface(ui ReconcilerUti } // GetPortsFromJob gets the ports of job container. Port could be nil, if distributed communication strategy doesn't need and no other ports that need to be exposed. -func (r *KubeflowServiceReconciler) GetPortsFromJob(spec *commonv1.ReplicaSpec) (map[string]int32, error) { +func (r *ServiceReconciler) GetPortsFromJob(spec *commonv1.ReplicaSpec) (map[string]int32, error) { defaultContainerName := r.GetDefaultContainerName() return core.GetPortsFromJob(spec, defaultContainerName) } // GetServicesForJob returns all services associated with this job -func (r *KubeflowServiceReconciler) GetServicesForJob(ctx context.Context, job client.Object) ([]*corev1.Service, error) { +func (r *ServiceReconciler) GetServicesForJob(ctx context.Context, job client.Object) ([]*corev1.Service, error) { svcList := &corev1.ServiceList{} err := r.List(ctx, svcList, client.MatchingLabels(r.GenLabels(job.GetName()))) if err != nil { return nil, err } - var svcs []*corev1.Service = nil - for _, svc := range svcList.Items { - svcs = append(svcs, &svc) + var svcs []*corev1.Service + for idx := range svcList.Items { + svcs = append(svcs, &svcList.Items[idx]) } return svcs, nil } // FilterServicesForReplicaType returns service belong to a replicaType. -func (r *KubeflowServiceReconciler) FilterServicesForReplicaType(services []*corev1.Service, +func (r *ServiceReconciler) FilterServicesForReplicaType(services []*corev1.Service, replicaType string) ([]*corev1.Service, error) { return core.FilterServicesForReplicaType(services, replicaType) } // GetServiceSlices returns the serviceSlice based on all Services listed for this job -func (r *KubeflowServiceReconciler) GetServiceSlices(services []*corev1.Service, replicas int, logger *log.Entry) [][]*corev1.Service { +func (r *ServiceReconciler) GetServiceSlices(services []*corev1.Service, replicas int, logger *log.Entry) [][]*corev1.Service { return core.GetServiceSlices(services, replicas, logger) } // ReconcileServices reconciles the Services for this job -func (r *KubeflowServiceReconciler) ReconcileServices( +func (r *ServiceReconciler) ReconcileServices( job client.Object, services []*corev1.Service, rtype commonv1.ReplicaType, @@ -155,7 +155,7 @@ func (r *KubeflowServiceReconciler) ReconcileServices( } // CreateNewService generates Service based the job, replica info. and index and submits it to APIServer -func (r *KubeflowServiceReconciler) CreateNewService(job client.Object, rtype commonv1.ReplicaType, +func (r *ServiceReconciler) CreateNewService(job client.Object, rtype commonv1.ReplicaType, spec *commonv1.ReplicaSpec, index string) error { // Convert ReplicaType to lower string. @@ -208,7 +208,7 @@ func (r *KubeflowServiceReconciler) CreateNewService(job client.Object, rtype co } // DeleteService deletes a Service specified by its name and namespace from APIServer -func (r *KubeflowServiceReconciler) DeleteService(ns string, name string, job client.Object) error { +func (r *ServiceReconciler) DeleteService(ns string, name string, job client.Object) error { svc := &corev1.Service{} svc.Name = name svc.Namespace = ns @@ -220,7 +220,7 @@ func (r *KubeflowServiceReconciler) DeleteService(ns string, name string, job cl } // DecorateService decorates the Service before it's submitted to APIServer -func (r *KubeflowServiceReconciler) DecorateService(rtype string, svc *corev1.Service, job client.Object) { +func (r *ServiceReconciler) DecorateService(rtype string, svc *corev1.Service, job client.Object) { // Default implementation applies nothing to podTemplate return } diff --git a/pkg/reconciler.v1/common/service_test.go b/pkg/reconciler.v1/common/service_test.go index 0585233a..203ec661 100644 --- a/pkg/reconciler.v1/common/service_test.go +++ b/pkg/reconciler.v1/common/service_test.go @@ -20,7 +20,6 @@ import ( "testing" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" - "github.com/kubeflow/common/pkg/reconciler.v1/common" testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1" "github.com/kubeflow/common/test_job/reconciler.v1/test_job" test_utilv1 "github.com/kubeflow/common/test_job/test_util/v1" @@ -75,8 +74,7 @@ func TestCreateNewService(t *testing.T) { } }(), } - actualReconciler := test_job.NewTestReconciler() - var testReconciler common.KubeflowReconcilerInterface = actualReconciler + testReconciler := test_job.NewTestReconciler() for _, c := range testCase { err := testReconciler.CreateNewService(c.testJob, c.testRType, c.testSpec, c.testIndex) @@ -86,7 +84,7 @@ func TestCreateNewService(t *testing.T) { } found := false - for _, obj := range actualReconciler.DC.Cache { + for _, obj := range testReconciler.DC.Cache { if obj.GetName() == c.expectedService.GetName() && obj.GetNamespace() == c.expectedService.GetNamespace() { found = true svcCreated := obj.(*corev1.Service) diff --git a/pkg/reconciler.v1/common/utils.go b/pkg/reconciler.v1/common/utils.go index 5ec38b63..2a9f6825 100644 --- a/pkg/reconciler.v1/common/utils.go +++ b/pkg/reconciler.v1/common/utils.go @@ -22,9 +22,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -const ReconcilerName = "Kubeflow Reconciler" +const ReconcilerName = "common-reconciler" -// GetReconcilerName returns the name of this reconciler, which is "Kubeflow Reconciler" +// GetReconcilerName returns the name of this reconciler, which is "common-reconciler" func (r *ReconcilerUtil) GetReconcilerName() string { return ReconcilerName } diff --git a/pkg/reconciler.v1/common/utils_test.go b/pkg/reconciler.v1/common/utils_test.go index e3c31e4e..4f4b1d7e 100644 --- a/pkg/reconciler.v1/common/utils_test.go +++ b/pkg/reconciler.v1/common/utils_test.go @@ -20,7 +20,6 @@ import ( commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1" - "github.com/kubeflow/common/pkg/reconciler.v1/common" "github.com/kubeflow/common/test_job/reconciler.v1/test_job" ) @@ -43,8 +42,7 @@ func TestGenLabels(t *testing.T) { }(), } - actualReconciler := test_job.NewTestReconciler() - var testReconciler common.KubeflowReconcilerInterface = actualReconciler + testReconciler := test_job.NewTestReconciler() for _, c := range testCase { labels := testReconciler.GenLabels(c.testJobName) diff --git a/test_job/apis/test_job/v1/defaults.go b/test_job/apis/test_job/v1/defaults.go index d71dbcc0..b06a35c0 100644 --- a/test_job/apis/test_job/v1/defaults.go +++ b/test_job/apis/test_job/v1/defaults.go @@ -18,7 +18,7 @@ import ( "strings" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -87,6 +87,17 @@ func setTypeNameToCamelCase(testJob *TestJob, typ TestReplicaType) { // SetDefaults_TestJob sets any unspecified values to defaults. func SetDefaults_TestJob(testjob *TestJob) { + // Set default RunPolicy + if testjob.Spec.RunPolicy == nil { + testjob.Spec.RunPolicy = &commonv1.RunPolicy{ + CleanPodPolicy: nil, + TTLSecondsAfterFinished: nil, + ActiveDeadlineSeconds: nil, + BackoffLimit: nil, + SchedulingPolicy: nil, + } + } + // Set default cleanpod policy to Running. if testjob.Spec.RunPolicy.CleanPodPolicy == nil { running := commonv1.CleanPodPolicyRunning diff --git a/test_job/reconciler.v1/test_job/test_job_reconciler.go b/test_job/reconciler.v1/test_job/test_job_reconciler.go index 5413af42..62e00cc6 100644 --- a/test_job/reconciler.v1/test_job/test_job_reconciler.go +++ b/test_job/reconciler.v1/test_job/test_job_reconciler.go @@ -3,11 +3,10 @@ package test_job import ( "context" - "github.com/go-logr/logr" - commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" common_reconciler "github.com/kubeflow/common/pkg/reconciler.v1/common" v1 "github.com/kubeflow/common/test_job/apis/test_job/v1" + "github.com/kubeflow/common/test_job/client/clientset/versioned/scheme" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -15,12 +14,16 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" ) -var _ common_reconciler.KubeflowReconcilerInterface = &TestReconciler{} - type TestReconciler struct { - common_reconciler.KubeflowReconciler + common_reconciler.ReconcilerUtil + common_reconciler.ServiceReconciler + common_reconciler.PodReconciler + common_reconciler.VolcanoReconciler + common_reconciler.JobReconciler + DC *DummyClient Job *v1.TestJob Pods []*corev1.Pod @@ -33,50 +36,79 @@ func NewTestReconciler() *TestReconciler { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1.AddToScheme(scheme)) - kubeflowReconciler := common_reconciler.BareKubeflowReconciler() - dummy_client := &DummyClient{} + r := &TestReconciler{ + DC: dummy_client, + } + // Generate Bare Components - jobInter := common_reconciler.BareKubeflowJobReconciler(dummy_client) - podInter := common_reconciler.BareKubeflowPodReconciler(dummy_client) - svcInter := common_reconciler.BareKubeflowServiceReconciler(dummy_client) - gangInter := common_reconciler.BareVolcanoReconciler(dummy_client, nil, true) - utilInter := common_reconciler.BareUtilReconciler(nil, logr.FromContext(context.Background()), scheme) - - // Assign interfaces for jobInterface - jobInter.PodInterface = podInter - jobInter.ServiceInterface = svcInter - jobInter.GangSchedulingInterface = gangInter - jobInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for podInterface - podInter.JobInterface = jobInter - podInter.GangSchedulingInterface = gangInter - podInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for svcInterface - svcInter.PodInterface = podInter - svcInter.JobInterface = jobInter - svcInter.ReconcilerUtilInterface = utilInter - - // Assign interfaces for gangInterface - gangInter.ReconcilerUtilInterface = utilInter - - // Prepare KubeflowReconciler - kubeflowReconciler.JobInterface = jobInter - kubeflowReconciler.PodInterface = podInter - kubeflowReconciler.ServiceInterface = svcInter - kubeflowReconciler.GangSchedulingInterface = gangInter - kubeflowReconciler.ReconcilerUtilInterface = utilInter - - testReconciler := &TestReconciler{ - KubeflowReconciler: *kubeflowReconciler, - DC: dummy_client, + jobR := common_reconciler.BareJobReconciler(dummy_client) + jobR.OverrideForJobInterface(r, r, r, r) + + podR := common_reconciler.BarePodReconciler(dummy_client) + podR.OverrideForPodInterface(r, r, r) + + svcR := common_reconciler.BareServiceReconciler(dummy_client) + svcR.OverrideForServiceInterface(r, r, r) + + gangR := common_reconciler.BareVolcanoReconciler(dummy_client, nil, false) + gangR.OverrideForGangSchedulingInterface(r) + + Log := log.Log + utilR := common_reconciler.BareUtilReconciler(nil, Log, scheme) + //kubeflowReconciler := common_reconciler.BareKubeflowReconciler() + + r.JobReconciler = *jobR + r.PodReconciler = *podR + r.ServiceReconciler = *svcR + r.VolcanoReconciler = *gangR + r.ReconcilerUtil = *utilR + + return r +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *TestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + _ = log.FromContext(ctx) + + job, err := r.GetJob(ctx, req) + if err != nil { + return ctrl.Result{}, err + } + + logger := r.GetLogger(job) + + if job.GetDeletionTimestamp() != nil { + return ctrl.Result{}, nil + } + + scheme.Scheme.Default(job) + + // Get rid of SatisfiedExpectation + replicasSpec, err := r.ExtractReplicasSpec(job) + if err != nil { + return ctrl.Result{}, err + } + + runPolicy, err := r.ExtractRunPolicy(job) + if err != nil { + return ctrl.Result{}, err + } + + status, err := r.ExtractJobStatus(job) + if err != nil { + return ctrl.Result{}, err + } + + err = r.ReconcileJob(ctx, job, replicasSpec, status, runPolicy) + if err != nil { + logger.Info("Reconcile Test Job error %v", err) + return ctrl.Result{}, err } - testReconciler.OverrideForKubeflowReconcilerInterface(testReconciler, testReconciler, testReconciler, testReconciler, testReconciler) - return testReconciler + return ctrl.Result{}, nil } func (r *TestReconciler) GetReconcilerName() string {