@@ -21,6 +21,7 @@ import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -51,10 +52,12 @@ private[spark] class EventLoggingListener(
private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name
val LOG_FILE_PERMISSIONS: FsPermission =
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short)
Contributor:
No need to specify : Short; it's pretty clear what it is with toShort

Contributor:
Also I think it's better if we move this to object EventLoggingListener, alongside the other scary all-caps variables.
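A minimal sketch of that suggestion, assuming the companion object already holds the other all-caps constants used in this patch (LOG_PREFIX, SPARK_VERSION_PREFIX, COMPRESSION_CODEC_PREFIX, APPLICATION_COMPLETE):

    import org.apache.hadoop.fs.permission.FsPermission

    private[spark] object EventLoggingListener {
      // Octal 770: read/write/execute for the owning user and group only.
      val LOG_FILE_PERMISSIONS: FsPermission =
        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
    }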


private val logger =
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
shouldOverwrite)
shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

/**
* Begin logging events.
@@ -64,10 +67,11 @@ private[spark] class EventLoggingListener(
logInfo("Logging events to %s".format(logDir))
if (shouldCompress) {
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
logger.newFile(LOG_PREFIX + logger.fileIndex)
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
Some(LOG_FILE_PERMISSIONS))
logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
}

/** Log the event as JSON. */
@@ -114,7 +118,7 @@ private[spark] class EventLoggingListener(
* In addition, create an empty special file to indicate application completion.
*/
def stop() = {
logger.newFile(APPLICATION_COMPLETE)
logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
Contributor:
It's a little awkward that every time we create a new file we pass in Some(LOG_FILE_PERMISSIONS). I think FileLogger should check whether it was given permissions and default to them if so. (see below)
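A sketch of how the EventLoggingListener call sites could look if FileLogger fell back to its own dirPermissions (see the createWriter sketch further down); the names are the ones already used in this patch:

    // Pass the permissions once, when the logger is constructed.
    private val logger =
      new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
        shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

    // With the fallback in place, call sites no longer repeat Some(LOG_FILE_PERMISSIONS).
    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
    logger.newFile(LOG_PREFIX + logger.fileIndex)
    logger.newFile(APPLICATION_COMPLETE)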

logger.stop()
}
}
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -24,6 +24,7 @@ import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io.CompressionCodec
@@ -42,7 +43,8 @@ private[spark] class FileLogger(
hadoopConfiguration: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true)
overwrite: Boolean = true,
dirPermissions: Option[FsPermission] = None)
extends Logging {

private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -79,16 +81,23 @@
if (!fileSystem.mkdirs(path)) {
throw new IOException("Error in creating log directory: %s".format(logDir))
}
if (dirPermissions.isDefined) {
val fsStatus = fileSystem.getFileStatus(path)
if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) {
fileSystem.setPermission(path, dirPermissions.get);
Contributor:
nit: drop the semicolon; also, in the line above, call toShort without parentheses
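For reference, a sketch of the same check with those nits applied:

    if (dirPermissions.isDefined) {
      val fsStatus = fileSystem.getFileStatus(path)
      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
        fileSystem.setPermission(path, dirPermissions.get)
      }
    }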

}
}
}

/**
* Create a new writer for the file identified by the given path.
*/
private def createWriter(fileName: String): PrintWriter = {
private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
Contributor:

same here

Contributor Author:

The class says it's a generic logging class. I left this to allow file permissions to be set differently per file.

Contributor:

Sure
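An illustrative (hypothetical) use of that per-file flexibility; the file names and the second permission value are made up:

    val groupOnly = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
    val worldReadable = FsPermission.createImmutable(Integer.parseInt("774", 8).toShort)

    val fileLogger = new FileLogger(logDir, conf, hadoopConfiguration)
    fileLogger.newFile("restricted-events", Some(groupOnly))     // readable by owner and group only
    fileLogger.newFile("completion-marker", Some(worldReadable)) // readable by everyone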

val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
val isDefaultLocal = (defaultFs == null || defaultFs == "file")
val path = new Path(logPath)

/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
* Therefore, for local files, use FileOutputStream instead. */
@@ -97,11 +106,11 @@ private[spark] class FileLogger(
// Second parameter is whether to append
new FileOutputStream(uri.getPath, !overwrite)
} else {
val path = new Path(logPath)
hadoopDataStream = Some(fileSystem.create(path, overwrite))
hadoopDataStream.get
}

perms.foreach {p => fileSystem.setPermission(path, p)}
Contributor:
Here I would do perms.orElse(dirPermissions).foreach { ... }, so we fall back on the logger's permissions if they exist. That way if we intend for all our files to have one set of permissions (e.g. in event logging) we don't have to provide Some(permissions) every time.
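A one-line sketch of that fallback: explicit per-call perms win, otherwise the logger-wide dirPermissions apply, otherwise the filesystem default is left alone.

    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }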

val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
@@ -150,15 +159,16 @@
/**
* Start a writer for a new file, closing the existing one if it exists.
* @param fileName Name of the new file, defaulting to the file index if not provided.
* @param perms Permissions to put on the new file.
*/
def newFile(fileName: String = "") {
def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
Contributor:

same here (unless your intent was to have separate permissions at file-level granularity, which is also fine, though that is not applicable to event logging)

Contributor Author:

Yep, the intent was to allow permissions to be set at the file level if someone wanted.

fileIndex += 1
writer.foreach(_.close())
val name = fileName match {
case "" => fileIndex.toString
case _ => fileName
}
writer = Some(createWriter(name))
writer = Some(createWriter(name, perms))
}

/**
2 changes: 2 additions & 0 deletions docs/security.md
@@ -7,6 +7,8 @@ Spark currently supports authentication via a shared secret. Authentication can

The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user, and once the user is logged in, Spark can compare that user against the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI.
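For illustration, the two acl configs can also be set programmatically; the user list below is a placeholder:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.ui.acls.enable", "true")
      .set("spark.ui.view.acls", "alice,bob") // users allowed to view the UI, besides the app owner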

If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be created manually with the proper permissions set on it. If you want those log files secured, the permissions on that directory should be set to drwxrwxrwt (octal 1777, i.e. world-writable with the sticky bit set). The directory should be owned by the super user who runs the history server, and group permissions should be restricted to the super user's group. This allows all users to write to the directory but prevents unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
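One possible way to pre-create that directory with the sticky bit (mode 1777) through the Hadoop FileSystem API, assuming a Hadoop version whose FsPermission preserves the sticky bit; the path and configuration are placeholders:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.fs.permission.FsPermission

    val fs = FileSystem.get(new Configuration())
    val eventLogDir = new Path("/tmp/spark-events")
    fs.mkdirs(eventLogDir)
    // Set the permission explicitly so the configured umask cannot mask the bits.
    fs.setPermission(eventLogDir, new FsPermission(Integer.parseInt("1777", 8).toShort))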

For Spark on YARN deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.

For other types of Spark deployments, the Spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Masters/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
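An illustrative sketch of that configuration in code; the secret value is a placeholder and must be identical on every node:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "REPLACE_WITH_SHARED_SECRET")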