diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8ce8e148709..9a73a25cac4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -145,10 +145,17 @@ class FileStreamSource(
 
     if (batchFiles.nonEmpty) {
       metadataLogCurrentOffset += 1
-      metadataLog.add(metadataLogCurrentOffset, batchFiles.map { case (p, timestamp) =>
+
+      val fileEntries = batchFiles.map { case (p, timestamp) =>
         FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset)
-      }.toArray)
-      logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
+      }.toArray
+      // A false result from add() is treated as another job having written this offset.
+      if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
+        logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
+      } else {
+        throw new IllegalStateException("Concurrent update to the log. Multiple streaming jobs " +
+          s"detected for $metadataLogCurrentOffset")
+      }
     }
 
     FileStreamSourceOffset(metadataLogCurrentOffset)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index e022bfb6835d..fcc102aea082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -585,7 +585,12 @@ class MicroBatchExecution(
     withProgressLocked {
       sinkCommitProgress = batchSinkProgress
       watermarkTracker.updateWatermark(lastExecution.executedPlan)
-      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+      // Keep the write outside assert(): Scala's assert is elidable, and eliding it would
+      // also skip the commit log write itself.
+      if (!commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+        throw new IllegalStateException("Concurrent update to the commit log. " +
+          s"Multiple streaming jobs detected for $currentBatchId")
+      }
       committedOffsets ++= availableOffsets
     }
     logDebug(s"Completed batch ${currentBatchId}")