
Conversation

@xuanyuanking
Member

@xuanyuanking xuanyuanking commented Feb 24, 2021

What changes were proposed in this pull request?

When checking the path in FileStreamSink.hasMetadata, we should ignore the error and assume the user wants to read a batch output.

Why are the changes needed?

Keep the original behavior of ignoring the error.

Does this PR introduce any user-facing change?

Yes.
The path check will no longer throw an exception when checking the file sink format.

How was this patch tested?

New UT added.


@SparkQA

SparkQA commented Feb 24, 2021

Test build #135434 has finished for PR 31638 at commit 743b19b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

LGTM

Contributor

@HeartSaVioR HeartSaVioR Feb 28, 2021

I think this can be simplified after checking whether isDirectory() and exists() work with a glob path.

If these methods don't support glob paths (and I expect they don't), we can check for a glob path up front instead of dealing with the exception. That would also remove the need for the context that "these methods can fail when the path is a glob and the path is too long".

Member Author

That's a reasonable concern. Let me share more details about the context:

  • It's a regression introduced by SPARK-26824 (after that fix, the isDirectory and exists checks moved out of the try-catch block). In our use case, the exception was thrown only with a long glob path; the same code worked in Spark 2.4.
  • The exception was thrown by the isDirectory check. However, we can't be sure the behavior is always the same, since FileSystem has different implementations on different systems (we hit this on a non-HDFS file system).

So based on the two points above, I chose a safer way to fix the issue. The current fix, a new config plus handling the glob path exception, carefully falls back to the behavior before SPARK-26824 without introducing new behavior changes.

Contributor

@HeartSaVioR HeartSaVioR Mar 3, 2021

Let's focus on "glob path" here - from a quick look at the org.apache.hadoop.fs.FileSystem javadoc, both isDirectory() and exists() seem to require an exact path, not a glob path. The method that accepts a glob path is globStatus(), and there the API clearly names the parameter pathPattern.

https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html
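
(For reference, a minimal sketch of the contrast being drawn here; the /output paths and the default-configuration FileSystem are illustrative placeholders, not anything from the patch.)

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

  val fs = FileSystem.get(new Configuration())

  // globStatus() takes a pattern and returns the matching entries
  // (null when a non-glob path does not exist), so each match can be
  // inspected individually afterwards.
  val matches: Array[FileStatus] =
    Option(fs.globStatus(new Path("/output/*"))).getOrElse(Array.empty[FileStatus])
  matches.foreach(s => println(s"${s.getPath} isDirectory=${s.isDirectory}"))

  // isDirectory()/exists() resolve one exact path via getFileStatus(),
  // so they only make sense for a concrete directory such as /output/b.
  val hasDir = fs.exists(new Path("/output/b")) && fs.isDirectory(new Path("/output/b"))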

And intuitively, it sounds very odd to say a glob path is a directory when the matching paths can be both files and directories, and the same goes for checking existence. I don't think it's feasible to expect a meaningful value from calling these methods with a glob path. That sounds like "undefined behavior" to me, even if some odd file system could return true for the case.

That said, I'd rather consider a glob path input as "wrong" and always return false (some sort of debug log message is fine if we really want to log). If there's code relying on such behavior, I don't think that code is correct. I'd rather say the possible paths should be populated beforehand and this method should be called per path.

I still need to hear voices from others, but if the consensus is to simply disallow the glob path here, we won't need to introduce the new configuration.

@tdas @zsxwing @jose-torres @viirya @gaborgsomogyi Would like to hear your voices on this. Thanks!

Contributor

cc. @steveloughran @sunchao as they could provide great inputs on Hadoop filesystem's point of view.

The summary of the comment thread: is the behavior defined when a "glob path" is provided to isDirectory() and exists()? If so, what is the expected behavior?

Member

No, isDirectory and exists don't accept glob paths; they will just say "File does not exist". Internally they both call getFileStatus, which returns information for a single file, e.g. file length, permissions, whether it is a directory or a file, etc.

Member Author

@HeartSaVioR I will change this PR to resolve only the glob path issue, per our discussion. (It was my fault to mix the two things together.)
I'll create another PR for the behavior change caused by SPARK-26824.

Contributor

OK. So let's sort it out:

  1. filter out the glob path first and simply return false as an "early return"
  2. restore the try-catch to let the method tolerate non-fatal exceptions

I'm OK with doing both here, or one by one.
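
(A rough sketch of how the two steps could fit together, assuming SparkHadoopUtil.isGlobPath for the glob check; the method name and signature are illustrative, not the actual patch.)

  import scala.util.control.NonFatal

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  import org.apache.spark.deploy.SparkHadoopUtil

  // Hypothetical shape combining both suggestions:
  // 1) early-return false for glob patterns, 2) tolerate non-fatal errors.
  def hasMetadataSketch(rawPath: String, hadoopConf: Configuration): Boolean = {
    val path = new Path(rawPath)
    if (SparkHadoopUtil.get.isGlobPath(path)) {
      // A glob pattern never denotes a single file sink output directory.
      false
    } else {
      try {
        val fs = path.getFileSystem(hadoopConf)
        fs.isDirectory(path) && fs.exists(new Path(path, "_spark_metadata"))
      } catch {
        case NonFatal(_) =>
          // Restore the pre-SPARK-26824 tolerance: assume plain batch output.
          false
      }
    }
  }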

Member Author

@xuanyuanking xuanyuanking Mar 18, 2021

Thanks for the summary. I addressed it in d7abb94.
I also added the config in the exception branch and the error message, which gives users a way to fall back to the behavior before SPARK-26824. Please check whether adding a config in this way makes sense to you.

Contributor

@xuanyuanking
Sorry, I didn't realize you'd updated this PR.

As I said earlier, if we want to ignore metadata for the file sink output directory, that should be added as a "source option". Let's deal with it in another JIRA issue / PR so that we can continue the discussion afterwards.

Member Author

Let's deal with it in another JIRA issue / PR so that we can continue the discussion afterwards.

Yep, thanks for the advice, done in 11f6bf0.
(Actually, that's what I meant by "I'll create another PR for the behavior change caused by SPARK-26824". I don't think I expressed it clearly enough...)

@HeartSaVioR
Contributor

The behavioral change is bound to the file data source, right? I prefer adding a source option instead of a config, because 1) Spark already has a bunch of configurations and 2) I'd prefer a smaller range of impact: per source instead of session-wide.

Also, the option should be used carefully (at least we should make this clear to end users), as the metadata in the file stream sink output ensures that only "correctly written" files are read. What would happen if they ignore it?

  1. Files not listed in the metadata could be corrupted, and those files would now be read. Users may need to turn on the "ignore corrupted" flag as well.

  2. They can no longer consider the output directory "exactly once" - it becomes "at least once", meaning output rows can be written multiple times. Without deduplication or corresponding logic on the read side, this may result in incorrect output on the read side.

Technically this is an existing issue when a batch query reads from multiple directories or a glob path that includes the output directory of a file stream sink. (That said, users could use a glob path as a workaround without adding the new configuration, though I agree an explicit config is more intuitive.) I'd love to see a proper notice on such cases as well since we are here.

@xuanyuanking
Member Author

The behavioral change is bound to the file data source, right? I prefer adding a source option instead of a config, because 1) Spark already has a bunch of configurations and 2) I'd prefer a smaller range of impact: per source instead of session-wide.

Agree. Especially on 1), I fully agree we should add new configs carefully and only when really needed. It was my fault not to provide more context, as mentioned in #31638 (comment). The same user code passes in version 2.4 but fails now; if we add a source option, users would need code changes to control the behavior, which might not be a user-friendly fix IMO. If this were new behavior, I would follow your suggestion and add an option instead of a config.

I'd love to see a proper notice on such cases as well since we are here.

Yes, thanks for the reminder. I'll add warning logs for the cases you mentioned in the proper places (e.g. where hasMetadata returns false) as well as in the SS user guide (do you think the user guide is also a reasonable place?). Do you prefer to do this in the current PR or a separate one?

@HeartSaVioR
Contributor

OK, now I understand the rationale of the PR. I agree that it shouldn't be a new source option if we just want to fix the exposed problem rather than making it a valid option. I had the impression we were adding this behavior as a new available option, but now it looks like we are not.

I'm still concerned that we are living with a huge number of flags; I understand that any behavior fix carries the risk of breaking some workload, but we also need to explore all options to see whether we can fix the issue without introducing a flag. Let's hear some more voices to see whether we can fix this safely, or have to deal with a flag.

@xuanyuanking
Member Author

Yep, let's wait for more voices.

Contributor

@steveloughran steveloughran left a comment

no major issues from my perspective; functionality, performance and tests all look OK

Contributor

does this need to be a file/dir, or is existence all that matters?

Member Author

existence is all that matters

Contributor

=== ? or a .contains()?

Member Author

Thanks, changed to contains in d7abb94.

Contributor

Not needed here, but it's worth knowing that the hadoop-common test jar has a class, org.apache.hadoop.fs.FileSystemTestHelper, that lets you inject an instance of an FS into the cache. Very useful from time to time. Spark could either use that or add its own (test module) class to o.a.h.fs and call FileSystem.addFileSystemForTesting() when needed.
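
(A hedged illustration of the suggestion, assuming the hadoop-common test jar is on the classpath; the helper's exact signature, the mockfs:// URI and the faultyFs stub are assumptions for this sketch rather than verified API details.)

  import java.net.URI

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, FileSystemTestHelper}

  // Hypothetical test setup: register a hand-built FileSystem stub (e.g. one
  // whose getFileStatus always throws) in FileSystem's cache, so that later
  // FileSystem.get calls for this scheme return the stub instead of a real FS.
  def injectFaultyFs(faultyFs: FileSystem, conf: Configuration): Unit = {
    FileSystemTestHelper.addFileSystemForTesting(new URI("mockfs://test/"), conf, faultyFs)
  }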

Member Author

Thanks for the guidance, let me check the usage of FileSystemTestHelper.

@SparkQA

SparkQA commented Mar 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40794/

@SparkQA

SparkQA commented Mar 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40794/

Contributor

Once we exclude the metadata-ignoring functionality from this PR, please remove this sentence as well.

Member Author

Sure, done in 11f6bf0.

@xuanyuanking xuanyuanking changed the title [SPARK-34526][SS] Add a flag to skip checking file sink format and handle glob path [SPARK-34526][SS] Skip checking glob path in FileStreamSink.hasMetadata Mar 25, 2021
@SparkQA

SparkQA commented Mar 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41109/

@SparkQA

SparkQA commented Mar 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41109/

Member

@zsxwing zsxwing Mar 25, 2021

We cannot do this. For example, the following test works today. But if you changed this, it would be broken.

  test("foo") {
    withTempDir { tempDir =>
      val path = new java.io.File(tempDir, "[]")
      val chk = new java.io.File(tempDir, "chk")
      val q = spark.readStream.format("rate").load().writeStream.format("parquet")
        .trigger(org.apache.spark.sql.streaming.Trigger.Once())
        .option("checkpointLocation", chk.getCanonicalPath)
        .start(path.getCanonicalPath)
      q.awaitTermination()
      assert(SparkHadoopUtil.get.isGlobPath(new Path(path.getCanonicalPath)))
      val fileIndex = spark.read.format("parquet").load(path.getCanonicalPath)
        .queryExecution.executedPlan.collectFirst {
        case f: FileSourceScanExec => f.relation.location
      }.head
      assert(fileIndex.isInstanceOf[MetadataLogFileIndex])
    }
  }

The confusing part is that Hadoop uses the same class, Path, for both normal paths and glob paths. But that's not something we can change.
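
(To make the ambiguity concrete, a small illustration; the /tmp path is made up, and the assertion mirrors the one in the test above.)

  import org.apache.hadoop.fs.Path

  import org.apache.spark.deploy.SparkHadoopUtil

  // "[]" is a perfectly legal directory name that a streaming query can write
  // to, yet the same characters are glob metacharacters, so the path is also
  // detected as a glob pattern. Treating every glob-looking path as "no
  // metadata" would therefore break reads of this legitimately written sink.
  val sinkDir = new Path("/tmp/output/[]")
  assert(SparkHadoopUtil.get.isGlobPath(sinkDir))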

Contributor

The expectation on the user side is important here; they clearly specify that the input path is a "glob path". It seems easier to reason about the behavior if we simply ignore metadata when the input path is a glob path.
I'm not clear on where it breaks here, but I can see arguments that the file index should be MetadataLogFileIndex. For me, listing files even in this case is consistent with the cases where the glob path matches multiple directories.

Contributor

I'm just thinking out loud; I'm OK with fixing the regression first and revisiting later whether we still want to make the behavior clearer to reason about, despite the fact that we might break the existing behavior.

@SparkQA

SparkQA commented Mar 25, 2021

Test build #136525 has finished for PR 31638 at commit 11f6bf0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 25, 2021

I think the issue we are trying to fix is: when a glob path is valid but we cannot call getFileStatus with it, how do we allow users to access batch output?

For example, a glob path can be very long, such as /foo/bar/{20200101,20200102,...20201231}, which getFileStatus will not accept because most cloud storages won't accept such a long path. However, after we expand the glob path to real file paths, those paths are valid.

In 2.4, we blindly ignored errors when checking whether a directory has _spark_metadata or not, so the above case worked in 2.4. However, in 3.0 we no longer ignore errors, which exposes this issue.
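
(An illustrative expansion of such a pattern using Hadoop's globStatus directly; the /foo/bar layout and the default-configuration FileSystem are placeholders.)

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

  // A brace-expansion glob over many daily partitions: the pattern string
  // itself may be rejected by getFileStatus (e.g. key-length limits on object
  // stores), while expanding it via globStatus yields ordinary valid paths.
  val pattern = new Path("/foo/bar/{20200101,20200102,20201231}")
  val fs = FileSystem.get(new Configuration())
  val expanded: Array[FileStatus] =
    Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
  expanded.map(_.getPath).foreach(println) // each result is a normal, non-glob path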

In addition, a glob path can also be a normal path, which makes things much more complicated. For example,

  test("foo") {
    withTempDir { tempDir =>
      val path = new java.io.File(tempDir, "[]")
      val chk = new java.io.File(tempDir, "chk")
      val q = spark.readStream.format("rate").load().writeStream.format("parquet")
        .trigger(org.apache.spark.sql.streaming.Trigger.Once())
        .option("checkpointLocation", chk.getCanonicalPath)
        .start(path.getCanonicalPath)
      q.awaitTermination()
      assert(SparkHadoopUtil.get.isGlobPath(new Path(path.getCanonicalPath)))
      val fileIndex = spark.read.format("parquet").load(path.getCanonicalPath)
        .queryExecution.executedPlan.collectFirst {
        case f: FileSourceScanExec => f.relation.location
      }.head
      assert(fileIndex.isInstanceOf[MetadataLogFileIndex])
    }
  }

Ideally, batch queries should not be impacted by the extra code from streaming queries, but unfortunately that's impossible now. I'm inclined to go with the initial approach, which only skips glob paths when an error is thrown.
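
(A hedged sketch of that narrower fix, in contrast to the early-return version further up: the metadata check still runs for every path, so the "[]" case above keeps resolving to MetadataLogFileIndex, and only a failure on a glob pattern is downgraded to "no metadata". The method name and the exact metadata lookup are illustrative only.)

  import scala.util.control.NonFatal

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  import org.apache.spark.deploy.SparkHadoopUtil

  // Hypothetical narrower fix: always attempt the metadata check, and only
  // when it throws for a glob pattern assume plain batch output; errors on
  // normal paths keep propagating so other behavior stays unchanged.
  def hasMetadataNarrowSketch(rawPath: String, hadoopConf: Configuration): Boolean = {
    val path = new Path(rawPath)
    try {
      val fs = path.getFileSystem(hadoopConf)
      fs.isDirectory(path) && fs.exists(new Path(path, "_spark_metadata"))
    } catch {
      case NonFatal(_) if SparkHadoopUtil.get.isGlobPath(path) =>
        // e.g. an over-long brace-expansion pattern rejected by the store.
        false
    }
  }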

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 26, 2021

The glob path is valid as the path of a data source, but I'm not sure I can agree it's also valid as the parameter of FileStreamSink.hasMetadata().

I can't picture the "correct" behavior and return value when a glob path is provided. Let's say there are /output/a and /output/b, and only /output/b was created by a streaming query, so only it has a metadata directory.

When we provide /output/* as a glob path, what would we expect? I see three possible behaviors:

  1. Leverage the metadata in /output/b for reading /output/b, and read /output/a via listing. Sounds ideal, but I'm not sure Spark does that now.
  2. Leverage the metadata in /output/b for reading /output/b; /output/a is silently ignored.
  3. Don't leverage the metadata in /output/b and read both directories via listing.

I'm not sure which one Spark does now, but one thing is clear to me: reasoning about the return value of FileStreamSink.hasMetadata("/output/*") for the above case is very hard unless it always returns false for glob paths. If Spark populated the valid paths from the glob path and handled those paths individually (that is, the method is only called with non-glob paths), the result would be pretty clear. Otherwise, I'm confused about what we are expecting.

@zsxwing
Member

zsxwing commented Mar 26, 2021

When we provide /output/* as a glob path, what would we expect?

Currently Spark will "Don't leverage the metadata in /output/b and read both directories via listing" (option 3). Ideally we should respect the metadata in each directory, but I cannot find a simple way to do that.

Ideally, we would provide different APIs for normal paths and glob paths. But since we have mixed two different concepts into one parameter, we cannot take that back. I'd suggest setting the goal to fixing the regression from 3.0 ("when a glob path is valid but we cannot call getFileStatus with it, how do we allow users to access batch output") and avoiding changes to any other existing behaviors.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 27, 2021

OK, I agree with the direction of setting the goal as fixing the "regression" only, given we have no clear solution for the root cause or the overall problem.

Just one thing I'd like to make clear: the flag on the behavior. As we agree we are fixing a regression here, I wonder whether we need to introduce a flag for that (that was my concern with the initial shape of the PR); would someone really want to opt out of the regression fix and keep the behavior as it is?

@xuanyuanking
Member Author

Thanks for the help and discussion. I revived the first commit with more logs, following the recent comments. Please check whether it makes sense to you now.

@SparkQA

SparkQA commented Apr 1, 2021

Test build #136822 has started for PR 31638 at commit 207d063.

@SparkQA

SparkQA commented Apr 1, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41402/

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 1, 2021

I guess we still haven't decided on the following:

Just one thing I'd like to make clear: the flag on the behavior. As we agree we are fixing a regression here, I wonder whether we need to introduce a flag for that (that was my concern with the initial shape of the PR); would someone really want to opt out of the regression fix and keep the behavior as it is?

cc. @zsxwing

Member

Do we really need another flag for this?

Member Author

I think so. As mentioned in #31638 (comment), it's a behavior change introduced by SPARK-26824, which has already shipped in Spark 3.0. So a new flag might be the proper way to keep both behaviors available.

Contributor

I'm not in favor of flags and would like to avoid adding one at all. When we add a flag, we add a "branch" to maintain. The purpose of this PR is to "fix" a regression introduced in Spark 3.0.0 - do we really want to retain the current behavior even though we define it as a "regression"? If so, can we make clear that both behaviors have valid use cases?

@HeartSaVioR
Contributor

As we agreed, let's focus on fixing the regression only. The flag brings a side effect which is actually new functionality (ignoring _spark_metadata) and isn't related to the regression. If we think the new functionality is worth having, let's provide it as a new source option, not a flag.

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42253/

@SparkQA

SparkQA commented Apr 21, 2021

Test build #137726 has finished for PR 31638 at commit 8bd9297.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42261/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42261/

@SparkQA

SparkQA commented Apr 21, 2021

Test build #137734 has finished for PR 31638 at commit 867363f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

This looks different from 2.4:

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Do you have specific exceptions to catch here? I can't imagine any SparkException coming from the above logic.

Member Author

This is added to fix the UT "detect escaped path and report the migration guide" in org.apache.spark.sql.streaming.StreamingQuerySuite, which expects a SparkException to be thrown in checkEscapedMetadataPath.

Contributor

Ah OK, I'd missed that checkEscapedMetadataPath throws SparkException. My bad.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

I'll merge this early next week if there are no further outstanding comments.

@HeartSaVioR
Contributor

Sorry I missed this. I'll try to close and reopen it myself to see whether a committer can retrigger the GitHub Actions test.

@HeartSaVioR HeartSaVioR closed this May 6, 2021
@HeartSaVioR HeartSaVioR reopened this May 6, 2021
@HeartSaVioR
Contributor

Hmm... Looks like it just picked up the old run. We need to retrigger explicitly via an empty commit or similar.

@HeartSaVioR
Contributor

Retest this, please

@HeartSaVioR
Contributor

@xuanyuanking I'm sorry, I need your help retriggering GitHub Actions. Could you please rebase onto master or push an empty commit to update the commit here? Thanks in advance!

@xuanyuanking
Member Author

Sure, and many thanks for the reminder! Rebased onto the latest master and force-pushed.

@SparkQA

SparkQA commented May 6, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42720/

@SparkQA

SparkQA commented May 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42725/

@SparkQA

SparkQA commented May 6, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42725/

@SparkQA

SparkQA commented May 6, 2021

Test build #138199 has finished for PR 31638 at commit 867363f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 6, 2021

Test build #138204 has finished for PR 31638 at commit fa42b0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Thanks! Merging to master.
