Skip to content

Commit 6e8156e

Browse files
Simplify tests, use Guava stream copy methods.
1 parent d8ddede commit 6e8156e

File tree

3 files changed

+50
-83
lines changed

3 files changed

+50
-83
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
2323

2424
import scala.collection.mutable
2525

26+
import com.google.common.io.ByteStreams
2627
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
2728
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
2829
import org.apache.hadoop.fs.permission.AccessControlException
@@ -233,19 +234,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
233234
*/
234235
def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
235236
val fs = FileSystem.get(hadoopConf)
236-
val buffer = new Array[Byte](64 * 1024)
237237
val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
238238
try {
239239
outputStream.putNextEntry(new ZipEntry(entryName))
240-
var dataRemaining = true
241-
while (dataRemaining) {
242-
val length = inputStream.read(buffer)
243-
if (length != -1) {
244-
outputStream.write(buffer, 0, length)
245-
} else {
246-
dataRemaining = false
247-
}
248-
}
240+
ByteStreams.copy(inputStream, outputStream)
249241
outputStream.closeEntry()
250242
} finally {
251243
inputStream.close()

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
package org.apache.spark.deploy.history
1919

20-
import java.io.{BufferedOutputStream, FileInputStream, File, FileOutputStream, OutputStreamWriter}
20+
import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
21+
FileOutputStream, OutputStreamWriter}
2122
import java.net.URI
2223
import java.util.concurrent.TimeUnit
23-
import java.util.zip.ZipOutputStream
24+
import java.util.zip.{ZipInputStream, ZipOutputStream}
2425

2526
import scala.io.Source
2627

27-
import org.apache.commons.io.IOUtils
28+
import com.google.common.base.Charsets
29+
import com.google.common.io.{ByteStreams, Files}
2830
import org.apache.hadoop.fs.Path
2931
import org.json4s.jackson.JsonMethods._
3032
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -351,36 +353,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
351353
provider.checkForLogs()
352354

353355
(1 to 2).foreach { i =>
354-
val outDir = Utils.createTempDir()
355-
val unzipDir = Utils.createTempDir()
356-
try {
357-
Utils.chmod700(outDir)
358-
Utils.chmod700(unzipDir)
359-
val outFile = new File(outDir, s"file$i.zip")
360-
val outputStream = new ZipOutputStream(new FileOutputStream(outFile))
361-
provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
362-
HistoryTestUtils.unzipToDir(new FileInputStream(outFile), unzipDir)
363-
val actualFiles = unzipDir.listFiles()
364-
assert(actualFiles.length == 1)
365-
actualFiles.foreach { actualFile =>
366-
val expFile = logs.find(_.getName == actualFile.getName).get
367-
val expStream = new FileInputStream(expFile)
368-
val resultStream = new FileInputStream(actualFile)
369-
try {
370-
val input = IOUtils.toString(expStream)
371-
val out = IOUtils.toString(resultStream)
372-
out should be(input)
373-
} finally {
374-
Seq(expStream, resultStream).foreach { s =>
375-
Utils.tryWithSafeFinally(s.close())()
376-
}
377-
}
378-
}
379-
} finally {
380-
Seq(outDir, unzipDir).foreach { f =>
381-
Utils.tryWithSafeFinally(Utils.deleteRecursively(f))()
382-
}
356+
val underlyingStream = new ByteArrayOutputStream()
357+
val outputStream = new ZipOutputStream(underlyingStream)
358+
provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
359+
outputStream.close()
360+
val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
361+
var totalEntries = 0
362+
var entry = inputStream.getNextEntry
363+
entry should not be null
364+
while (entry != null) {
365+
val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
366+
val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
367+
actual should be (expected)
368+
totalEntries += 1
369+
entry = inputStream.getNextEntry
383370
}
371+
totalEntries should be (1)
372+
inputStream.close()
384373
}
385374
}
386375

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@ package org.apache.spark.deploy.history
1818

1919
import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
2020
import java.net.{HttpURLConnection, URL}
21+
import java.util.zip.ZipInputStream
2122
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
2223

24+
import com.google.common.base.Charsets
25+
import com.google.common.io.{ByteStreams, Files}
2326
import org.apache.commons.io.{FileUtils, IOUtils}
2427
import org.mockito.Mockito.when
2528
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
2629
import org.scalatest.mock.MockitoSugar
2730

2831
import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
2932
import org.apache.spark.ui.SparkUI
30-
import org.apache.spark.util.Utils
3133

3234
/**
3335
* A collection of tests against the historyserver, including comparing responses from the json
@@ -169,18 +171,6 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
169171
// Test that the files are downloaded correctly, and validate them.
170172
def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
171173

172-
def validateFile(expStream: FileInputStream, actualStream: FileInputStream): Unit = {
173-
try {
174-
val expected = IOUtils.toString(expStream)
175-
val actual = IOUtils.toString(actualStream)
176-
actual should be(expected)
177-
} finally {
178-
Seq(expStream, actualStream).foreach { s =>
179-
Utils.tryWithSafeFinally(s.close())()
180-
}
181-
}
182-
}
183-
184174
val url = attemptId match {
185175
case Some(id) =>
186176
new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
@@ -193,39 +183,35 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
193183
inputStream should not be None
194184
error should be (None)
195185

196-
val dir = Utils.createTempDir()
197-
try {
198-
Utils.chmod700(dir)
199-
HistoryTestUtils.unzipToDir(inputStream.get, dir)
200-
val unzippedContent = dir.listFiles()
201-
attemptId match {
202-
case Some(_) => unzippedContent.length should be (1)
203-
case None => unzippedContent.length should be (2)
204-
}
205-
206-
// If these are legacy files, then each of the unzipped contents is actually a legacy log dir.
186+
val zipStream = new ZipInputStream(inputStream.get)
187+
var entry = zipStream.getNextEntry
188+
entry should not be null
189+
val totalFiles = {
207190
if (legacy) {
208-
unzippedContent.foreach { legacyDir =>
209-
assert(legacyDir.isDirectory)
210-
val logFiles = legacyDir.listFiles()
211-
logFiles.length should be (3)
212-
logFiles.foreach { f =>
213-
val actualStream = new FileInputStream(f)
214-
val expectedStream =
215-
new FileInputStream(new File(new File(logDir, legacyDir.getName), f.getName))
216-
validateFile(expectedStream, actualStream)
217-
}
218-
}
191+
attemptId.map { x => 3 }.getOrElse(6)
219192
} else {
220-
unzippedContent.foreach { f =>
221-
val actualStream = new FileInputStream(f)
222-
val expectedStream = new FileInputStream(new File(logDir, f.getName))
223-
validateFile(expectedStream, actualStream)
193+
attemptId.map { x => 1 }.getOrElse(2)
194+
}
195+
}
196+
var filesCompared = 0
197+
while(entry != null) {
198+
if (!entry.isDirectory) {
199+
val expectedFile = {
200+
if (legacy) {
201+
val splits = entry.getName.split("/")
202+
new File(new File(logDir, splits(0)), splits(1))
203+
} else {
204+
new File(logDir, entry.getName)
205+
}
224206
}
207+
val expected = Files.toString(expectedFile, Charsets.UTF_8)
208+
val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
209+
actual should be (expected)
210+
filesCompared += 1
225211
}
226-
} finally {
227-
Utils.deleteRecursively(dir)
212+
entry = zipStream.getNextEntry
228213
}
214+
filesCompared should be (totalFiles)
229215
}
230216

231217
test("response codes on bad paths") {

0 commit comments

Comments
 (0)