Commit 5c70d1f

Remove underlying stream from the WALWriter.
1 parent 4ab602a commit 5c70d1f

File tree

1 file changed: +4 additions, -21 deletions


streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

Lines changed: 4 additions & 21 deletions
@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
 
   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
     stream.close()
   }
 
-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }
 
   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }
 
   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
     }
   }
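
Note: the remaining code path relies on hadoopFlushMethod, whose body is cut off in the hunk above. Below is a minimal, hedged sketch of how such a reflection-based flush lookup can be written; the hflush/sync method names and the FlushSketch object are assumptions for illustration, not code taken from this commit.

import java.lang.reflect.Method
import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

// Illustrative sketch only (not the commit's code): resolve a durable-flush
// method on FSDataOutputStream via reflection, so the same writer works
// whether the Hadoop version exposes hflush or only the older sync.
object FlushSketch {
  private val flushMethod: Option[Method] = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  // Invoke the resolved method if one was found; otherwise do nothing.
  def flush(stream: FSDataOutputStream): Unit = {
    flushMethod.foreach(_.invoke(stream))
  }
}

With the Either-based local-file branch removed, the writer always holds the FSDataOutputStream returned by HdfsUtils.getOutputStream, so getPosition() reduces to stream.getPos() and flush() to a single reflective invoke, as the diff shows.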

0 commit comments
