File tree Expand file tree Collapse file tree 5 files changed +105
-72
lines changed
main/scala/org/apache/spark/streaming/util
test/scala/org/apache/spark/streaming/util Expand file tree Collapse file tree 5 files changed +105
-72
lines changed Original file line number Diff line number Diff line change 406406 <artifactId >akka-slf4j_${scala.binary.version}</artifactId >
407407 <version >${akka.version} </version >
408408 </dependency >
409+ <dependency >
410+ <groupId >org.apache.hadoop</groupId >
411+ <artifactId >hadoop-minicluster</artifactId >
412+ <version >${hadoop.version} </version >
413+ <scope >test</scope >
414+ </dependency >
409415 <dependency >
410416 <groupId >${akka.group} </groupId >
411417 <artifactId >akka-testkit_${scala.binary.version}</artifactId >
Original file line number Diff line number Diff line change 6868 <artifactId >junit-interface</artifactId >
6969 <scope >test</scope >
7070 </dependency >
71+ <dependency >
72+ <groupId >org.apache.hadoop</groupId >
73+ <artifactId >hadoop-minicluster</artifactId >
74+ <scope >test</scope >
75+ </dependency >
7176 </dependencies >
7277 <build >
7378 <outputDirectory >target/scala-${scala.binary.version} /classes</outputDirectory >
Original file line number Diff line number Diff line change @@ -25,10 +25,9 @@ private[streaming] object HdfsUtils {
2525 // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
2626
2727 val dfsPath = new Path (path)
28- val dfs =
29- this .synchronized {
30- dfsPath.getFileSystem(conf)
31- }
28+ val dfs = this .synchronized {
29+ dfsPath.getFileSystem(conf)
30+ }
3231 // If the file exists and we have append support, append instead of creating a new file
3332 val stream : FSDataOutputStream = {
3433 if (dfs.isFile(dfsPath)) {
@@ -54,17 +53,16 @@ private[streaming] object HdfsUtils {
5453 }
5554
5655 def checkState (state : Boolean , errorMsg : => String ) {
57- if (! state) {
56+ if (! state) {
5857 throw new IllegalStateException (errorMsg)
5958 }
6059 }
6160
6261 def getBlockLocations (path : String , conf : Configuration ): Option [Array [String ]] = {
6362 val dfsPath = new Path (path)
64- val dfs =
65- this .synchronized {
66- dfsPath.getFileSystem(conf)
67- }
63+ val dfs = this .synchronized {
64+ dfsPath.getFileSystem(conf)
65+ }
6866 val fileStatus = dfs.getFileStatus(dfsPath)
6967 val blockLocs = Option (dfs.getFileBlockLocations(fileStatus, 0 , fileStatus.getLen))
7068 blockLocs.map(_.flatMap(_.getHosts))
Original file line number Diff line number Diff line change @@ -183,10 +183,6 @@ private[streaming] class WriteAheadLogManager(
183183 pastLogs ++= logFileInfo
184184 logInfo(s " Recovered ${logFileInfo.size} write ahead log files from $logDirectory" )
185185 logDebug(s " Recovered files are: \n ${logFileInfo.map(_.path).mkString(" \n " )}" )
186- } else {
187- fileSystem.mkdirs(logDirectoryPath,
188- FsPermission .createImmutable(Integer .parseInt(" 770" , 8 ).toShort))
189- logInfo(s " Created ${logDirectory} for write ahead log files " )
190186 }
191187 }
192188
You can’t perform that action at this time.
0 commit comments