Skip to content

Commit 55514e2

Browse files
committed
Minor changes based on PR comments.
1 parent d29fddd commit 55514e2

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717
package org.apache.spark.streaming.util
1818

1919
import org.apache.hadoop.conf.Configuration
20-
import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
20+
import org.apache.hadoop.fs._
2121

2222
private[streaming] object HdfsUtils {
2323

2424
def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
25-
// HDFS is not thread-safe when getFileSystem is called, so synchronize on that
2625
val dfsPath = new Path(path)
2726
val dfs = getFileSystemForPath(dfsPath, conf)
2827
// If the file exists and we have append support, append instead of creating a new file
@@ -61,7 +60,9 @@ private[streaming] object HdfsUtils {
6160
blockLocs.map(_.flatMap(_.getHosts))
6261
}
6362

64-
def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
63+
def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = synchronized {
64+
// For local file systems, return the raw local file system, so that calls to flush()
65+
// actually flush the stream.
6566
val fs = path.getFileSystem(conf)
6667
fs match {
6768
case localFs: LocalFileSystem => localFs.getRawFileSystem

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import WriteAheadLogManager._
3737
* Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
3838
* and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
3939
*
40-
*@param logDirectory Directory when rotating log files will be created.
40+
* @param logDirectory Directory where rotating log files will be created.
4141
* @param hadoopConf Hadoop configuration for reading/writing log files.
4242
* @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
4343
* Default is one minute.
@@ -57,7 +57,7 @@ private[streaming] class WriteAheadLogManager(
5757

5858
private val pastLogs = new ArrayBuffer[LogInfo]
5959
private val callerNameTag =
60-
if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
60+
if (callerName.nonEmpty) s" for $callerName" else ""
6161
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
6262
implicit private val executionContext = ExecutionContext.fromExecutorService(
6363
Utils.newDaemonFixedThreadPool(1, threadpoolName))

0 commit comments

Comments
 (0)