diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b6ddf7437ea13..f00ac350bc6b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ case object TERMINATED extends State
 class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
-    val checkpointRoot: String,
+    private val checkpointRoot: String,
     analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
@@ -84,6 +84,12 @@ class StreamExecution(
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
+  val resolvedCheckpointRoot = {
+    val checkpointPath = new Path(checkpointRoot)
+    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
+  }
+
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
@@ -154,7 +160,7 @@ class StreamExecution(
       case streamingRelation@StreamingRelation(dataSource, _, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSource.createSource(metadataPath)
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
@@ -233,14 +239,14 @@ class StreamExecution(
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   private def checkpointFile(name: String): String =
-    new Path(new Path(checkpointRoot), name).toUri.toString
+    new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
 
   /**
    * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
    */
   def start(): Unit = {
-    logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
+    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
     microBatchThread.setDaemon(true)
     microBatchThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has been posted
@@ -374,7 +380,7 @@ class StreamExecution(
 
     // Delete the temp checkpoint only when the query didn't fail
     if (deleteCheckpointOnStop && exception.isEmpty) {
-      val checkpointPath = new Path(checkpointRoot)
+      val checkpointPath = new Path(resolvedCheckpointRoot)
       try {
         val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
         fs.delete(checkpointPath, true)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 1fc062974e185..1b86468047b94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -614,6 +614,25 @@ class StreamSuite extends StreamTest {
     assertDescContainsQueryNameAnd(batch = 2)
     query.stop()
   }
+
+  test("should resolve the checkpoint path") {
+    withTempDir { dir =>
+      val checkpointLocation = dir.getCanonicalPath
+      assert(!checkpointLocation.startsWith("file:/"))
+      val query = MemoryStream[Int].toDF
+        .writeStream
+        .option("checkpointLocation", checkpointLocation)
+        .format("console")
+        .start()
+      try {
+        val resolvedCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot
+        assert(resolvedCheckpointDir.startsWith("file:/"))
+      } finally {
+        query.stop()
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536ed37463..0925646beb869 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
       AssertOnQuery("metadata log should contain only two files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 2 && toTest.head == "1")
@@ -492,7 +492,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
 
       AssertOnQuery("metadata log should contain three files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 3 && toTest.head == "2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index dc2506a48ad00..b5f1e28d7396a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -378,14 +378,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/0"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
      meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/1"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"),
      meq(None),
      meq("org.apache.spark.sql.streaming.test"),
      meq(Map.empty))
@@ -642,7 +642,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     query.stop()
@@ -654,7 +654,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     input.addData(1)
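
Note (not part of the patch): the core change is that StreamExecution now qualifies the user-supplied checkpoint location once, up front, so every derived path (sources/, offsets/, etc.) carries an explicit scheme such as "file:". Below is a minimal standalone sketch of that qualification logic, and of why the suites switch from metadataPath.toString to metadataPath.toUri when building a java.io.File. It assumes hadoop-common on the classpath and a local default file system; the object name QualifyPathExample is illustrative only.

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathExample {
  def main(args: Array[String]): Unit = {
    // A bare local path, as a user might pass via option("checkpointLocation", ...).
    val checkpointPath = new Path("/tmp/my-checkpoint")
    val fs = checkpointPath.getFileSystem(new Configuration())

    // The same calls the patch adds to StreamExecution: makeQualified prefixes the
    // path with the file system's scheme/authority, e.g. "/tmp/my-checkpoint"
    // becomes "file:/tmp/my-checkpoint" on the local file system.
    val resolved =
      checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
    println(resolved)

    // Once the path carries a scheme, new File(path.toString) would embed "file:"
    // in the file name, while new File(path.toUri) resolves the scheme correctly.
    val qualified = new Path(resolved)
    println(new File(qualified.toUri))    // /tmp/my-checkpoint
    println(new File(qualified.toString)) // file:/tmp/my-checkpoint (not a real path)
  }
}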