
Commit c2bc738

More changes based on PR comments.
1 parent 569a416 commit c2bc738

File tree

3 files changed: +20 -1 lines changed

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 6 additions & 0 deletions
@@ -103,6 +103,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       var dataRead: ByteBuffer = null
       var writeAheadLog: WriteAheadLog = null
       try {
+        // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+        // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+        // writing log data. However, the directory is not needed if data needs to be read, hence
+        // a dummy path is provided to satisfy the method parameter requirements.
+        // FileBasedWriteAheadLog will not create any file or directory at that path.
         val dummyDirectory = FileUtils.getTempDirectoryPath()
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
           SparkEnv.get.conf, dummyDirectory, hadoopConf)
@@ -114,6 +119,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       } finally {
         if (writeAheadLog != null) {
           writeAheadLog.close()
+          writeAheadLog = null
         }
       }
       if (dataRead == null) {
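
To make the intent of the added comment concrete, here is a minimal sketch of the read-only usage pattern it describes. It is not the actual body of WriteAheadLogBackedBlockRDD; the lines hidden between the two hunks are paraphrased, the read call is assumed to be WriteAheadLog.read(handle): ByteBuffer, and the helper name readBlockFromWAL and its parameters are illustrative only (WriteAheadLogUtils is internal to the streaming package):

import java.nio.ByteBuffer

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkEnv
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle, WriteAheadLogUtils}

// Illustrative helper, not Spark code: read one record from a write ahead log
// without ever writing to (or creating) the dummy directory.
def readBlockFromWAL(handle: WriteAheadLogRecordHandle, hadoopConf: Configuration): ByteBuffer = {
  var dataRead: ByteBuffer = null
  var writeAheadLog: WriteAheadLog = null
  try {
    // The dummy path only satisfies createLogForReceiver's signature; the default
    // FileBasedWriteAheadLog creates nothing on disk until a record is written.
    val dummyDirectory = FileUtils.getTempDirectoryPath()
    writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
      SparkEnv.get.conf, dummyDirectory, hadoopConf)
    dataRead = writeAheadLog.read(handle)   // assumed read(handle): ByteBuffer
  } finally {
    if (writeAheadLog != null) {
      writeAheadLog.close()
      writeAheadLog = null                  // clear the reference after closing, as in the commit
    }
  }
  dataRead
}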

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 /**
  * A random access reader for reading write ahead log files written using
  * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
- * this reads the record (bytebuffer) from the log file.
+ * this reads the record (ByteBuffer) from the log file.
  */
 private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
   extends Closeable {

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -288,6 +288,19 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     val readData = readDataUsingWriteAheadLog(testDir)
     assert(readData === dataToWrite2)
   }
+
+  test("FileBasedWriteAheadLog - do not create directories or files unless write") {
+    val nonexistentTempPath = File.createTempFile("test", "")
+    nonexistentTempPath.delete()
+    assert(!nonexistentTempPath.exists())
+
+    val writtenSegment = writeDataManually(generateRandomData(), testFile)
+    val wal = new FileBasedWriteAheadLog(
+      new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+    assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
+    wal.read(writtenSegment.head)
+    assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
+  }
 }

 object WriteAheadLogSuite {
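
The new test checks that constructing a FileBasedWriteAheadLog and reading a segment creates no files or directories; only writes should touch the log path. The commit does not show how FileBasedWriteAheadLog achieves this, but the general technique is to defer directory creation to the first write. The following is a hypothetical sketch of that pattern (LazyLogWriter and ensureLogDirectory are made-up names, not Spark APIs):

import java.io.File

// Hypothetical sketch of write-time directory creation; not FileBasedWriteAheadLog itself.
class LazyLogWriter(logDirectory: String) {

  // Constructing the writer touches nothing on disk.
  private var directoryReady = false

  private def ensureLogDirectory(): Unit = {
    if (!directoryReady) {
      val dir = new File(logDirectory)
      if (!dir.exists()) {
        dir.mkdirs()          // the directory appears only now, on the first write
      }
      directoryReady = true
    }
  }

  def write(record: Array[Byte]): Unit = {
    ensureLogDirectory()      // read paths never call this, so reads create nothing
    // ... append `record` to the current log file under logDirectory ...
  }
}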
