@@ -34,17 +34,25 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

import PythonWorkerFactory._

// Because forking processes from Java is expensive, we prefer to launch a single Python daemon
// (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
// only works on UNIX-based systems now because it uses signals for child management, so we can
// also fall back to launching workers (pyspark/worker.py) directly.
// Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
// pyspark/daemon.py (by default), and tell it to fork new workers for our tasks. This daemon
// currently only works on UNIX-based systems because it uses signals for child management,
// so we also fall back to launching workers, pyspark/worker.py (by default), directly.
val useDaemon = {
val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)

// This flag is ignored on Windows as it's unable to fork.
!System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
}
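As a point of reference, here is a rough sketch of how this flag drives the choice between the two launch paths. The create() body is not part of this diff, so treat the shape below as an approximation rather than the actual implementation:

// Approximate selection logic (not shown in this diff): the daemon path is taken only on
// non-Windows platforms with the daemon enabled; otherwise each worker is launched directly.
def create(): Socket = {
  if (useDaemon) {
    createThroughDaemon()
  } else {
    createSimpleWorker()
  }
}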

// This configuration indicates the module to run the daemon to execute its Python workers.
val daemonModule = SparkEnv.get.conf.get("spark.python.daemon.module", "pyspark.daemon")
Member:

generally, I thought we use the name "command" as what we call the thing to execute

Member Author:

Ah, yup, that's true in general. But please let me stick to "module" here, since that's what we execute (python -m), as described in the help:

python --help
...
-m mod : run library module as a script (terminates option list)
...
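In other words, the configured value has to be an importable module (a dotted name), not a path to a script, because it ends up after -m on the command line. A hypothetical example, where my_daemon.module is a made-up name:

// Hypothetical: with spark.python.daemon.module=my_daemon.module, the daemon would be launched as
//   <pythonExec> -m my_daemon.module
// which requires that module to be importable on the worker's PYTHONPATH.
val exampleCommand = Seq(pythonExec, "-m", "my_daemon.module")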


// This configuration indicates the module to run each Python worker.
// Note that this configuration only has an effect when 'spark.python.use.daemon' is disabled
// or the platform is Windows.
val workerModule = SparkEnv.get.conf.get("spark.python.worker.module", "pyspark.worker")

var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
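For reference, a sketch of how these settings could be supplied from user code; the two custom module names are assumptions for illustration, not modules shipped with Spark:

import org.apache.spark.SparkConf

// Hypothetical overrides of the defaults introduced above.
val conf = new SparkConf()
  .set("spark.python.use.daemon", "true")               // existing flag; the daemon is the default on non-Windows
  .set("spark.python.daemon.module", "my_pkg.daemon")   // defaults to pyspark.daemon
  .set("spark.python.worker.module", "my_pkg.worker")   // defaults to pyspark.worker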
@@ -74,8 +82,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

/**
* Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
* to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
* Connect to a worker launched through pyspark/daemon.py (by default), which forks python
* processes itself to avoid the high cost of forking from Java. This currently only works
* on UNIX-based systems.
*/
private def createThroughDaemon(): Socket = {
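The body of this method is collapsed in the diff. As a hedged sketch of the mechanism the doc comment describes (details such as error handling and bookkeeping differ in the real code), the factory connects to the already-running daemon, which forks a worker and reports it back over the same socket:

// Sketch only, not the collapsed body.
val socket = new Socket(daemonHost, daemonPort)                 // daemon is started beforehand
val pid = new DataInputStream(socket.getInputStream).readInt()  // daemon replies with the forked worker's pid
if (pid < 0) {
  throw new IllegalStateException("Python daemon failed to launch worker")
}
socket                                                          // now connected to the forked worker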

@@ -108,15 +117,15 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

/**
* Launch a worker by executing worker.py directly and telling it to connect to us.
* Launch a worker by executing worker.py (by default) directly and telling it to connect to us.
*/
private def createSimpleWorker(): Socket = {
var serverSocket: ServerSocket = null
try {
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
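The remainder of this method is collapsed. As a hedged sketch of the "connect back to us" part described in the doc comment (how the port is actually communicated may differ from the real code), the factory passes the server socket's local port to the worker it just started and then waits for the worker to connect:

// Sketch only, not the collapsed code.
val worker = pb.start()
// Hand serverSocket.getLocalPort to the worker (e.g. via stdin or an environment variable),
// then accept the connection the worker opens back to us.
serverSocket.setSoTimeout(10000)
val socket = serverSocket.accept()
socket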
@@ -159,7 +168,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)