Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/spark-pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ metadata:
spec:
type: Scala
mode: cluster
image: "liyinan926/spark:v2.3.0"
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar"
driver:
image: "liyinan926/spark:v2.3.0"
cores: "0.1"
executor:
image: "liyinan926/spark:v2.3.0"
instances: 1
memory: "512m"
restartPolicy: Never
Expand Down
17 changes: 12 additions & 5 deletions pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,17 @@ type SparkApplicationSpec struct {
Type SparkApplicationType `json:"type"`
// Mode is the deployment mode of the Spark application.
Mode DeployMode `json:"mode,omitempty"`
// Image is the container image for the driver, executor, and init-container. Any custom container images for the
// driver, executor, or init-container take precedence over this.
// Optional.
Image *string `json:"image,omitempty"`
// MainClass is the fully-qualified main class of the Spark application.
// This only applies to Java/Scala Spark applications.
// Optional.
MainClass *string `json:"mainClass,omitempty"`
// MainApplicationFile is the path to a bundled JAR, Python, or R file of the application.
MainApplicationFile string `json:"mainApplicationFile"`
// Optional.
MainApplicationFile *string `json:"mainApplicationFile"`
// Arguments is a list of arguments to be passed to the application.
// Optional.
Arguments []string `json:"arguments,omitempty"`
Expand Down Expand Up @@ -196,8 +201,9 @@ type DriverSpec struct {
// Memory is used to set spark.driver.memory.
// Optional.
Memory *string `json:"memory,omitempty"`
// Image is the driver Docker image to use.
Image string `json:"image"`
// Image is the driver container image to use. Overrides Spec.Image if set.
// Optional.
Image *string `json:"image,omitempty"`
// ConfigMaps carries information of other ConfigMaps to add to the driver Pod.
// Optional.
ConfigMaps []NamePath `json:"configMaps,omitempty"`
Expand All @@ -217,8 +223,9 @@ type ExecutorSpec struct {
// Memory is used to set spark.executor.memory.
// Optional.
Memory *string `json:"memory,omitempty"`
// Image is the executor Docker image to use.
Image string `json:"image"`
// Image is the executor container image to use. Overrides Spec.Image if set.
// Optional.
Image *string `json:"image,omitempty"`
// Instances is the number of executor instances.
Instances *int32 `json:"instances,omitempty"`
// ConfigMaps carries information of other ConfigMaps to add to the executor Pods.
Expand Down
36 changes: 36 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ func (in *DriverSpec) DeepCopyInto(out *DriverSpec) {
**out = **in
}
}
if in.Image != nil {
in, out := &in.Image, &out.Image
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.ConfigMaps != nil {
in, out := &in.ConfigMaps, &out.ConfigMaps
*out = make([]NamePath, len(*in))
Expand Down Expand Up @@ -168,6 +177,15 @@ func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) {
**out = **in
}
}
if in.Image != nil {
in, out := &in.Image, &out.Image
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.Instances != nil {
in, out := &in.Instances, &out.Instances
if *in == nil {
Expand Down Expand Up @@ -305,6 +323,15 @@ func (in *SparkApplicationList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
*out = *in
if in.Image != nil {
in, out := &in.Image, &out.Image
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.MainClass != nil {
in, out := &in.MainClass, &out.MainClass
if *in == nil {
Expand All @@ -314,6 +341,15 @@ func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
**out = **in
}
}
if in.MainApplicationFile != nil {
in, out := &in.MainApplicationFile, &out.MainApplicationFile
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.Arguments != nil {
in, out := &in.Arguments, &out.Arguments
*out = make([]string, len(*in))
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,7 @@ const (
SparkExecutorLabelKeyPrefix = "spark.kubernetes.executor.label."
// SparkDriverPodNameKey is the Spark configuration key for driver pod name.
SparkDriverPodNameKey = "spark.kubernetes.driver.pod.name"
SparkContainerImageKey = "spark.kubernetes.container.image"
SparkDriverContainerImageKey = "spark.kubernetes.driver.container.image"
SparkExecutorContainerImageKey = "spark.kubernetes.executor.container.image"
)
43 changes: 33 additions & 10 deletions pkg/controller/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ func buildSubmissionCommandArgs(app *v1alpha1.SparkApplication) ([]string, error
args = append(args, "--py-files", strings.Join(app.Spec.Deps.PyFiles, ","))
}

if app.Spec.Image != nil {
args = append(
args,
"--conf",
fmt.Sprintf("%s=%s", config.SparkContainerImageKey, *app.Spec.Image))
}

if app.Spec.SparkConfigMap != nil {
config.AddConfigMapAnnotation(app, config.SparkDriverAnnotationKeyPrefix, config.SparkConfigMapAnnotation,
*app.Spec.SparkConfigMap)
Expand Down Expand Up @@ -113,8 +120,11 @@ func buildSubmissionCommandArgs(app *v1alpha1.SparkApplication) ([]string, error
args = append(args, "--conf", fmt.Sprintf("%s%s=%s", config.SparkDriverAnnotationKeyPrefix,
config.OwnerReferenceAnnotation, string(referenceData)))

// Add the main application file.
args = append(args, app.Spec.MainApplicationFile)
if app.Spec.MainApplicationFile != nil {
// Add the main application file if it is present.
args = append(args, *app.Spec.MainApplicationFile)
}

// Add application arguments.
for _, argument := range app.Spec.Arguments {
args = append(args, argument)
Expand Down Expand Up @@ -151,10 +161,20 @@ func addDriverConfOptions(app *v1alpha1.SparkApplication) []string {
driverConfOptions,
"--conf",
fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, config.SparkAppIDLabel, app.Status.AppID))
driverConfOptions = append(
driverConfOptions,
"--conf",
fmt.Sprintf("spark.kubernetes.driver.container.image=%s", app.Spec.Driver.Image))

if app.Spec.Driver.PodName != nil {
driverConfOptions = append(
driverConfOptions,
"--conf",
fmt.Sprintf("%s=%s", config.SparkDriverPodNameKey, *app.Spec.Driver.PodName))
}

if app.Spec.Driver.Image != nil {
driverConfOptions = append(
driverConfOptions,
"--conf",
fmt.Sprintf("%s=%s", config.SparkDriverContainerImageKey, *app.Spec.Driver.Image))
}

if app.Spec.Driver.Cores != nil {
conf := fmt.Sprintf("spark.driver.cores=%s", *app.Spec.Driver.Cores)
Expand All @@ -178,10 +198,13 @@ func addExecutorConfOptions(app *v1alpha1.SparkApplication) []string {
executorConfOptions,
"--conf",
fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, config.SparkAppIDLabel, app.Status.AppID))
executorConfOptions = append(
executorConfOptions,
"--conf",
fmt.Sprintf("spark.kubernetes.executor.container.image=%s", app.Spec.Executor.Image))

if app.Spec.Executor.Image != nil {
executorConfOptions = append(
executorConfOptions,
"--conf",
fmt.Sprintf("%s=%s", config.SparkExecutorContainerImageKey, *app.Spec.Executor.Image))
}

if app.Spec.Executor.Instances != nil {
conf := fmt.Sprintf("spark.executor.instances=%d", *app.Spec.Executor.Instances)
Expand Down
6 changes: 3 additions & 3 deletions sparkctl/cmd/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func TestLoadFromYAML(t *testing.T) {

assert.Equal(t, app.Name, "example")
assert.Equal(t, *app.Spec.MainClass, "org.examples.SparkExample")
assert.Equal(t, app.Spec.MainApplicationFile, "local:///path/to/example.jar")
assert.Equal(t, app.Spec.Driver.Image, "spark")
assert.Equal(t, app.Spec.Executor.Image, "spark")
assert.Equal(t, *app.Spec.MainApplicationFile, "local:///path/to/example.jar")
assert.Equal(t, *app.Spec.Driver.Image, "spark")
assert.Equal(t, *app.Spec.Executor.Image, "spark")
assert.Equal(t, int(*app.Spec.Executor.Instances), 1)
}

Expand Down