Skip to content

Commit

Permalink
v2(backend): fix scheduledworkflow (#1315)
Browse files Browse the repository at this point in the history
Change the behavior of the run creation in scheduledworkflow:
submit the run creation request to apiserver.

Signed-off-by: Yihong Wang <[email protected]>
  • Loading branch information
yhwang authored Aug 4, 2023
1 parent be53cfe commit 6b129b7
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 23 deletions.
15 changes: 15 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,21 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a run due to empty namespace")
}
executionSpec.SetExecutionNamespace(k8sNamespace)

// assign OwnerReference to scheduledworkflow
if run.RecurringRunId != "" {
job, err := r.jobStore.GetJob(run.RecurringRunId)
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("RecurringRunId doesn't exist: %s", run.RecurringRunId), "Failed to create a run due to invalid recurring run id")
}
swf, err := r.swfClient.ScheduledWorkflow(job.Namespace).Get(ctx, job.K8SName, v1.GetOptions{})
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("ScheduledWorkflow doesn't exist: %s", job.K8SName), "Failed to create a run due to invalid name")
}
executionSpec.SetOwnerReferences(swf)
// executionSpec.SetExecutionName(run.Description)
}

newExecSpec, err := r.getWorkflowClient(k8sNamespace).Create(ctx, executionSpec, v1.CreateOptions{})
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand Down
4 changes: 3 additions & 1 deletion backend/src/apiserver/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestScheduledWorkflow(t *testing.T) {
Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}},
Spec: "",
},
NoCatchup: util.BoolPointer(true),
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
},
}

Expand Down
7 changes: 6 additions & 1 deletion backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
Parameters: parameters,
Spec: executionSpec.ToStringForSchedule(),
},
NoCatchup: util.BoolPointer(modelJob.NoCatchup),
NoCatchup: util.BoolPointer(modelJob.NoCatchup),
ExperimentId: modelJob.ExperimentId,
PipelineId: modelJob.PipelineId,
PipelineName: modelJob.PipelineName,
PipelineVersionId: modelJob.PipelineVersionId,
ServiceAccount: modelJob.ServiceAccount,
},
}
return scheduledWorkflow, nil
Expand Down
37 changes: 28 additions & 9 deletions backend/src/crd/controller/scheduledworkflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

apiv2 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/client"
"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util"
Expand Down Expand Up @@ -56,9 +57,10 @@ var (

// Controller is the controller implementation for ScheduledWorkflow resources
type Controller struct {
kubeClient *client.KubeClient
swfClient *client.ScheduledWorkflowClient
workflowClient *client.WorkflowClient
kubeClient *client.KubeClient
swfClient *client.ScheduledWorkflowClient
workflowClient *client.WorkflowClient
runServiceclient apiv2.RunServiceClient

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
Expand All @@ -81,6 +83,7 @@ func NewController(
workflowClientSet commonutil.ExecutionClient,
swfInformerFactory swfinformers.SharedInformerFactory,
executionInformer commonutil.ExecutionInformer,
runServiceClient apiv2.RunServiceClient,
time commonutil.TimeInterface,
location *time.Location) *Controller {

Expand All @@ -99,9 +102,10 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: util.ControllerAgentName})

controller := &Controller{
kubeClient: client.NewKubeClient(kubeClientSet, recorder),
swfClient: client.NewScheduledWorkflowClient(swfClientSet, swfInformer),
workflowClient: client.NewWorkflowClient(workflowClientSet, executionInformer),
kubeClient: client.NewKubeClient(kubeClientSet, recorder),
swfClient: client.NewScheduledWorkflowClient(swfClientSet, swfInformer),
workflowClient: client.NewWorkflowClient(workflowClientSet, executionInformer),
runServiceclient: runServiceClient,
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), swfregister.Kind),
time: time,
Expand Down Expand Up @@ -495,12 +499,27 @@ func (c *Controller) submitNewWorkflowIfNotAlreadySubmitted(
}

// If the workflow is not found, we need to create it.
newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch)
createdWorkflow, err := c.workflowClient.Create(ctx, swf.Namespace, newWorkflow)

run := &apiv2.CreateRunRequest{
Run: &apiv2.Run{
ExperimentId: swf.Spec.ExperimentId,
DisplayName: workflowName,
PipelineSource: &apiv2.Run_PipelineVersionReference{
PipelineVersionReference: &apiv2.PipelineVersionReference{
PipelineId: swf.Spec.PipelineId,
PipelineVersionId: swf.Spec.PipelineVersionId,
},
},
ServiceAccount: swf.Spec.ServiceAccount,
RecurringRunId: string(swf.GetObjectMeta().GetUID()),
},
}
newRun, err := c.runServiceclient.CreateRun(ctx, run)

if err != nil {
return false, "", err
}
return true, createdWorkflow.ExecutionName(), nil
return true, newRun.GetDisplayName(), nil
}

