Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,26 @@ private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String,
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @param addNewVolumes whether to add new secret volumes for the secrets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this problem arose because we're conflating two things here - adding secret volumes (which are pod-scoped) and adding volume-mounts (which are container-scoped). I think we should separate these out. The branching may work for now, but we should have a future work item to separate these out.

cc/ @mccheah

Copy link
Contributor Author

@liyinan926 liyinan926 Jan 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I didn't separate it out because we will touch this code as part of refactoring the steps code anyway as planned in https://issues.apache.org/jira/browse/SPARK-22839.

* @return the updated pod and container with the secrets mounted.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
def mountSecrets(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we separate this method into addSecretVolumes and mountSecrets? The former would just need the pod argument, and the latter would take the container as argument. That way, we could probably separate this out better and it would make it more readable than the branching. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

pod: Pod,
container: Container,
addNewVolumes: Boolean): (Pod, Container) = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
if (addNewVolumes) {
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec()
}
}

var containerBuilder = new ContainerBuilder(container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ private[spark] class DriverConfigOrchestrator(
Nil
}

val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}

val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
val orchestrator = new InitContainerConfigOrchestrator(
sparkJars,
Expand All @@ -147,19 +153,13 @@ private[spark] class DriverConfigOrchestrator(
Nil
}

val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}

Seq(
initialSubmissionStep,
serviceBootstrapStep,
kubernetesCredentialsStep) ++
dependencyResolutionStep ++
initContainerBootstrapStep ++
mountSecretsStep
mountSecretsStep ++
initContainerBootstrapStep
}

private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[spark] class DriverMountSecretsStep(

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (pod, container) = bootstrap.mountSecrets(
driverSpec.driverPod, driverSpec.driverContainer)
driverSpec.driverPod, driverSpec.driverContainer, addNewVolumes = true)
driverSpec.copy(
driverPod = pod,
driverContainer = container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ private[spark] class InitContainerMountSecretsStep(
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {

override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
// Skip adding new secret volumes for the secrets as the volumes have already been added when
// mounting the secrets into the main driver container.
val (driverPod, initContainer) = bootstrap.mountSecrets(
spec.driverPod,
spec.initContainer)
spec.initContainer,
addNewVolumes = false)
spec.copy(
driverPod = driverPod,
initContainer = initContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private[spark] class ExecutorPodFactory(

val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
mountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(executorPod, containerWithLimitCores)
bootstrap.mountSecrets(executorPod, containerWithLimitCores, addNewVolumes = true)
}.getOrElse((executorPod, containerWithLimitCores))

val (bootstrappedPod, bootstrappedContainer) =
Expand All @@ -227,7 +227,12 @@ private[spark] class ExecutorPodFactory(

val (pod, mayBeSecretsMountedInitContainer) =
initContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer)
// Skip adding new secret volumes for the secrets as the volumes have already been added
// above when mounting the secrets into the main executor container.
bootstrap.mountSecrets(
podWithInitContainer.pod,
podWithInitContainer.initContainer,
addNewVolumes = false)
}.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))

val bootstrappedPod = KubernetesUtils.appendInitContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] object SecretVolumeUtils {

def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
pod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're touching this, the style is always .exists { foo => ... }.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

def containerHasVolume(
driverContainer: Container,
volumeName: String,
mountPath: String): Boolean = {
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
def containerHasVolume(container: Container, volumeName: String, mountPath: String): Boolean = {
container.getVolumeMounts.asScala.exists(volumeMount =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.spark.deploy.k8s.submit.steps

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec

class DriverMountSecretsStepSuite extends SparkFunSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}

class InitContainerMountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -44,12 +43,8 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)

val podWithSecretsMounted = configuredInitContainerSpec.driverPod
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer

Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

Expand Down Expand Up @@ -172,10 +172,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

assert(executor.getSpec.getInitContainers.size() === 1)
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName
=== "secret1-volume")
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/var/secret1")
assert(SecretVolumeUtils.containerHasVolume(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to change None at line 168 to Some(secretBootstrap) and check volumes' number to avoid regression.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link

@hex108 hex108 Jan 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! We also need check volumes' num in pod spec.

// check volume mounted.
assert(executor.getSpec.getVolumes.size() === 1)
assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. 93e1d64.

executor.getSpec.getInitContainers.get(0), "secret1-volume", "/var/secret1"))

checkOwnerReferences(executor, driverPodUid)
}
Expand Down