streaming/src/main/scala/org/apache/spark/streaming/rdd
1 file changed, 8 insertions(+), 2 deletions(-)

@@ -16,7 +16,9 @@
  */
 package org.apache.spark.streaming.rdd
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.UUID
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -108,9 +110,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // writing log data. However, the directory is not needed if data needs to be read, hence
         // a dummy path is provided to satisfy the method parameter requirements.
         // FileBasedWriteAheadLog will not create any file or directory at that path.
-        val dummyDirectory = FileUtils.getTempDirectoryPath()
+        // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+        // this dummy directory should not already exist otherwise the WAL will try to recover
+        // past events from the directory and throw errors.
+        val nonExistentDirectory = new File(
+          FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-          SparkEnv.get.conf, dummyDirectory, hadoopConf)
+          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)
       } catch {
         case NonFatal(e) =>
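For readers outside the Spark codebase, here is a minimal, self-contained sketch of the path-construction trick the patch uses: appending a random UUID under the JVM temp directory yields a path that almost certainly does not exist, so a write-ahead log pointed at it finds nothing to recover. The object name NonExistentDirExample is hypothetical and only for illustration; the snippet assumes Apache Commons IO (org.apache.commons.io.FileUtils) is on the classpath, as it is for the patched class.

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils

// Hypothetical demo object, not part of Spark.
object NonExistentDirExample {
  def main(args: Array[String]): Unit = {
    // Same construction as the patch: temp dir + random UUID gives a path that
    // does not exist on disk and is never created here.
    val nonExistentDirectory = new File(
      FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath

    println(s"placeholder WAL directory: $nonExistentDirectory")
    // Expected to print false: nothing has been written at that path.
    println(s"exists on disk: ${new File(nonExistentDirectory).exists()}")
  }
}

Generating a fresh UUID each time avoids the failure mode of the old dummyDirectory: FileUtils.getTempDirectoryPath() returns a directory that always exists, so per the new comment the WAL would attempt to recover past events from whatever it found there and throw errors.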