**StreamExecution.scala**

```diff
@@ -58,7 +58,7 @@ case object TERMINATED extends State
 class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
-    val checkpointRoot: String,
+    private val checkpointRoot: String,
     analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
@@ -84,6 +84,12 @@ class StreamExecution(
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
+  val resolvedCheckpointRoot = {
+    val checkpointPath = new Path(checkpointRoot)
+    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
+  }
+
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
@@ -154,7 +160,7 @@
       case streamingRelation@StreamingRelation(dataSource, _, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSource.createSource(metadataPath)
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
@@ -233,14 +239,14 @@ class StreamExecution(
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   private def checkpointFile(name: String): String =
-    new Path(new Path(checkpointRoot), name).toUri.toString
+    new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
 
   /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
   * has been posted to all the listeners.
   */
  def start(): Unit = {
-    logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
+    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
    microBatchThread.setDaemon(true)
    microBatchThread.start()
    startLatch.await() // Wait until thread started and QueryStart event has been posted
@@ -374,7 +380,7 @@ class StreamExecution(
 
      // Delete the temp checkpoint only when the query didn't fail
      if (deleteCheckpointOnStop && exception.isEmpty) {
-        val checkpointPath = new Path(checkpointRoot)
+        val checkpointPath = new Path(resolvedCheckpointRoot)
        try {
          val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
          fs.delete(checkpointPath, true)
```
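The core of the change is the `resolvedCheckpointRoot` block above: the user-supplied `checkpointRoot` is qualified once, up front, against the checkpoint path's own file system, and every later use (per-source metadata paths, `checkpointFile`, logging, temp-checkpoint deletion) goes through the resolved string. A minimal standalone sketch of what that qualification does, assuming only the Hadoop client on the classpath and a hypothetical local path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ResolveCheckpointSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical, scheme-less checkpoint location, as a user might pass it.
    val checkpointPath = new Path("/tmp/my-checkpoint")
    val fs = checkpointPath.getFileSystem(new Configuration())
    // Qualify against the file system's scheme/authority and working directory,
    // mirroring the resolvedCheckpointRoot block in the diff above.
    val resolved = checkpointPath
      .makeQualified(fs.getUri, fs.getWorkingDirectory)
      .toUri.toString
    println(resolved) // e.g. "file:/tmp/my-checkpoint" on a local file system
  }
}
```

Once the scheme is pinned this way, later `new Path(...)` constructions over the resolved string cannot silently re-resolve against a different default file system.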
**StreamSuite.scala**

```diff
@@ -614,6 +614,25 @@ class StreamSuite extends StreamTest {
     assertDescContainsQueryNameAnd(batch = 2)
     query.stop()
   }
+
+  test("should resolve the checkpoint path") {
+    withTempDir { dir =>
+      val checkpointLocation = dir.getCanonicalPath
+      assert(!checkpointLocation.startsWith("file:/"))
+      val query = MemoryStream[Int].toDF
+        .writeStream
+        .option("checkpointLocation", checkpointLocation)
+        .format("console")
+        .start()
+      try {
+        val resolvedCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot
+        assert(resolvedCheckpointDir.startsWith("file:/"))
+      } finally {
+        query.stop()
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
```

**@zsxwing** (Member, Author) commented on the added test, Jun 1, 2017:

> Just this test file has conflicts with 2.2.
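Note the `query.asInstanceOf[StreamingQueryWrapper].streamingQuery` step in the test: `resolvedCheckpointRoot` lives on the internal `StreamExecution`, not on the public `StreamingQuery` interface, so the test unwraps the `StreamingQueryWrapper` returned by `start()` to reach it. The same unwrap pattern appears in the `DataStreamReaderWriterSuite` changes below.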
**StreamingQuerySuite.scala**

```diff
@@ -466,7 +466,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
       AssertOnQuery("metadata log should contain only two files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 2 && toTest.head == "1")
@@ -492,7 +492,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
 
       AssertOnQuery("metadata log should contain three files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 3 && toTest.head == "2")
```
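The two test tweaks in this file follow directly from the qualification change: once `metadataPath` is a fully qualified `file:` URI, building a `java.io.File` from its `toString` would embed the scheme in the file name, while the `File(URI)` constructor resolves it correctly. A small sketch of the difference, assuming a local file system (the path is illustrative):

```scala
import java.io.File
import org.apache.hadoop.fs.Path

object FileFromQualifiedPath {
  def main(args: Array[String]): Unit = {
    val qualified = new Path("file:/tmp/offsets") // illustrative qualified path
    // toString keeps the scheme in the name, so java.io.File would treat
    // "file:" as part of the path itself.
    println(new File(qualified.toString).getPath) // file:/tmp/offsets
    // The File(URI) constructor strips the scheme and yields a real local path.
    println(new File(qualified.toUri).getPath) // /tmp/offsets
  }
}
```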
**DataStreamReaderWriterSuite.scala**

```diff
@@ -378,14 +378,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/0"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/1"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
@@ -642,7 +642,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     query.stop()
@@ -654,7 +654,7 @@
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     input.addData(1)
```
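The updated mock verifications expect the per-source metadata path to be rooted at the *qualified* checkpoint location, matching the `s"$resolvedCheckpointRoot/sources/$nextSourceId"` pattern in `StreamExecution`. `makeQualifiedPath` here is the test suite's own helper; a hypothetical stand-in (names and the temp path are illustrative, not the suite's real code) shows how the expected argument is built:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MetadataPathSketch {
  // Hypothetical stand-in for the suite's makeQualifiedPath helper.
  def makeQualifiedPath(path: String): URI = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(new Configuration())
    hadoopPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri
  }

  def main(args: Array[String]): Unit = {
    val checkpointLocation = "/tmp/ckpt" // illustrative
    // Source 0 gets .../sources/0, source 1 gets .../sources/1, and so on.
    val expected = s"${makeQualifiedPath(checkpointLocation)}/sources/0"
    println(expected) // e.g. file:/tmp/ckpt/sources/0
  }
}
```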