func (c *Controller) updateStatus(
Expand Down
65 changes: 58 additions & 7 deletions backend/src/crd/controller/scheduledworkflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package main

import (
"flag"
"fmt"
"strings"
"time"

apiv2 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util"
swfclientset "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned"
swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions"
"github.com/kubeflow/pipelines/backend/src/crd/pkg/signals"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"k8s.io/client-go/kubernetes"
Expand All @@ -32,13 +35,30 @@ import (
)

var (
masterURL string
kubeconfig string
namespace string
location *time.Location
clientQPS float64
clientBurst int
executionType string
masterURL string
kubeconfig string
namespace string
location *time.Location
clientQPS float64
clientBurst int
executionType string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
)

const (
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
addressTemp = "%s:%s"
)

func main() {
Expand Down Expand Up @@ -67,6 +87,11 @@ func main() {
log.Fatalf("Error building schedule clientset: %s", err.Error())
}

runServiceClient, err := NewRunServiceclient()
if err != nil {
log.Fatalf("Error connecting to apiserver: %s", err.Error())
}

clientParam := commonutil.ClientParameters{QPS: float64(cfg.QPS), Burst: cfg.Burst}
execClient := commonutil.NewExecutionClientOrFatal(commonutil.CurrentExecutionType(), time.Second*30, clientParam)

Expand All @@ -84,6 +109,7 @@ func main() {
execClient,
scheduleInformerFactory,
execInformer,
runServiceClient,
commonutil.NewRealTime(),
location)

Expand All @@ -95,6 +121,23 @@ func main() {
}
}

func NewRunServiceclient() (apiv2.RunServiceClient, error) {

httpAddress := fmt.Sprintf(addressTemp, mlPipelineAPIServerName, mlPipelineServiceHttpPort)
grpcAddress := fmt.Sprintf(addressTemp, mlPipelineAPIServerName, mlPipelineServiceGRPCPort)
err := commonutil.WaitForAPIAvailable(initializeTimeout, mlPipelineAPIServerBasePath, httpAddress)
if err != nil {
return nil, errors.Wrapf(err,
"Failed to initialize pipeline client. Error: %s", err.Error())
}
connection, err := commonutil.GetRpcConnection(grpcAddress)
if err != nil {
return nil, errors.Wrapf(err,
"Failed to get RPC connection. Error: %s", err.Error())
}
return apiv2.NewRunServiceClient(connection), nil
}

func initEnv() {
// Import environment variable, support nested vars e.g. OBJECTSTORECONFIG_ACCESSKEY
replacer := strings.NewReplacer(".", "_")
Expand All @@ -114,6 +157,14 @@ func init() {
flag.Float64Var(&clientQPS, "clientQPS", 5, "The maximum QPS to the master from this client.")
flag.IntVar(&clientBurst, "clientBurst", 10, "Maximum burst for throttle from this client.")
flag.StringVar(&executionType, "executionType", "Workflow", "Custom Resource's name of the backend Orchestration Engine")
flag.DurationVar(&initializeTimeout, initializationTimeoutFlagName, 2*time.Minute, "Duration to wait for initialization of the ML pipeline API server.")
flag.DurationVar(&timeout, timeoutFlagName, 1*time.Minute, "Duration to wait for calls to complete.")
flag.StringVar(&mlPipelineAPIServerName, mlPipelineAPIServerNameFlagName, "ml-pipeline", "Name of the ML pipeline API server.")
flag.StringVar(&mlPipelineServiceHttpPort, mlPipelineAPIServerHttpPortFlagName, "8888", "Http Port of the ML pipeline API server.")
flag.StringVar(&mlPipelineServiceGRPCPort, mlPipelineAPIServerGRPCPortFlagName, "8887", "GRPC Port of the ML pipeline API server.")
flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName,
"/apis/v2beta1", "The base path for the ML pipeline API server.")

var err error
location, err = util.GetLocation()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ type ScheduledWorkflowSpec struct {
// +optional
Workflow *WorkflowResource `json:"workflow,omitempty"`

// ExperimentId
ExperimentId string `json:"experimentId,omitempty"`

// PipelineId
PipelineId string `json:"pipelineId,omitempty"`

// PipelineVersionId
PipelineVersionId string `json:"pipelineVersionId,omitempty"`

// TODO(gkcalat): consider adding PipelineVersionName to avoid confusion.
// Pipeline versions's Name will be required if ID is not empty.
// This carries the name of the pipeline version in v2beta1.
PipelineName string `json:"pipelineName,omitempty"`

// ServiceAccount
ServiceAccount string `json:"serviceAccount,omitempty"`

// TODO: support additional resource types: K8 jobs, etc.

}
Expand Down
10 changes: 5 additions & 5 deletions scripts/deploy/github/manifests/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ resources:
# when application is deleted.

images:
- name: '*/aipipeline/tekton-driver-dev'
- name: '*/aipipeline/tekton-driver'
newName: kind-registry:5000/tekton-driver
newTag: latest
- name: '*/aipipeline/tekton-exithandler-controller-dev'
- name: '*/aipipeline/tekton-exithandler-controller'
newName: kind-registry:5000/tekton-exithandler-controller
newTag: latest
- name: '*/aipipeline/tekton-exithandler-webhook-dev'
- name: '*/aipipeline/tekton-exithandler-webhook'
newName: kind-registry:5000/tekton-exithandler-webhook
newTag: latest
- name: '*/aipipeline/tekton-kfptask-controller-dev'
- name: '*/aipipeline/tekton-kfptask-controller'
newName: kind-registry:5000/tekton-kfptask-controller
newTag: latest
- name: '*/aipipeline/tekton-kfptask-webhook-dev'
- name: '*/aipipeline/tekton-kfptask-webhook'
newName: kind-registry:5000/tekton-kfptask-webhook
newTag: latest
- name: gcr.io/ml-pipeline/api-server
Expand Down

0 comments on commit 6b129b7

Please sign in to comment.