@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
1919import java .io ._
2020import java .nio .ByteBuffer
2121
22- import org .apache .hadoop .fs .{ FileStatus , Path }
22+ import org .apache .hadoop .fs .Path
2323
2424import scala .collection .mutable .ArrayBuffer
2525import scala .concurrent .duration ._
@@ -31,63 +31,53 @@ import WriteAheadLogSuite._
3131import com .google .common .io .Files
3232import org .apache .commons .io .FileUtils
3333import org .apache .hadoop .conf .Configuration
34- import org .apache .hadoop .hdfs .MiniDFSCluster
35- import org .scalatest .{BeforeAndAfterAll , BeforeAndAfter , FunSuite }
34+ import org .scalatest .{BeforeAndAfter , FunSuite }
3635import org .apache .spark .util .Utils
3736import org .scalatest .concurrent .Eventually ._
3837
39- class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
38+ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
4039
4140 val hadoopConf = new Configuration ()
42- val dfsDir = Files .createTempDir()
43- val TEST_BUILD_DATA_KEY : String = " test.build.data"
44- val oldTestBuildDataProp = Option (System .getProperty(TEST_BUILD_DATA_KEY ))
45- System .setProperty(TEST_BUILD_DATA_KEY , dfsDir.toString)
46- val cluster = new MiniDFSCluster (new Configuration , 2 , true , null )
47- val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
48- val hdfsUrl = s " hdfs://localhost: $nnPort/ ${getRandomString()}/ "
49- var pathForTest : String = null
50-
51- override def beforeAll () {
52- cluster.waitActive()
53- }
41+ var tempDir : File = null
42+ var dirForTest : String = null
43+ var fileForTest : String = null
5444
5545 before {
56- pathForTest = hdfsUrl + getRandomString()
46+ tempDir = Files .createTempDir()
47+ dirForTest = " file:///" + tempDir.toString
48+ fileForTest = " file:///" + new File (tempDir, getRandomString()).toString
5749 }
5850
59- override def afterAll () {
60- cluster.shutdown()
61- FileUtils .deleteDirectory(dfsDir)
62- oldTestBuildDataProp.foreach(System .setProperty(TEST_BUILD_DATA_KEY , _))
51+ after {
52+ FileUtils .deleteDirectory(tempDir)
6353 }
6454
6555 test(" WriteAheadLogWriter - writing data" ) {
6656 val dataToWrite = generateRandomData()
67- val writer = new WriteAheadLogWriter (pathForTest , hadoopConf)
57+ val writer = new WriteAheadLogWriter (fileForTest , hadoopConf)
6858 val segments = dataToWrite.map(data => writer.write(data))
6959 writer.close()
70- val writtenData = readDataManually(pathForTest , segments)
60+ val writtenData = readDataManually(fileForTest , segments)
7161 assert(writtenData.toArray === dataToWrite.toArray)
7262 }
7363
7464 test(" WriteAheadLogWriter - syncing of data by writing and reading immediately" ) {
7565 val dataToWrite = generateRandomData()
76- val writer = new WriteAheadLogWriter (pathForTest , hadoopConf)
66+ val writer = new WriteAheadLogWriter (fileForTest , hadoopConf)
7767 dataToWrite.foreach { data =>
78- val segment = writer.write(ByteBuffer .wrap (data.getBytes() ))
79- val reader = new WriteAheadLogRandomReader (pathForTest , hadoopConf)
68+ val segment = writer.write(stringToByteBuffer (data))
69+ val reader = new WriteAheadLogRandomReader (fileForTest , hadoopConf)
8070 val dataRead = reader.read(segment)
81- assert(data === new String (dataRead.array() ))
71+ assert(data === byteBufferToString (dataRead))
8272 }
8373 writer.close()
8474 }
8575
8676 test(" WriteAheadLogReader - sequentially reading data" ) {
8777 // Write data manually for testing the sequential reader
8878 val writtenData = generateRandomData()
89- writeDataManually(writtenData, pathForTest )
90- val reader = new WriteAheadLogReader (pathForTest , hadoopConf)
79+ writeDataManually(writtenData, fileForTest )
80+ val reader = new WriteAheadLogReader (fileForTest , hadoopConf)
9181 val readData = reader.toSeq.map(byteBufferToString)
9282 assert(readData.toList === writtenData.toList)
9383 assert(reader.hasNext === false )
@@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
10090 test(" WriteAheadLogReader - sequentially reading data written with writer" ) {
10191 // Write data manually for testing the sequential reader
10292 val dataToWrite = generateRandomData()
103- writeDataUsingWriter(pathForTest , dataToWrite)
93+ writeDataUsingWriter(fileForTest , dataToWrite)
10494 val iter = dataToWrite.iterator
105- val reader = new WriteAheadLogReader (pathForTest , hadoopConf)
95+ val reader = new WriteAheadLogReader (fileForTest , hadoopConf)
10696 reader.foreach { byteBuffer =>
10797 assert(byteBufferToString(byteBuffer) === iter.next())
10898 }
@@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
112102 test(" WriteAheadLogRandomReader - reading data using random reader" ) {
113103 // Write data manually for testing the random reader
114104 val writtenData = generateRandomData()
115- val segments = writeDataManually(writtenData, pathForTest )
105+ val segments = writeDataManually(writtenData, fileForTest )
116106
117107 // Get a random order of these segments and read them back
118108 val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10 ).flatten
119- val reader = new WriteAheadLogRandomReader (pathForTest , hadoopConf)
109+ val reader = new WriteAheadLogRandomReader (fileForTest , hadoopConf)
120110 writtenDataAndSegments.foreach { case (data, segment) =>
121111 assert(data === byteBufferToString(reader.read(segment)))
122112 }
@@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
126116 test(" WriteAheadLogRandomReader - reading data using random reader written with writer" ) {
127117 // Write data using writer for testing the random reader
128118 val data = generateRandomData()
129- val segments = writeDataUsingWriter(pathForTest , data)
119+ val segments = writeDataUsingWriter(fileForTest , data)
130120
131121 // Read a random sequence of segments and verify read data
132122 val dataAndSegments = data.zip(segments).toSeq.permutations.take(10 ).flatten
133- val reader = new WriteAheadLogRandomReader (pathForTest , hadoopConf)
123+ val reader = new WriteAheadLogRandomReader (fileForTest , hadoopConf)
134124 dataAndSegments.foreach { case (data, segment) =>
135125 assert(data === byteBufferToString(reader.read(segment)))
136126 }
@@ -140,62 +130,58 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
140130 test(" WriteAheadLogManager - write rotating logs" ) {
141131 // Write data using manager
142132 val dataToWrite = generateRandomData()
143- val dir = pathForTest
144- writeDataUsingManager(dir, dataToWrite)
133+ writeDataUsingManager(dirForTest, dataToWrite)
145134
146135 // Read data manually to verify the written data
147- val logFiles = getLogFilesInDirectory(dir )
136+ val logFiles = getLogFilesInDirectory(dirForTest )
148137 assert(logFiles.size > 1 )
149138 val writtenData = logFiles.flatMap { file => readDataManually(file)}
150139 assert(writtenData.toList === dataToWrite.toList)
151140 }
152141
153142 test(" WriteAheadLogManager - read rotating logs" ) {
154143 // Write data manually for testing reading through manager
155- val dir = pathForTest
156144 val writtenData = (1 to 10 ).map { i =>
157145 val data = generateRandomData()
158- val file = dir + s " /log- $i- $i"
146+ val file = dirForTest + s " /log- $i- $i"
159147 writeDataManually(data, file)
160148 data
161149 }.flatten
162150
163- val logDirectoryPath = new Path (dir )
151+ val logDirectoryPath = new Path (dirForTest )
164152 val fileSystem = HdfsUtils .getFileSystemForPath(logDirectoryPath, hadoopConf)
165153 assert(fileSystem.exists(logDirectoryPath) === true )
166154
167155 // Read data using manager and verify
168- val readData = readDataUsingManager(dir )
156+ val readData = readDataUsingManager(dirForTest )
169157 assert(readData.toList === writtenData.toList)
170158 }
171159
172160 test(" WriteAheadLogManager - recover past logs when creating new manager" ) {
173161 // Write data with manager, recover with new manager and verify
174162 val dataToWrite = generateRandomData()
175- val dir = pathForTest
176- writeDataUsingManager(dir, dataToWrite)
177- val logFiles = getLogFilesInDirectory(dir)
163+ writeDataUsingManager(dirForTest, dataToWrite)
164+ val logFiles = getLogFilesInDirectory(dirForTest)
178165 assert(logFiles.size > 1 )
179- val readData = readDataUsingManager(dir )
166+ val readData = readDataUsingManager(dirForTest )
180167 assert(dataToWrite.toList === readData.toList)
181168 }
182169
183170 test(" WriteAheadLogManager - cleanup old logs" ) {
184171 // Write data with manager, recover with new manager and verify
185- val dir = pathForTest
186172 val dataToWrite = generateRandomData()
187173 val fakeClock = new ManualClock
188- val manager = new WriteAheadLogManager (dir , hadoopConf,
174+ val manager = new WriteAheadLogManager (dirForTest , hadoopConf,
189175 rollingIntervalSecs = 1 , callerName = " WriteAheadLogSuite" , clock = fakeClock)
190176 dataToWrite.foreach { item =>
191177 fakeClock.addToTime(500 ) // half second for each
192178 manager.writeToLog(item)
193179 }
194- val logFiles = getLogFilesInDirectory(dir )
180+ val logFiles = getLogFilesInDirectory(dirForTest )
195181 assert(logFiles.size > 1 )
196182 manager.cleanupOldLogs(fakeClock.currentTime() / 2 )
197183 eventually(timeout(1 second), interval(10 milliseconds)) {
198- assert(getLogFilesInDirectory(dir ).size < logFiles.size)
184+ assert(getLogFilesInDirectory(dirForTest ).size < logFiles.size)
199185 }
200186 }
201187 // TODO (Hari, TD): Test different failure conditions of writers and readers.
@@ -305,7 +291,7 @@ object WriteAheadLogSuite {
305291 if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
306292 fileSystem.listStatus(logDirectoryPath).map {
307293 _.getPath.toString
308- }
294+ }.sorted
309295 } else {
310296 Seq .empty
311297 }
0 commit comments