Commit ef8db09

Merge pull request #17 from harishreedharan/driver-ha-wal

Use Minicluster for WAL tests.

2 parents 4ab602a + 7e40e56 · commit ef8db09

7 files changed (+132, -120 lines)

pom.xml

Lines changed: 6 additions & 0 deletions
@@ -406,6 +406,12 @@
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-testkit_${scala.binary.version}</artifactId>

streaming/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

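With hadoop-minicluster on the test classpath, the WAL tests can run against a real in-process HDFS instead of the local filesystem. The seventh changed file (presumably the WAL test suite) is not expanded in this view; the following is only a minimal sketch of how such a suite can be wired up, assuming ScalaTest and the Hadoop 2.x MiniDFSCluster.Builder API. All names below (MiniClusterWALSuite, logDir) are illustrative and not taken from the PR.

// Sketch only: a suite that boots a single-datanode HDFS cluster for WAL tests.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class MiniClusterWALSuite extends FunSuite with BeforeAndAfterAll {
  private var cluster: MiniDFSCluster = _
  private val hadoopConf = new Configuration()

  override def beforeAll(): Unit = {
    // Start one datanode for the whole suite; this is real HDFS, so append/flush semantics apply.
    cluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).build()
    cluster.waitClusterUp()
  }

  override def afterAll(): Unit = {
    if (cluster != null) cluster.shutdown()
  }

  test("log directory is created on HDFS") {
    val fs = cluster.getFileSystem()
    val logDir = new Path("/wal-test")
    fs.mkdirs(logDir)
    assert(fs.exists(logDir))
    // A WriteAheadLogWriter/Reader round trip against s"${fs.getUri}$logDir" would go here.
  }
}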
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

Lines changed: 8 additions & 12 deletions
@@ -25,10 +25,7 @@ private[streaming] object HdfsUtils {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that

     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
@@ -46,27 +43,26 @@

   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
-    val dfs = this.synchronized {
-      dfsPath.getFileSystem(conf)
-    }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val instream = dfs.open(dfsPath)
     instream
   }

   def checkState(state: Boolean, errorMsg: => String) {
-    if(!state) {
+    if (!state) {
       throw new IllegalStateException(errorMsg)
     }
   }

   def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
     blockLocs.map(_.flatMap(_.getHosts))
   }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
+    path.getFileSystem(conf)
+  }
 }

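All FileSystem lookups in this object now funnel through the new getFileSystemForPath helper, which holds a single lock around Path.getFileSystem instead of synchronizing ad hoc at each call site. A minimal illustrative caller follows; the package declaration is needed only because HdfsUtils is private[streaming], and the path string is hypothetical.

// Sketch only: illustrative caller of the new helper.
package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HdfsUtilsExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical WAL file; the helper serializes getFileSystem calls regardless of the path.
    val logPath = new Path("hdfs:///tmp/spark-wal/log-0-60000")
    val fs = HdfsUtils.getFileSystemForPath(logPath, conf)
    println(fs.getUri)
  }
}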
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

Lines changed: 14 additions & 14 deletions
@@ -63,14 +63,18 @@ private[streaming] class WriteAheadLogManager(
     Utils.newDaemonFixedThreadPool(1, threadpoolName))
   override protected val logName = s"WriteAheadLogManager $callerNameTag"

-  private var currentLogPath: String = null
+  private var currentLogPath: Option[String] = None
   private var currentLogWriter: WriteAheadLogWriter = null
   private var currentLogWriterStartTime: Long = -1L
   private var currentLogWriterStopTime: Long = -1L

   initializeOrRecover()

-  /** Write a byte buffer to the log file */
+  /**
+   * Write a byte buffer to the log file. This method synchronously writes the data in the
+   * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+   * to HDFS, and will be available for readers to read.
+   */
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
     var fileSegment: FileSegment = null
     var failures = 0
@@ -99,13 +103,13 @@ private[streaming] class WriteAheadLogManager(
    * Read all the existing logs from the log directory.
    *
    * Note that this is typically called when the caller is initializing and wants
-   * to recover past state from the write ahead logs (that is, before making any writes).
+   * to recover past state from the write ahead logs (that is, before making any writes).
    * If this is called after writes have been made using this manager, then it may not return
    * the latest the records. This does not deal with currently active log files, and
    * hence the implementation is kept simple.
    */
   def readFromLog(): Iterator[ByteBuffer] = synchronized {
-    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
+    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
       logDebug(s"Creating log reader with $file")
@@ -130,7 +134,7 @@ private[streaming] class WriteAheadLogManager(
     oldLogFiles.foreach { logInfo =>
       try {
         val path = new Path(logInfo.path)
-        val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
         fs.delete(path, true)
         synchronized { pastLogs -= logInfo }
         logDebug(s"Cleared log file $logInfo")
@@ -159,34 +163,30 @@ private[streaming] class WriteAheadLogManager(
   private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
     if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
       resetWriter()
-      if (currentLogPath != null) {
-        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
+      currentLogPath.foreach {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
       }
       currentLogWriterStartTime = currentTime
       currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
       val newLogPath = new Path(logDirectory,
         timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
-      currentLogPath = newLogPath.toString
-      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
+      currentLogPath = Some(newLogPath.toString)
+      currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
     }
     currentLogWriter
   }

   /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
-    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)

     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
       pastLogs.clear()
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
-    } else {
-      fileSystem.mkdirs(logDirectoryPath,
-        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }

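The new scaladoc on writeToLog spells out the durability contract: once the call returns, the record has been flushed to HDFS and is visible to readers. Below is a sketch of an assumed caller, using only the two method signatures visible in this diff; how the WriteAheadLogManager instance is constructed is outside this hunk, so it is simply passed in, and the object name is made up.

// Sketch only: a write followed by the recovery-time replay described in readFromLog's doc.
package org.apache.spark.streaming.util

import java.nio.ByteBuffer

object WalRoundTripSketch {
  def writeThenReplay(manager: WriteAheadLogManager): Unit = {
    // Blocks until the record is flushed to HDFS; the returned FileSegment locates it.
    val segment = manager.writeToLog(ByteBuffer.wrap("event-1".getBytes("UTF-8")))
    println(s"Wrote segment: $segment")

    // On recovery (before making any new writes), replay every record in the log directory.
    manager.readFromLog().foreach { buffer =>
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println(new String(bytes, "UTF-8"))
    }
  }
}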
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
         close()
         false
       case e: Exception =>
-        logDebug("Error reading next item, EOF reached", e)
+        logWarning("Error while trying to read data from HDFS.", e)
         close()
         throw e
     }

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

Lines changed: 4 additions & 21 deletions
@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"

-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
     stream.close()
   }

-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }

   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }

   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
     }
   }

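With the local-file branch gone, flushing relies entirely on hadoopFlushMethod, which is resolved reflectively from FSDataOutputStream (the resolution itself sits outside this hunk; only its first lines are visible above). A sketch of how such a lookup typically works, assuming it prefers hflush and falls back to the older sync; the object and method names here are illustrative.

// Sketch only: reflective lookup of a flush method that exists on both Hadoop 1.x and 2.x.
import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

object FlushMethodSketch {
  // Prefer hflush (Hadoop 2+); fall back to sync (Hadoop 1); None if neither exists.
  val hadoopFlushMethod = Try(classOf[FSDataOutputStream].getMethod("hflush"))
    .orElse(Try(classOf[FSDataOutputStream].getMethod("sync")))
    .toOption

  def flush(stream: FSDataOutputStream): Unit = {
    // Invoke whichever flush variant was found; a no-op if neither is present.
    hadoopFlushMethod.foreach(_.invoke(stream))
  }
}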