@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   val hadoopConf = new Configuration()
   val dfsDir = Files.createTempDir()
   val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY)
+  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
+  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
   val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
   val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
+  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
   var pathForTest: String = null
 
   override def beforeAll() {
-    System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
     cluster.waitActive()
   }
 
@@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   override def afterAll() {
     cluster.shutdown()
     FileUtils.deleteDirectory(dfsDir)
-    System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp)
+    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
   }
 
   test("WriteAheadLogWriter - writing data") {
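Note on the save/restore change above: System.getProperty returns null when a property is unset, and System.setProperty rejects a null value, so the old afterAll would throw a NullPointerException on machines where test.build.data had never been set. Wrapping the saved value in Option and restoring it with foreach avoids that. A minimal standalone sketch of the same pattern (the key name comes from the diff; the directory value is made up for illustration):

    val key = "test.build.data"
    val previous = Option(System.getProperty(key))    // None if the property was unset
    System.setProperty(key, "/tmp/example-dfs-dir")   // hypothetical value for this sketch
    try {
      // ... run code that reads test.build.data ...
    } finally {
      // Restore only if there was something to restore; setProperty(key, null) would throw.
      previous.foreach(System.setProperty(key, _))
    }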
@@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     assert(writtenData.toArray === dataToWrite.toArray)
   }
 
-  test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " +
-    "Minicluster") {
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
     val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
     dataToWrite.foreach { data =>
@@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     reader.close()
   }
 
-  test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") {
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(pathForTest, dataToWrite)
@@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     reader.close()
   }
 
-  test("WriteAheadLogRandomReader - reading data using random reader written with writer using " +
-    "Minicluster") {
+  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
     val segments = writeDataUsingWriter(pathForTest, data)
@@ -141,24 +139,23 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
 
   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
-    val dataToWrite = generateRandomData(10)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
 
     // Read data manually to verify the written data
     val logFiles = getLogFilesInDirectory(dir)
     assert(logFiles.size > 1)
-    val writtenData = logFiles.flatMap { file => readDataManually(file) }
-    assert(writtenData.toSet === dataToWrite.toSet)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData.toList === dataToWrite.toList)
   }
 
-  // This one is failing right now -- commenting out for now.
   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
     val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
-      val data = generateRandomData(10)
-      val file = dir + "/log-" + i
+      val data = generateRandomData()
+      val file = dir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten
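Aside on the renamed files above: switching from "/log-" + i to s"/log-$i-$i" makes the hand-written files look like the manager's own rotated segments, whose names carry two numeric fields; a name the manager cannot parse would presumably just be ignored when the directory is read back. A rough sketch of that kind of name parsing (the regex and helper are assumptions for illustration, not the manager's actual code):

    // Hypothetical parser for a "log-<start>-<end>" naming scheme.
    val logFilePattern = """log-(\d+)-(\d+)""".r

    def parseLogName(name: String): Option[(Long, Long)] = name match {
      case logFilePattern(start, end) => Some((start.toLong, end.toLong))
      case _ => None
    }

    parseLogName("log-3-3")  // Some((3, 3)) -- matches the s"/log-$i-$i" names
    parseLogName("log-3")    // None -- the old "/log-" + i names would not parse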
@@ -169,12 +166,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
 
     // Read data using manager and verify
     val readData = readDataUsingManager(dir)
-    // assert(readData.toList === writtenData.toList)
+    assert(readData.toList === writtenData.toList)
   }
 
   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
     val logFiles = getLogFilesInDirectory(dir)
@@ -186,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
     val dir = pathForTest
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
     val manager = new WriteAheadLogManager(dir, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
       assert(getLogFilesInDirectory(dir).size < logFiles.size)
     }
   }
-
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
   // - Failure while reading incomplete/corrupt file
 }
@@ -243,8 +239,10 @@ object WriteAheadLogSuite {
 
   def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
     val fakeClock = new ManualClock
+    fakeClock.setTime(1000000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       fakeClock.addToTime(500)
       manager.writeToLog(item)
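Aside on fakeClock.setTime(1000000): as the relocated comment says, the high base value keeps every timestamp embedded in the generated log file names the same width, so sorting the names as strings agrees with the order in which the segments were written. A tiny illustration of why the width matters (the names are made up; only the sort behaviour is the point):

    // With a zero base the first segment would be named like "log-500-1000"; a plain
    // string sort places it after "log-2000-2500" because '5' > '2':
    Seq("log-500-1000", "log-2000-2500").sorted
    // => List(log-2000-2500, log-500-1000) -- the file written first sorts last

    // With a 1000000 base every timestamp is seven digits, so string and numeric order agree:
    Seq("log-1000500-1001000", "log-1002000-1002500").sorted
    // => List(log-1000500-1001000, log-1002000-1002500)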
@@ -271,7 +269,8 @@ object WriteAheadLogSuite {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     val buffer = new ArrayBuffer[String]
     try {
-      while (true) { // Read till EOF is thrown
+      while (true) {
+        // Read till EOF is thrown
         val length = reader.readInt()
         val bytes = new Array[Byte](length)
         reader.read(bytes)
@@ -293,8 +292,10 @@ object WriteAheadLogSuite {
     data
   }
 
-  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-    (1 to numItems).map { _.toString }
+  def generateRandomData(): Seq[String] = {
+    (1 to 50).map {
+      _.toString
+    }
   }
 
   def getLogFilesInDirectory(directory: String): Seq[String] = {