From 7090fc064b9df22f6082f6de03b82a5bdfb29210 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 1 Aug 2017 11:26:02 -0700
Subject: [PATCH 1/5] Ensure places calling HDFSMetadataLog.get check the
 return value

---
 .../streaming/CompactibleFileStreamLog.scala  | 12 ++++-
 .../execution/streaming/HDFSMetadataLog.scala | 53 ++++++++++++++++++-
 .../execution/streaming/StreamExecution.scala | 17 ++++--
 .../streaming/HDFSMetadataLogSuite.scala      | 13 +++++
 4 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index e37033b19a8e..d37b3bf42fee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -169,7 +169,11 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
+    val allLogs = validBatches.map { batchId =>
+      super.get(batchId).getOrElse {
+        throw new IllegalStateException(s"batch $batchId doesn't exist")
+      }
+    }.flatten ++ logs
     // Return false as there is another writer.
     super.add(batchId, compactLogs(allLogs).toArray)
   }
@@ -186,7 +190,11 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     if (latestId >= 0) {
       try {
         val logs =
-          getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten
+          getAllValidBatches(latestId, compactInterval).map { id =>
+            super.get(id).getOrElse {
+              throw new IllegalStateException(s"batch $id doesn't exist")
+            }
+          }.flatten
         return compactLogs(logs).toArray
       } catch {
         case e: IOException =>
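
The essential change in these two hunks is swapping a lossy `flatMap` for a fail-fast `getOrElse`: with `flatMap`, a batch file that was deleted or never written is silently skipped and the compacted log is quietly wrong; with `getOrElse`, the hole surfaces immediately as an `IllegalStateException`. A minimal standalone sketch of the two behaviors (the `InMemoryLog` class is hypothetical, for illustration only, not a Spark API):

    import scala.collection.mutable

    // Hypothetical stand-in for a metadata log keyed by batch id.
    class InMemoryLog[T] {
      private val entries = mutable.Map[Long, T]()
      def add(batchId: Long, value: T): Unit = entries(batchId) = value
      def get(batchId: Long): Option[T] = entries.get(batchId)
    }

    val log = new InMemoryLog[Array[String]]
    log.add(0L, Array("f0"))
    log.add(2L, Array("f2"))  // batch 1 is missing, e.g. deleted by a cleanup bug

    // Old behavior: the hole is skipped and compaction silently loses batch 1.
    val lossy = (0L to 2L).flatMap(id => log.get(id)).flatten

    // New behavior: the hole fails the compaction immediately.
    val strict = (0L to 2L).map { id =>
      log.get(id).getOrElse(throw new IllegalStateException(s"batch $id doesn't exist"))
    }.flatten
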
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 46bfc297931f..3fb5839189e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -211,13 +211,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   }

   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+    assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get)
     val files = fileManager.list(metadataPath, batchFilesFilter)
     val batchIds = files
       .map(f => pathToBatchId(f.getPath))
       .filter { batchId =>
         (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
-      }
-    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
+      }.sorted
+
+    verifyBatchIds(batchIds, startId, endId)
+
+    batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>
         (batchId, metadataOption.get)
     }
@@ -396,6 +400,7 @@ object HDFSMetadataLog {

   /**
    * Rename a path. Note that this implementation is not atomic.
+   *
    * @throws FileNotFoundException if source path does not exist.
    * @throws FileAlreadyExistsException if destination path already exists.
    * @throws IOException if renaming fails for some unknown reason.
@@ -437,4 +442,48 @@ object HDFSMetadataLog {
       }
     }
   }
+
+  /**
+   * Verify that batchIds are continuous and between `startId` and `endId`.
+   *
+   * @param batchIds the sorted ids to verify.
+   * @param startId the start id. If it's set, batchIds should start with this id.
+   * @param endId the end id. If it's set, batchIds should end with this id.
+   */
+  def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
+    // Verify that we can get all batches between `startId` and `endId`.
+    if (startId.isDefined || endId.isDefined) {
+      if (batchIds.isEmpty) {
+        throw new IllegalStateException(s"batch ${startId.orElse(endId).get} doesn't exist")
+      }
+      if (startId.isDefined) {
+        val minBatchId = batchIds.head
+        assert(minBatchId >= startId.get)
+        if (minBatchId != startId.get) {
+          val missingBatchIds = startId.get to minBatchId
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+        }
+      }
+
+      if (endId.isDefined) {
+        val maxBatchId = batchIds.last
+        assert(maxBatchId <= endId.get)
+        if (maxBatchId != endId.get) {
+          val missingBatchIds = maxBatchId to endId.get
+          throw new IllegalStateException(
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+        }
+      }
+    }
+
+    if (batchIds.nonEmpty) {
+      val minBatchId = batchIds.head
+      val maxBatchId = batchIds.last
+      val missingBatchIds = (minBatchId to maxBatchId).toSet -- batchIds
+      if (missingBatchIds.nonEmpty) {
+        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+      }
+    }
+  }
 }
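
`verifyBatchIds` makes the implicit contract of a ranged read explicit: the sorted ids must start at `startId` (if given), end at `endId` (if given), and contain no holes in between. A condensed restatement of that contract, useful for reasoning about the error cases (an illustrative rewrite, not the Spark source):

    def verifyContiguous(ids: Seq[Long], start: Option[Long], end: Option[Long]): Unit = {
      if ((start.isDefined || end.isDefined) && ids.isEmpty) {
        throw new IllegalStateException(s"batch ${start.orElse(end).get} doesn't exist")
      }
      if (ids.nonEmpty) {
        // The endpoints must match the requested bounds exactly...
        for (s <- start if ids.head != s) {
          throw new IllegalStateException(s"batches $s to ${ids.head - 1} don't exist")
        }
        for (e <- end if ids.last != e) {
          throw new IllegalStateException(s"batches ${ids.last + 1} to $e don't exist")
        }
        // ...and the run between them must be gap-free.
        val holes = (ids.head to ids.last).toSet -- ids
        if (holes.nonEmpty) {
          throw new IllegalStateException(s"batches (${holes.mkString(", ")}) don't exist")
        }
      }
    }

    verifyContiguous(Seq(1L, 2L, 3L), Some(1L), Some(3L))  // passes
    verifyContiguous(Seq(1L, 2L, 4L), Some(1L), Some(4L))  // throws: batch 3 missing
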
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5711262654a1..9407fe014cfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -438,7 +438,10 @@ class StreamExecution(
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this point
          * is the second latest batch id in the offset log. */
-        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
+        if (latestBatchId != 0) {
+          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
+            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
+          }
           committedOffsets = secondLatestBatchId.toStreamProgress(sources)
         }

@@ -565,10 +568,14 @@ class StreamExecution(

       // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
       // sources to discard data from the previous batch.
-      val prevBatchOff = offsetLog.get(currentBatchId - 1)
-      if (prevBatchOff.isDefined) {
-        prevBatchOff.get.toStreamProgress(sources).foreach {
-          case (src, off) => src.commit(off)
+      if (currentBatchId != 0) {
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        } else {
+          throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
         }
       }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 7689bc03a4cc..c04bf5b90ee9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -259,6 +259,19 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       fm.rename(path2, path3)
     }
   }
+
+  test("verifyBatchIds") {
+    import HDFSMetadataLog.verifyBatchIds
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))
+    verifyBatchIds(Seq(1L), Some(1L), Some(1L))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), None, Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), Some(1L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), None))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
+    intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
+  }
 }

 /** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
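
The `StreamExecution` changes in this patch encode a recovery invariant: on restart, the newest offset-log entry describes the batch to re-run, and its predecessor (if any) describes what was already committed. Batch 0 has no predecessor, hence the explicit `!= 0` guards; for any later batch, a missing predecessor now aborts recovery instead of being ignored. A sketch of that invariant over a plain map (the `Offsets` type and `recover` helper are hypothetical, for illustration):

    case class Offsets(bySource: Map[String, Long])

    // Given a non-empty offset log keyed by batch id, derive the offsets to
    // re-run and the offsets already committed, failing fast on a hole.
    def recover(offsetLog: Map[Long, Offsets]): (Offsets, Option[Offsets]) = {
      val latestBatchId = offsetLog.keys.max
      val available = offsetLog(latestBatchId)
      val committed =
        if (latestBatchId == 0) None  // first batch ever: nothing committed yet
        else Some(offsetLog.getOrElse(latestBatchId - 1,
          throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")))
      (available, committed)
    }
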
From 91efeb3553e5e9f0f6fb45ae7574231c2e70d845 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 1 Aug 2017 17:03:17 -0700
Subject: [PATCH 2/5] One more place

---
 .../spark/sql/execution/streaming/FileStreamSourceLog.scala | 5 ++++-
 .../apache/spark/sql/streaming/FileStreamSourceSuite.scala  | 1 +
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 33e6a1d5d6e1..8628471fdb92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -115,7 +115,10 @@ class FileStreamSourceLog(
       Map.empty[Long, Option[Array[FileEntry]]]
     }

-    (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    val batches =
+      (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    batches
   }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2108b118bf05..e2ec690d90e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       val metadataLog = new
         FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
+      assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))

       val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
         Nil, dir.getAbsolutePath, Map.empty)
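
Patch 2 covers `FileStreamSourceLog`, whose ranged `get` is special: it answers from an in-memory view of recent batches (`existedBatches`) and falls back to disk for the rest (`retrievedBatches`), so a gap can creep in when the two views are merged. Verifying the merged ids closes that hole. A hypothetical simplification of a two-tier read with the same final check (names and types are illustrative only, not the Spark implementation):

    def rangedGet(
        cache: Map[Long, String],
        disk: Map[Long, String],
        startId: Long,
        endId: Long): Array[(Long, String)] = {
      val wanted = startId to endId
      val fromCache = wanted.filter(cache.contains).map(id => id -> cache(id))
      val fromDisk = wanted.filterNot(cache.contains).flatMap(id => disk.get(id).map(id -> _))
      val batches = (fromCache ++ fromDisk).sortBy(_._1).toArray
      // Fail fast if either tier lost a batch inside the requested range.
      val missing = wanted.toSet -- batches.map(_._1)
      if (missing.nonEmpty) {
        throw new IllegalStateException(s"batches (${missing.mkString(", ")}) don't exist")
      }
      batches
    }
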
From 5170950784235984978d9a29f50f8317204c9e19 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 7 Aug 2017 16:58:52 -0700
Subject: [PATCH 3/5] don't close an output stream quietly

---
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 3fb5839189e4..bf658f441e0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         serialize(metadata, output)
         return Some(tempPath)
       } finally {
-        IOUtils.closeQuietly(output)
+        output.close()
       }
     } catch {
       case e: FileAlreadyExistsException =>

From 7f85b637467b559e9fb6065bdc2ac59f82a35333 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 8 Aug 2017 14:18:28 -0700
Subject: [PATCH 4/5] Address

---
 .../streaming/CompactibleFileStreamLog.scala      | 12 ++++++++----
 .../sql/execution/streaming/HDFSMetadataLog.scala |  1 -
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index d37b3bf42fee..77bc0ba5548d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -169,9 +169,11 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.map { batchId =>
-      super.get(batchId).getOrElse {
-        throw new IllegalStateException(s"batch $batchId doesn't exist")
+    val allLogs = validBatches.map { id =>
+      super.get(id).getOrElse {
+        throw new IllegalStateException(
+          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
+            s"(compactInterval: $compactInterval)")
       }
     }.flatten ++ logs
     // Return false as there is another writer.
@@ -192,7 +194,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
         val logs =
           getAllValidBatches(latestId, compactInterval).map { id =>
             super.get(id).getOrElse {
-              throw new IllegalStateException(s"batch $id doesn't exist")
+              throw new IllegalStateException(
+                s"${batchIdToPath(id)} doesn't exist " +
+                  s"(latestId: $latestId, compactInterval: $compactInterval)")
             }
           }.flatten
         return compactLogs(logs).toArray

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bf658f441e0f..d1c03f237a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -400,7 +400,6 @@ object HDFSMetadataLog {

   /**
    * Rename a path. Note that this implementation is not atomic.
-   *
    * @throws FileNotFoundException if source path does not exist.
    * @throws FileAlreadyExistsException if destination path already exists.
    * @throws IOException if renaming fails for some unknown reason.
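
Patch 3's one-liner deserves a note: for write paths, `IOUtils.closeQuietly` is actively dangerous, because a buffered output stream may only surface a write failure at `close()`/flush time, and swallowing that exception can turn a failed metadata write into silent data loss. Letting `close()` throw is the point. A minimal sketch of the failure mode (illustrative helper, not Spark code):

    import java.io.OutputStream

    def writeAll(out: OutputStream, data: Array[Byte]): Unit = {
      try {
        out.write(data)  // may sit in a buffer; not necessarily durable yet
      } finally {
        out.close()      // flushes; an IOException here means the write failed,
                         // and closeQuietly would have masked it
      }
    }

Patch 4's message changes follow the same philosophy as the rest of the series: when the invariant breaks, name the actual file (`batchIdToPath(id)`) and the parameters needed to debug it (`compactInterval`, `latestId`), not just the batch id.
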
From 16b02da4f75f7838006ce8a03d63e04d8bc21455 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 8 Aug 2017 17:32:27 -0700
Subject: [PATCH 5/5] Address more

---
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 9 ++++++---
 .../sql/execution/streaming/HDFSMetadataLogSuite.scala  | 4 ++++
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d1c03f237a5b..5f8973fd0946 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -461,7 +461,8 @@ object HDFSMetadataLog {
         if (minBatchId != startId.get) {
           val missingBatchIds = startId.get to minBatchId
           throw new IllegalStateException(
-            s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
         }
       }

@@ -471,7 +472,8 @@ object HDFSMetadataLog {
         if (maxBatchId != endId.get) {
           val missingBatchIds = maxBatchId to endId.get
           throw new IllegalStateException(
-            s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+            s"batches (${missingBatchIds.mkString(", ")}) don't exist " +
+              s"(startId: $startId, endId: $endId)")
         }
       }
     }
@@ -481,7 +483,8 @@ object HDFSMetadataLog {
       val maxBatchId = batchIds.last
       val missingBatchIds = (minBatchId to maxBatchId).toSet -- batchIds
       if (missingBatchIds.nonEmpty) {
-        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) don't exist")
+        throw new IllegalStateException(s"batches (${missingBatchIds.mkString(", ")}) " +
+          s"don't exist (startId: $startId, endId: $endId)")
       }
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index c04bf5b90ee9..48e70e48b179 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -264,6 +264,10 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     import HDFSMetadataLog.verifyBatchIds
     verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))
     verifyBatchIds(Seq(1L), Some(1L), Some(1L))
+    verifyBatchIds(Seq(1L, 2L, 3L), None, Some(3L))
+    verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), None)
+    verifyBatchIds(Seq(1L, 2L, 3L), None, None)
+
     intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), None))
     intercept[IllegalStateException](verifyBatchIds(Seq(), None, Some(1L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(), Some(1L), Some(1L)))
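
Taken together, the series changes the contract of ranged metadata reads: a caller of `HDFSMetadataLog.get(startId, endId)` now either receives every batch in the requested range or an `IllegalStateException` naming the missing ids, never a silently shortened result. A hedged usage sketch (`log` and its `String` payload are hypothetical):

    // Assumes a log that should hold batches 0..3.
    // Before this series, a deleted batch 2 yielded Array((0, ...), (1, ...), (3, ...)).
    // After it, the same read throws and names the hole.
    try {
      val entries: Array[(Long, String)] = log.get(Some(0L), Some(3L))
      entries.foreach { case (id, meta) => println(s"batch $id -> $meta") }
    } catch {
      case e: IllegalStateException =>
        // e.g. "batches (2) don't exist (startId: Some(0), endId: Some(3))"
        println(s"corrupt checkpoint: ${e.getMessage}")
    }
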