
[SPARK-20894][SS]Resolve the checkpoint location in driver and use the resolved path in state store#18149

Closed
zsxwing wants to merge 2 commits into apache:master from zsxwing:SPARK-20894

Member

@zsxwing zsxwing commented May 30, 2017

What changes were proposed in this pull request?

When a user runs a Structured Streaming query on a cluster and the driver uses the local file system for the checkpoint location, StateStore instances running in executors throw a file-not-found exception. However, the current error message does not make the cause obvious.

This PR makes StreamExecution resolve the checkpoint path in the driver and use the full path, including the scheme part (such as `hdfs:/` or `file:/`), in StateStore.

Then, if the above error occurs, StateStore throws an error containing this full path, which starts with `file:/`. That makes the cause obvious: the checkpoint location is on the local file system.

One potential minor issue is that, after this change, the user cannot use different default file system settings in the driver and executors (e.g., a public HDFS address in the driver and a private HDFS address in executors). However, since batch queries have the same issue (see `path.makeQualified(fs.getUri, fs.getWorkingDirectory)`), this change doesn't make things worse.
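The effect of qualifying the path on the driver can be sketched with the standard library alone. This is a simplified model, not the code in this PR: `qualify` is a hypothetical helper that only imitates what Hadoop's `path.makeQualified(fs.getUri, fs.getWorkingDirectory)` does for plain paths, and the default file system URIs and working directories are assumed values chosen for illustration.

```scala
import java.net.URI

object QualifyPathSketch {
  // Hypothetical helper, NOT a Spark or Hadoop API: a simplified model of
  // qualifying a path against a default file system URI and working directory.
  def qualify(location: String, defaultFs: URI, workingDir: String): String = {
    val uri = new URI(location)
    if (uri.getScheme != null) {
      // The location already carries a scheme (e.g. hdfs://...), so keep it.
      location
    } else {
      // Make a relative path absolute using the working directory.
      val absolutePath =
        if (location.startsWith("/")) location
        else workingDir.stripSuffix("/") + "/" + location
      // Attach the scheme and authority of the default file system.
      new URI(defaultFs.getScheme, defaultFs.getAuthority, absolutePath, null, null).toString
    }
  }

  def main(args: Array[String]): Unit = {
    val checkpoint = "/tmp/checkpoint"
    // Assumed settings: the driver defaults to the local file system,
    // while the executors default to HDFS.
    val driverDefault = new URI("file:///")
    val executorDefault = new URI("hdfs://namenode:8020")
    println(qualify(checkpoint, driverDefault, "/home/user"))
    println(qualify(checkpoint, executorDefault, "/user/spark"))
  }
}
```

In this sketch the same unqualified string resolves to a `file:/` path under the driver's assumed default and to an `hdfs://` path under the executors' assumed default. Resolving once in the driver and passing the qualified string along removes that ambiguity, which is also why differing driver and executor defaults can no longer be relied on.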

How was this patch tested?

The newly added test.


SparkQA commented May 31, 2017

Test build #77558 has finished for PR 18149 at commit 133f0dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
*
Contributor

@tdas tdas May 31, 2017


NIT: extra line

val query = MemoryStream[Int].toDF
.writeStream
.option("checkpointLocation", checkpointLocation)
.format("console").start()
Contributor

@tdas tdas May 31, 2017


NIT: .start() on the next line.

Contributor

tdas commented May 31, 2017

roughly LGTM, as long as you resolve conflicts and tests pass.


SparkQA commented May 31, 2017

Test build #77605 has finished for PR 18149 at commit b099c56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
      • case class StringReplace(srcExpr: Expression, searchExpr: Expression, replaceExpr: Expression)
      • trait Command extends LogicalPlan
      • case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan
      • case class StateStoreId(
      • class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null)
      • trait StateStoreWriter extends StatefulOperator

Member Author

zsxwing commented Jun 1, 2017

Thanks! Merging to master and 2.2.

@asfgit asfgit closed this in 2bc3272 Jun 1, 2017
@zsxwing zsxwing deleted the SPARK-20894 branch June 1, 2017 00:29
zsxwing added a commit to zsxwing/spark that referenced this pull request Jun 1, 2017
…he resolved path in state store

When the user runs a Structured Streaming query in a cluster, if the driver uses the local file system, StateStore running in executors will throw a file-not-found exception. However, the current error is not obvious.

This PR makes StreamExecution resolve the path in driver and uses the full path including the scheme part (such as `hdfs:/`, `file:/`) in StateStore.

Then if the above error happens, StateStore will throw an error with this full path which starts with `file:/`, and it makes this error obvious: the checkpoint location is on the local file system.

One potential minor issue is that the user cannot use different default file system settings in driver and executors (e.g., use a public HDFS address in driver and a private HDFS address in executors) after this change. However, since the batch query also has this issue (See https://github.com/apache/spark/blob/4bb6a53ebd06de3de97139a2dbc7c85fc3aa3e66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L402), it doesn't make things worse.

The newly added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#18149 from zsxwing/SPARK-20894.