diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5711262654a1..e19f589dfa8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -438,6 +438,9 @@ class StreamExecution(
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
-        offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
-          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
-        }
+        if (latestBatchId != 0) {
+          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
+            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
+          }
+          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+        }
@@ -565,8 +568,10 @@ class StreamExecution(
         // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
         // sources to discard data from the previous batch.
-        val prevBatchOff = offsetLog.get(currentBatchId - 1)
-        if (prevBatchOff.isDefined) {
-          prevBatchOff.get.toStreamProgress(sources).foreach {
-            case (src, off) => src.commit(off)
-          }
-        }
+        if (currentBatchId != 0) {
+          val prevBatchOff = offsetLog.get(currentBatchId - 1).getOrElse {
+            throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
+          }
+          prevBatchOff.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
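For context, a minimal standalone sketch of the fail-fast pattern the patch applies. `SimpleOffsetLog` and the string payloads below are hypothetical stand-ins, not Spark's `HDFSMetadataLog` API; the only property carried over is that `get` returns an `Option`, so a plain `foreach` silently does nothing when an entry is missing, while `getOrElse` with a `throw` surfaces the gap immediately.

```scala
// Minimal sketch of the fail-fast pattern used in the patch.
// `SimpleOffsetLog` is a hypothetical stand-in for Spark's offset log.
object FailFastSketch {
  final class SimpleOffsetLog(entries: Map[Long, String]) {
    def get(batchId: Long): Option[String] = entries.get(batchId)
  }

  def main(args: Array[String]): Unit = {
    val log = new SimpleOffsetLog(Map(0L -> "offsets-0", 1L -> "offsets-1"))

    // Before: Option.foreach silently skips a missing entry, so a
    // corrupted or incomplete log goes unnoticed until much later.
    log.get(5L).foreach(offsets => println(s"recovered $offsets"))

    // After: getOrElse reports the missing entry immediately, naming
    // the batch id that should have been there.
    val offsets = log.get(1L).getOrElse {
      throw new IllegalStateException("batch 1 doesn't exist")
    }
    println(s"recovered $offsets")
  }
}
```

Note the `!= 0` guards in the hunks above: batch `-1` never exists, so on the very first batch (or a restart from batch 0) an unconditional throw would be a regression; the old silent-skip behavior is only wrong for batches after the first.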