diff --git a/examples/spark-pi.yaml b/examples/spark-pi.yaml index cf41d6e1d..0b5b9a2d9 100644 --- a/examples/spark-pi.yaml +++ b/examples/spark-pi.yaml @@ -22,13 +22,12 @@ metadata: spec: type: Scala mode: cluster + image: "liyinan926/spark:v2.3.0" mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar" driver: - image: "liyinan926/spark:v2.3.0" cores: "0.1" executor: - image: "liyinan926/spark:v2.3.0" instances: 1 memory: "512m" restartPolicy: Never diff --git a/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go b/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go index 03a30d467..3f9c86e9b 100644 --- a/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go +++ b/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go @@ -73,12 +73,17 @@ type SparkApplicationSpec struct { Type SparkApplicationType `json:"type"` // Mode is the deployment mode of the Spark application. Mode DeployMode `json:"mode,omitempty"` + // Image is the container image for the driver, executor, and init-container. Any custom container images for the + // driver, executor, or init-container take precedence over this. + // Optional. + Image *string `json:"image,omitempty"` // MainClass is the fully-qualified main class of the Spark application. // This only applies to Java/Scala Spark applications. // Optional. MainClass *string `json:"mainClass,omitempty"` // MainFile is the path to a bundled JAR, Python, or R file of the application. - MainApplicationFile string `json:"mainApplicationFile"` + // Optional. + MainApplicationFile *string `json:"mainApplicationFile"` // Arguments is a list of arguments to be passed to the application. // Optional. Arguments []string `json:"arguments,omitempty"` @@ -196,8 +201,9 @@ type DriverSpec struct { // Memory is used to set spark.driver.memory. // Optional. Memory *string `json:"memory,omitempty"` - // Image is the driver Docker image to use. 
- Image string `json:"image"` + // Image is the driver container image to use. Overrides Spec.Image if set. + // Optional. + Image *string `json:"image,omitempty"` // ConfigMaps carries information of other ConfigMaps to add to the driver Pod. // Optional. ConfigMaps []NamePath `json:"configMaps,omitempty"` @@ -217,8 +223,9 @@ type ExecutorSpec struct { // Memory is used to set spark.executor.memory. // Optional. Memory *string `json:"memory,omitempty"` - // Image is the executor Docker image to use. - Image string `json:"image"` + // Image is the executor container image to use. Overrides Spec.Image if set. + // Optional. + Image *string `json:"image,omitempty"` // Instances is the number of executor instances. Instances *int32 `json:"instances,omitempty"` // ConfigMaps carries information of other ConfigMaps to add to the executor Pods. diff --git a/pkg/apis/sparkoperator.k8s.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sparkoperator.k8s.io/v1alpha1/zz_generated.deepcopy.go index e672d3f71..d1674caeb 100644 --- a/pkg/apis/sparkoperator.k8s.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sparkoperator.k8s.io/v1alpha1/zz_generated.deepcopy.go @@ -117,6 +117,15 @@ func (in *DriverSpec) DeepCopyInto(out *DriverSpec) { **out = **in } } + if in.Image != nil { + in, out := &in.Image, &out.Image + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } if in.ConfigMaps != nil { in, out := &in.ConfigMaps, &out.ConfigMaps *out = make([]NamePath, len(*in)) @@ -168,6 +177,15 @@ func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) { **out = **in } } + if in.Image != nil { + in, out := &in.Image, &out.Image + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } if in.Instances != nil { in, out := &in.Instances, &out.Instances if *in == nil { @@ -305,6 +323,15 @@ func (in *SparkApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil. func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) { *out = *in + if in.Image != nil { + in, out := &in.Image, &out.Image + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } if in.MainClass != nil { in, out := &in.MainClass, &out.MainClass if *in == nil { @@ -314,6 +341,15 @@ func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) { **out = **in } } + if in.MainApplicationFile != nil { + in, out := &in.MainApplicationFile, &out.MainApplicationFile + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } if in.Arguments != nil { in, out := &in.Arguments, &out.Arguments *out = make([]string, len(*in)) diff --git a/pkg/config/constants.go b/pkg/config/constants.go index fa02011fd..7233ee25f 100644 --- a/pkg/config/constants.go +++ b/pkg/config/constants.go @@ -87,4 +87,7 @@ const ( SparkExecutorLabelKeyPrefix = "spark.kubernetes.executor.label." // SparkDriverPodNameKey is the Spark configuration key for driver pod name. 
SparkDriverPodNameKey = "spark.kubernetes.driver.pod.name" + SparkContainerImageKey = "spark.kubernetes.container.image" + SparkDriverContainerImageKey = "spark.kubernetes.driver.container.image" + SparkExecutorContainerImageKey = "spark.kubernetes.executor.container.image" ) diff --git a/pkg/controller/submission.go b/pkg/controller/submission.go index 36eb02006..47feba291 100644 --- a/pkg/controller/submission.go +++ b/pkg/controller/submission.go @@ -75,6 +75,13 @@ func buildSubmissionCommandArgs(app *v1alpha1.SparkApplication) ([]string, error args = append(args, "--py-files", strings.Join(app.Spec.Deps.PyFiles, ",")) } + if app.Spec.Image != nil { + args = append( + args, + "--conf", + fmt.Sprintf("%s=%s", config.SparkContainerImageKey, *app.Spec.Image)) + } + if app.Spec.SparkConfigMap != nil { config.AddConfigMapAnnotation(app, config.SparkDriverAnnotationKeyPrefix, config.SparkConfigMapAnnotation, *app.Spec.SparkConfigMap) @@ -113,8 +120,11 @@ func buildSubmissionCommandArgs(app *v1alpha1.SparkApplication) ([]string, error args = append(args, "--conf", fmt.Sprintf("%s%s=%s", config.SparkDriverAnnotationKeyPrefix, config.OwnerReferenceAnnotation, string(referenceData))) - // Add the main application file. - args = append(args, app.Spec.MainApplicationFile) + if app.Spec.MainApplicationFile != nil { + // Add the main application file if it is present. + args = append(args, *app.Spec.MainApplicationFile) + } + // Add application arguments. 
for _, argument := range app.Spec.Arguments { args = append(args, argument) @@ -151,10 +161,20 @@ func addDriverConfOptions(app *v1alpha1.SparkApplication) []string { driverConfOptions, "--conf", fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, config.SparkAppIDLabel, app.Status.AppID)) - driverConfOptions = append( - driverConfOptions, - "--conf", - fmt.Sprintf("spark.kubernetes.driver.container.image=%s", app.Spec.Driver.Image)) + + if app.Spec.Driver.PodName != nil { + driverConfOptions = append( + driverConfOptions, + "--conf", + fmt.Sprintf("%s=%s", config.SparkDriverPodNameKey, *app.Spec.Driver.PodName)) + } + + if app.Spec.Driver.Image != nil { + driverConfOptions = append( + driverConfOptions, + "--conf", + fmt.Sprintf("%s=%s", config.SparkDriverContainerImageKey, *app.Spec.Driver.Image)) + } if app.Spec.Driver.Cores != nil { conf := fmt.Sprintf("spark.driver.cores=%s", *app.Spec.Driver.Cores) @@ -178,10 +198,13 @@ func addExecutorConfOptions(app *v1alpha1.SparkApplication) []string { executorConfOptions, "--conf", fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, config.SparkAppIDLabel, app.Status.AppID)) - executorConfOptions = append( - executorConfOptions, - "--conf", - fmt.Sprintf("spark.kubernetes.executor.container.image=%s", app.Spec.Executor.Image)) + + if app.Spec.Executor.Image != nil { + executorConfOptions = append( + executorConfOptions, + "--conf", + fmt.Sprintf("%s=%s", config.SparkExecutorContainerImageKey, *app.Spec.Executor.Image)) + } if app.Spec.Executor.Instances != nil { conf := fmt.Sprintf("spark.executor.instances=%d", *app.Spec.Executor.Instances) diff --git a/sparkctl/cmd/create_test.go b/sparkctl/cmd/create_test.go index 94f4d741b..f0dc501e5 100644 --- a/sparkctl/cmd/create_test.go +++ b/sparkctl/cmd/create_test.go @@ -78,9 +78,9 @@ func TestLoadFromYAML(t *testing.T) { assert.Equal(t, app.Name, "example") assert.Equal(t, *app.Spec.MainClass, "org.examples.SparkExample") - assert.Equal(t, 
app.Spec.MainApplicationFile, "local:///path/to/example.jar") - assert.Equal(t, app.Spec.Driver.Image, "spark") - assert.Equal(t, app.Spec.Executor.Image, "spark") + assert.Equal(t, *app.Spec.MainApplicationFile, "local:///path/to/example.jar") + assert.Equal(t, *app.Spec.Driver.Image, "spark") + assert.Equal(t, *app.Spec.Executor.Image, "spark") assert.Equal(t, int(*app.Spec.Executor.Instances), 1) }