Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
simplify reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
zw0610 committed Dec 1, 2021
1 parent ba4c24a commit f320f16
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 310 deletions.
12 changes: 6 additions & 6 deletions pkg/reconciler.v1/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ To use the reconciler, following methods must be overridden according to the API

```go
// GetJob returns the job that matches the request
func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error)
func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error)

// ExtractReplicasSpec extracts the ReplicasSpec map from this job
func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error)
func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error)

// ExtractRunPolicy extracts the RunPolicy from this job
func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error)
func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error)

// ExtractJobStatus extracts the JobStatus from this job
func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error)
func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error)

// IsMasterRole checks if Pod is the master Pod
func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
```

A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`.
A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`.
10 changes: 10 additions & 0 deletions pkg/reconciler.v1/common/gang_volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (r *VolcanoReconciler) GetGangSchedulerName() string {
return "volcano"
}

// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs
func (r *VolcanoReconciler) GangSchedulingEnabled() bool {
return r.BaseGangReconciler.GangSchedulingEnabled()
}

// GetPodGroupName returns the name of PodGroup for this job
func (r *VolcanoReconciler) GetPodGroupName(job client.Object) string {
return r.BaseGangReconciler.GetPodGroupName(job)
}

// GetPodGroupForJob returns the PodGroup associated with this job
func (r *VolcanoReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) {
var pg *volcano.PodGroup = nil
Expand Down
22 changes: 2 additions & 20 deletions pkg/reconciler.v1/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// recorder or logger
type ReconcilerUtilInterface interface {
// GetReconcilerName SHOULD be overridden if a new Reconciler is defined. The default implementation returns
// "Kubeflow Reconciler"
// "common-reconciler"
GetReconcilerName() string

// GetRecorder CAN be overridden to customize EventRecorder
Expand All @@ -48,7 +48,7 @@ type ReconcilerUtilInterface interface {
// GangSchedulingInterface defines the abstract interface for gang-scheduling related actions, such like get, create or
// delete PodGroup
type GangSchedulingInterface interface {
// OverrideForGangSchedulingInterface MUST NOT be overridden as it reset ReconcilerUtilInterface
// OverrideForGangSchedulingInterface MUST NOT be overridden as it resets ReconcilerUtilInterface
OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface)

// GangSchedulingEnabled CAN be overridden if definition of gang-scheduling enabling changes.
Expand Down Expand Up @@ -239,21 +239,3 @@ type JobInterface interface {
// PastActiveDeadline CAN be overridden to customize how to determine if this job has past activate deadline.
PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool
}

// KubeflowReconcilerInterface defines the abstract interface for a base reconciler for kubeflow jobs.
type KubeflowReconcilerInterface interface {
JobInterface
PodInterface
ServiceInterface
GangSchedulingInterface
ReconcilerUtilInterface

// OverrideForKubeflowReconcilerInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, PodInterface, ServiceInterface, JobInterface, GangSchedulingInterface
OverrideForKubeflowReconcilerInterface(ji JobInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface, ui ReconcilerUtilInterface)

// Reconcile CAN be overridden to customize how to handle a request.
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

// SetupWithManager CAN be overridden to customize how to set up the reconciler with the manager.
SetupWithManager(mgr ctrl.Manager, obj client.Object) error
}
86 changes: 43 additions & 43 deletions pkg/reconciler.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ const (
ErrReconcileGangTemplate = "ReconcilePodGroups error %v"
ErrGetReplicasStatusFromStatusFailedTemplate = "failed to get ReplicasStatus for %s from status"

WarnDefaultImplementationTemplate = "Warning: executing default implementation for KubeflowReconciler.%s"
WarnDefaultImplementationTemplate = "Warning: executing default implementation for JobReconciler.%s"
WarnNotCountedInBackoffLimit = "The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit."
)

// KubeflowJobReconciler defines a Reconciler dealing with KubeflowJob
type KubeflowJobReconciler struct {
// JobReconciler defines a Reconciler dealing with generic training job
type JobReconciler struct {
client.Client
ReconcilerUtilInterface
PodInterface
Expand All @@ -74,16 +74,16 @@ type KubeflowJobReconciler struct {
counter *commonutil.Counter
}

// BareKubeflowJobReconciler returns the pointer of a KubeflowJobReconciler with minimal implementation
func BareKubeflowJobReconciler(client client.Client) *KubeflowJobReconciler {
return &KubeflowJobReconciler{
// BareJobReconciler returns the pointer of a JobReconciler with minimal implementation
func BareJobReconciler(client client.Client) *JobReconciler {
return &JobReconciler{
Client: client,
counter: commonutil.NewCounter(),
}
}

// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in KubeflowJobReconciler
func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) {
// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in JobReconciler
func (r *JobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) {
if ui != nil {
r.ReconcilerUtilInterface = ui
}
Expand All @@ -98,8 +98,8 @@ func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterfa
}
}

// GenLabels returns labels used for this job (based on the name of this KubeflowJob)
func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string {
// GenLabels returns labels used for this job (based on the name of this generic training job)
func (r *JobReconciler) GenLabels(jobName string) map[string]string {
jobName = strings.Replace(jobName, "/", "-", -1)
return map[string]string{
// TODO(#149): Remove deprecated labels.
Expand All @@ -110,13 +110,13 @@ func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string {
}
}

// GetGroupNameLabelValue returns the Group Name for the KubeflowJob, which is "kubeflow.org"
func (r *KubeflowJobReconciler) GetGroupNameLabelValue() string {
// GetGroupNameLabelValue returns the Group Name for the generic training job, which is "kubeflow.org"
func (r *JobReconciler) GetGroupNameLabelValue() string {
return GroupName
}

// ReconcileJob reconciles KubeflowJob
func (r *KubeflowJobReconciler) ReconcileJob(
// ReconcileJob reconciles generic training job
func (r *JobReconciler) ReconcileJob(
ctx context.Context,
job client.Object,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
Expand Down Expand Up @@ -262,26 +262,26 @@ func (r *KubeflowJobReconciler) ReconcileJob(
return nil
}

// DeleteJob deletes this KubeflowJob
func (r *KubeflowJobReconciler) DeleteJob(job client.Object) error {
// DeleteJob deletes this generic training job
func (r *JobReconciler) DeleteJob(job client.Object) error {
return r.Delete(context.Background(), job)
}

// RecordAbnormalPods records abnormal pods during the reconciliation of jobs
func (r *KubeflowJobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) {
func (r *JobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) {
core.RecordAbnormalPods(activePods, object, r.GetRecorder())
}

// SetStatusForSuccessJob sets the status for job that succeed
func (r *KubeflowJobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) {
func (r *JobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) {
for rytpe := range status.ReplicaStatuses {
status.ReplicaStatuses[rytpe].Succeeded += status.ReplicaStatuses[rytpe].Active
status.ReplicaStatuses[rytpe].Active = 0
}
}

// UpdateJobStatus updates the status of this KubeflowJob WITHOUT pushing the updated status to the APIServer
func (r *KubeflowJobReconciler) UpdateJobStatus(
// UpdateJobStatus updates the status of this generic training job WITHOUT pushing the updated status to the APIServer
func (r *JobReconciler) UpdateJobStatus(
job client.Object,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
jobStatus *commonv1.JobStatus) error {
Expand Down Expand Up @@ -371,13 +371,13 @@ func (r *KubeflowJobReconciler) UpdateJobStatus(
return nil
}

// UpdateJobStatusInAPIServer updates the status of this KubeflowJob in APIServer
func (r *KubeflowJobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error {
// UpdateJobStatusInAPIServer updates the status of this generic training job in APIServer
func (r *JobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error {
return r.Status().Update(ctx, job)
}

// CleanupResources cleans up all resources associated with this KubeflowJob
func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
// CleanupResources cleans up all resources associated with this generic training job
func (r *JobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
if *runPolicy.CleanPodPolicy == commonv1.CleanPodPolicyNone {
return nil
}
Expand Down Expand Up @@ -418,8 +418,8 @@ func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy,
return nil
}

// CleanupJob cleans up all resources associated with this KubeflowJob as well as the job itself
func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
// CleanupJob cleans up all resources associated with this generic training job as well as the job itself
func (r *JobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
currentTime := time.Now()

ttl := runPolicy.TTLSecondsAfterFinished
Expand Down Expand Up @@ -447,54 +447,54 @@ func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status
return nil
}

// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of KubeflowJob
func (r *KubeflowJobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool {
// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of generic training job
func (r *JobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool {
logrus.Warnf(WarnDefaultImplementationTemplate, "IsFlagReplicaTypeForJobStatus")
return true
}

// IsJobSucceeded checks if this KubeflowJob succeeded
func (r *KubeflowJobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool {
// IsJobSucceeded checks if this generic training job succeeded
func (r *JobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool {
return commonutil.IsSucceeded(status)
}

// IsJobFailed checks if this KubeflowJob failed
func (r *KubeflowJobReconciler) IsJobFailed(status commonv1.JobStatus) bool {
// IsJobFailed checks if this generic training job failed
func (r *JobReconciler) IsJobFailed(status commonv1.JobStatus) bool {
return commonutil.IsFailed(status)
}

// ShouldCleanUp checks if resources associated with this KubeflowJob should be cleaned up
func (r *KubeflowJobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool {
// ShouldCleanUp checks if resources associated with this generic training job should be cleaned up
func (r *JobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool {
return r.IsJobSucceeded(status) || r.IsJobFailed(status)
}

// PastBackoffLimit checks if this KubeflowJob has past backoff limit
func (r *KubeflowJobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy,
// PastBackoffLimit checks if this generic training job has past backoff limit
func (r *JobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, pods []*corev1.Pod) (bool, error) {
return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, r.FilterPodsForReplicaType)
}

// PastActiveDeadline checks if this KubeflowJob has ActiveDeadlineSeconds field set and if it is exceeded.
func (r *KubeflowJobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool {
// PastActiveDeadline checks if this generic training job has ActiveDeadlineSeconds field set and if it is exceeded.
func (r *JobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool {
return core.PastActiveDeadline(runPolicy, *jobStatus)
}

func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) {
func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) {
func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) {
func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) {
func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
panic("implement me")
}
36 changes: 18 additions & 18 deletions pkg/reconciler.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ var (
})
)

// KubeflowPodReconciler defines a Pod Reconciler for KubeflowJob
type KubeflowPodReconciler struct {
// PodReconciler defines a Pod Reconciler for generic training job
type PodReconciler struct {
client.Client
ReconcilerUtilInterface
GangSchedulingInterface
JobInterface
}

// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for KubeflowPodReconciler
func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) {
// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for PodReconciler
func (r *PodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) {
if ui != nil {
r.ReconcilerUtilInterface = ui
}
Expand All @@ -73,50 +73,50 @@ func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterfa
}
}

// BareKubeflowPodReconciler returns a pointer of BareKubeflowPodReconciler with minimal implementation
func BareKubeflowPodReconciler(client client.Client) *KubeflowPodReconciler {
return &KubeflowPodReconciler{Client: client}
// BarePodReconciler returns a pointer of BarePodReconciler with minimal implementation
func BarePodReconciler(client client.Client) *PodReconciler {
return &PodReconciler{Client: client}
}

// GenPodName returns the name of the Pod based on jobName, replicaType and its index
func (r *KubeflowPodReconciler) GenPodName(jobName string, rtype string, index string) string {
func (r *PodReconciler) GenPodName(jobName string, rtype string, index string) string {
return core.GenGeneralName(jobName, rtype, index)
}

// GetDefaultContainerName returns the default name of the container
func (r *KubeflowPodReconciler) GetDefaultContainerName() string {
func (r *PodReconciler) GetDefaultContainerName() string {
return DefaultContainerName
}

// GetPodsForJob returns all Pods associated with this job
func (r *KubeflowPodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) {
func (r *PodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) {
podList := &corev1.PodList{}
err := r.List(ctx, podList, client.MatchingLabels(r.GenLabels(job.GetName())))
if err != nil {
return nil, err
}

var pods []*corev1.Pod = nil
for _, pod := range podList.Items {
pods = append(pods, &pod)
for idx := range podList.Items {
pods = append(pods, &podList.Items[idx])
}

return pods, nil
// TODO: (zw0610) adding Claiming Pods
}

// GetPodSlices generates podSlice from all Pods listed for this job
func (r *KubeflowPodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod {
func (r *PodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod {
return core.GetPodSlices(pods, replicas, logger)
}

// FilterPodsForReplicaType filters out Pods for this replicaType
func (r *KubeflowPodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) {
func (r *PodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) {
return core.FilterPodsForReplicaType(pods, replicaType)
}

// ReconcilePods reconciles Pods for this job
func (r *KubeflowPodReconciler) ReconcilePods(
func (r *PodReconciler) ReconcilePods(
ctx context.Context,
job client.Object,
jobStatus *commonv1.JobStatus,
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *KubeflowPodReconciler) ReconcilePods(
}

// CreateNewPod generate Pods for this job and submits creation request to APIServer
func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index string,
func (r *PodReconciler) CreateNewPod(job client.Object, rt string, index string,
spec *commonv1.ReplicaSpec, masterRole bool, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {

logger := commonutil.LoggerForReplica(job, rt)
Expand Down Expand Up @@ -260,7 +260,7 @@ func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index
}

// DeletePod delete a Pod specified by name and namespace
func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name string) error {
func (r *PodReconciler) DeletePod(ctx context.Context, ns string, name string) error {
pod := &corev1.Pod{}
pod.Name = name
pod.Namespace = ns
Expand All @@ -272,7 +272,7 @@ func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name s
}

// DecoratePod decorates podTemplate before a Pod is submitted to the APIServer
func (r *KubeflowPodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
func (r *PodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
// Default implementation applies nothing to podTemplate
return
}
Loading

0 comments on commit f320f16

Please sign in to comment.