Conversation

@viirya
Member

@viirya viirya commented May 30, 2021

What changes were proposed in this pull request?

This patch proposes to add an internal config for ignoring the metadata of FileStreamSink when reading the output path.

Why are the changes needed?

FileStreamSink produces a metadata directory that logs the output files per micro-batch. When we read from the output path, Spark looks at this metadata and ignores any files not in the log.

Normally this works well. But for some use-cases, we may need to ignore the metadata when reading the output path. For example, when we change the streaming query and must run it with a new checkpoint directory, we cannot use the previous metadata. If we also create a new metadata directory, then when we later read the output path in Spark, only the files listed in the new metadata are read. The files written before we switched to the new checkpoint and metadata are ignored by Spark.

It may seem that we could write to a different output directory every time, but that is a bad idea, as we would produce many directories unnecessarily.

We need a config for ignoring the metadata of FileStreamSink when reading the output path.
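
As an illustration, a minimal sketch of the intended usage (the config name here is the one this PR eventually settled on during review; the paths and query shape are hypothetical):

```scala
// Minimal sketch; config name as eventually agreed in review, paths hypothetical.
// A streaming query writes to /data/out and logs its files in /data/out/_spark_metadata.
val query = spark.readStream
  .schema("value STRING")
  .json("/data/in")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/data/checkpoint-v2") // new checkpoint after a query change
  .start("/data/out")

// By default, a batch read of /data/out only sees the files listed in _spark_metadata.
// With the config enabled, Spark lists the directory directly and reads all files.
spark.conf.set("spark.sql.streaming.fileStreamSink.ignoreMetadata", "true")
val allFiles = spark.read.parquet("/data/out")
```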

Does this PR introduce any user-facing change?

Added a config for ignoring the metadata of FileStreamSink when reading the output path.

How was this patch tested?

Unit tests.

@SparkQA

SparkQA commented May 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43604/

@SparkQA

SparkQA commented May 30, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43604/

@SparkQA

SparkQA commented May 30, 2021

Test build #139083 has finished for PR 32702 at commit c9443a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 30, 2021

cc @HeartSaVioR @xuanyuanking

@HeartSaVioR
Contributor

HeartSaVioR commented May 30, 2021

This was already proposed before as part of #31638, though I'm not sure whether you were aware of that.

Quoting my comment in #31638 (comment):

The behavioral change is bound to the file data source, right? I prefer adding a source option instead of a config, because 1) Spark already has a bunch of configurations, and 2) I'd prefer a smaller range of impact: per source instead of session-wide.

Also, the option should be used carefully (at the very least we should point this out to end users), as the metadata in the file stream sink output ensures that only the "correctly written" files are read. What would happen if they ignore it?

  1. Files not listed in the metadata could be corrupted, and these files will now be read. They may need to turn on the "ignore corrupt files" flag as well.

  2. They are no longer able to consider the output directory as "exactly-once" - it becomes "at-least-once", meaning the output rows can be written multiple times. Without deduplication or corresponding logic on the read side, this may result in incorrect output on the read side.

Technically this is an existing issue when a batch query reads from multiple directories or a glob path that includes the output directory of a file stream sink. (That said, end users could use a glob path as a workaround without adding the new configuration, though I'd agree an explicit config is more intuitive.) Since we are here, I'd love to see a proper notice for such cases as well.
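
For reference, a sketch of the glob-path workaround mentioned above (the path is hypothetical): because the load path is a glob rather than the sink directory itself, the _spark_metadata detection does not kick in.

```scala
// Hedged sketch: /data/out is a hypothetical file stream sink output directory.
// A glob path bypasses the single-directory _spark_metadata check, so Spark
// lists and reads all matching part files, not only those recorded in the sink log.
val df = spark.read.parquet("/data/out/part-*")
```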

@viirya
Member Author

viirya commented May 30, 2021

This was already proposed before as part of #31638, though I'm not sure whether you were aware of that.

Oh, this comes from an internal customer request. It seems hard (or troublesome) to work around, so I think it makes sense to support such a use-case. I wasn't aware that the previous PR included this.

I'm okay if you think an option is better than a config.

@HeartSaVioR
Contributor

HeartSaVioR commented May 30, 2021

And one more thing: I think letting the file stream sink ignore the metadata directory when reading existing metadata, but still write to the metadata directory, is odd and error-prone. The metadata is no longer valid once Spark starts to write new metadata to the same directory, and the option must be set to true for such a directory to be read properly even though Spark writes the metadata. There is no indication of this, and end users have to memorize it.

The ideal approach would be to write metadata into the directory, when it is written for the first time, indicating whether the directory is at-least-once (multi-write) or exactly-once (single-write), and to leverage that all the time instead of changing behavior depending on the query's config/option. This would bring consistency to the directory.
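
Purely as an illustration of that idea (nothing here is an existing Spark API; the WriteModeMarker object and the _write_mode file are hypothetical names):

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch of the proposal: persist the directory's write mode
// (exactly-once vs. at-least-once) the first time the sink writes to it,
// so readers can behave consistently without a per-query config or option.
object WriteModeMarker {
  private val markerFileName = "_write_mode" // hypothetical, next to _spark_metadata

  def writeIfAbsent(fs: FileSystem, outputDir: Path, mode: String): Unit = {
    val marker = new Path(outputDir, markerFileName)
    if (!fs.exists(marker)) { // the first writer fixes the directory's mode
      val out = fs.create(marker, false)
      try out.write(mode.getBytes(StandardCharsets.UTF_8)) finally out.close()
    }
  }
}
```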

Btw, I've made more improvements on the file stream source and file stream sink, but I had to agree that the effort largely duplicates what data lake solutions do. (See the discussion in #27694.) Once you start to address the issues one by one, you realize these are exactly what data lake solutions have already fixed. That's why I stopped working on the file stream source and sink, though I guess ETL into data lake solutions is still a valid use-case, and then the long-running issue on the file stream source should be fixed - #28422

@viirya
Member Author

viirya commented May 30, 2021

And one more thing: I think letting the file stream sink ignore the metadata directory when reading existing metadata, but still write to the metadata directory, is odd and error-prone. The metadata is no longer valid once Spark starts to write new metadata to the same directory, and the option must be set to true for such a directory to be read properly even though Spark writes the metadata. There is no indication of this, and end users have to memorize it.

I don't know if it is a typo, but this doesn't let the file stream sink ignore the metadata directory; it actually lets the file stream source (and the batch read path) ignore the metadata directory when reading the output of the file stream sink. It doesn't change how the file stream sink reads from or writes to the metadata directory. Is it possible we are talking about two different things?

@HeartSaVioR
Contributor

HeartSaVioR commented May 30, 2021

Looks like your code change doesn't address it, but your PR description mentions it.

For example, when we change the streaming query and must run it with a new checkpoint directory, we cannot use the previous metadata. If we also create a new metadata directory, then when we later read the output path in Spark, only the files listed in the new metadata are read. The files written before we switched to the new checkpoint and metadata are ignored by Spark.

It may seem that we could write to a different output directory every time, but that is a bad idea, as we would produce many directories unnecessarily.

What's the solution to this? Doesn't it mean you want to make the directory writable by multiple queries (including the same query with a different checkpoint)?

@viirya
Member Author

viirya commented May 30, 2021

What's the solution to this? Doesn't it mean you want to make the directory writable by multiple queries?

The use-case looks like this: the user wants to keep writing to the same output directory after changing the query. But once they change something in the query, the previous checkpoint cannot be used anymore, so they need to use a new checkpoint directory (and a new metadata directory; otherwise the new query's batches would be treated as duplicates and not written). They don't write to the output from multiple queries at the same time.

@HeartSaVioR
Contributor

The user wants to keep writing to the same output directory after changing the query. But once they change something in the query, the previous checkpoint cannot be used anymore, so they need to use a new checkpoint directory (and a new metadata directory; otherwise the new query's batches would be treated as duplicates and not written). They don't write to the output from multiple queries at the same time.

Which steps do end users need to take to resolve such a case with your PR? Deleting the metadata directory and letting the read path ignore the metadata?

I know this is a valid workaround to unblock end users who would otherwise be stuck on reusing the directory, but they should be quite cautious, as they must remember the state of the directory; the metadata won't cover some parts of the output, which is easy to forget. Once they forget that fact and also forget to set the flag on the read query, only parts of the output will be read, and they will complain about the result of the read query without mentioning what they did.

So just allowing end users to ignore the metadata is simple, but the risks of turning on the flag are not. Let's take responsibility for explaining what ignoring the metadata means, and try to document the possible risks.

@viirya
Member Author

viirya commented May 31, 2021

Which steps do end users need to take to resolve such a case with your PR? Deleting the metadata directory and letting the read path ignore the metadata?

Currently in this use-case, when the users change the query and the checkpoint doesn't work anymore, they clean up the metadata directory and run the changed query with a new checkpoint.
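
A sketch of those manual steps (all paths are hypothetical, and changedStream stands in for the modified streaming DataFrame):

```scala
import org.apache.hadoop.fs.Path

// Hedged sketch of the steps described above; all paths are hypothetical.
val out = new Path("/data/out")
val fs = out.getFileSystem(spark.sparkContext.hadoopConfiguration)

// 1. Clean up the old sink metadata so the restarted query can log from batch 0 again.
fs.delete(new Path(out, "_spark_metadata"), true)

// 2. Restart the changed query against the same output, with a fresh checkpoint.
val restarted = changedStream.writeStream
  .format("parquet")
  .option("checkpointLocation", "/data/checkpoint-v2") // new checkpoint directory
  .start("/data/out")
```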

They have another Spark app reading from the streaming query output. But as Spark respects the metadata, that other Spark app can only read the files written by the changed streaming query (i.e. the files recorded in the metadata). The files written before the streaming query was changed are now ignored by Spark.

I know this is a valid workaround to unblock end users who would otherwise be stuck on reusing the directory, but they should be quite cautious, as they must remember the state of the directory; the metadata won't cover some parts of the output, which is easy to forget. Once they forget that fact and also forget to set the flag on the read query, only parts of the output will be read, and they will complain about the result of the read query without mentioning what they did.

So just allowing end users to ignore the metadata is simple, but the risks of turning on the flag are not. Let's take responsibility for explaining what ignoring the metadata means, and try to document the possible risks.

I agree. That is why this config is internal-only so far. I should also add more cautionary wording to the config doc. I have discussed this with the users; it seems to me they know what they are asking for and are cautious about the effect of this config.

@viirya
Member Author

viirya commented May 31, 2021

@HeartSaVioR Does that sound okay to you? If so, do you still prefer an option over a config? Please let me know, so I can change it to use an option.

@HeartSaVioR
Contributor

Now I think it should be a source option. Given the impact, users should state what they are doing in their code, rather than via a configuration that can come from multiple places, even a cluster-level config.

@viirya
Member Author

viirya commented Jun 1, 2021

Okay, sounds good. Let me change it to a source option.

@HeartSaVioR
Contributor

HeartSaVioR commented Jun 4, 2021

@viirya
My apologies for going back and forth. Let's introduce it as a SQL config; I totally forgot that ignoreMissingFiles and similar options are SQL configs.

But I'd like to emphasize that the root issue won't be resolved, and eventually they may want to try out data lake solutions instead.

@viirya
Member Author

viirya commented Jun 4, 2021

Oh, no need to apologize. I hadn't updated this yet. :) So it is still a SQL config. Please help review when you find some time. Thanks!

@xuanyuanking
Member

Agree. Adding a config also has the benefit that existing applications can avoid code changes. But we should still avoid adding more and more configs later, as @HeartSaVioR suggested. There are indeed too many configs controlling behaviors 😂

@viirya
Member Author

viirya commented Jun 10, 2021

@HeartSaVioR @xuanyuanking Can we move forward with this?

@viirya
Member Author

viirya commented Jun 17, 2021

ping @HeartSaVioR @xuanyuanking

@xuanyuanking
Member

Ack, I'll review this and compare it with my original PR tomorrow (Beijing time).

Member

@xuanyuanking xuanyuanking left a comment

Generally LGTM. I checked, and the major idea is actually the same as 83dd27a. Just left some small suggestions for improvement.

caseInsensitiveOptions.get("path").toSeq ++ paths,
newHadoopConfiguration(),
sparkSession.sessionState.conf) =>
if !sparkSession.sessionState.conf.fileStreamSinkMetadataIgnored &&
Member

Instead of checking the config on the caller side in three places, maybe we can check it directly in FileStreamSink.hasMetadata? The two approaches should be equivalent, while the latter changes only a single code segment.

Contributor

Either is fine for me.
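
A rough sketch of what the suggested centralization might look like (simplified and renamed; not the actual Spark source):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified sketch of the suggestion above, not the actual Spark code:
// short-circuit inside hasMetadata so the three call sites need no config check.
object FileStreamSinkSketch {
  val metadataDir = "_spark_metadata"

  def hasMetadata(
      path: Seq[String],
      hadoopConf: Configuration,
      ignoreMetadata: Boolean): Boolean = path match {
    case Seq(singlePath) if !ignoreMetadata =>
      val hdfsPath = new Path(singlePath)
      try {
        val fs = hdfsPath.getFileSystem(hadoopConf)
        fs.getFileStatus(hdfsPath).isDirectory &&
          fs.exists(new Path(hdfsPath, metadataDir))
      } catch {
        case _: java.io.FileNotFoundException => false
      }
    case _ => false // config set, or not a single root path: fall back to plain listing
  }
}
```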

.createWithDefault(true)

val FILESTREAM_SINK_METADATA_IGNORED =
buildConf("spark.sql.streaming.fileStreamSink.metadata.ignored")
Member

Following the guideline for naming configurations, maybe the config could be named spark.sql.streaming.fileStreamSink.ignoreMetadata or spark.sql.streaming.fileStreamSink.formatCheck.enabled, or any other good name :)

Contributor

Personally, spark.sql.streaming.fileStreamSink.ignoreMetadata sounds better; I couldn't intuitively get what formatCheck means.

Member Author

spark.sql.streaming.fileStreamSink.ignoreMetadata sounds good.
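
For context, a sketch of what the renamed definition might look like, following the buildConf style visible in the diff above (the doc text and version are illustrative, not the exact merged wording):

```scala
// Hedged sketch in SQLConf's buildConf style; doc text and version are
// illustrative rather than the exact merged definition.
val FILESTREAM_SINK_METADATA_IGNORED =
  buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
    .internal()
    .doc("If true, Spark ignores the metadata log produced by FileStreamSink " +
      "when reading its output directory, and lists files directly instead. " +
      "Files written by failed or uncommitted batches may then be read, and the " +
      "exactly-once guarantee on the output no longer holds.")
    .version("3.2.0")
    .booleanConf
    .createWithDefault(false)
```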

Contributor

@HeartSaVioR HeartSaVioR left a comment

Looks OK. Let's resolve two comments and then we are good to go.

@viirya viirya force-pushed the ignore-metadata branch from 04bccc8 to 677b5f6 on June 18, 2021 18:25
@viirya
Member Author

viirya commented Jun 18, 2021

Thanks @HeartSaVioR @xuanyuanking! I've updated the change.

@SparkQA

SparkQA commented Jun 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44525/

@SparkQA

SparkQA commented Jun 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44525/

@SparkQA

SparkQA commented Jun 18, 2021

Test build #139999 has finished for PR 32702 at commit 677b5f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR
Contributor

Thanks! Merging to master.

@HeartSaVioR
Contributor

Thanks @viirya for the contribution! I merged it into master.

@viirya viirya deleted the ignore-metadata branch June 18, 2021 23:34
@xuanyuanking
Member

Thanks @viirya !

flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
…treamSink

Closes apache#32702 from viirya/ignore-metadata.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
@mohaidoss

Sorry to jump on a very old thread.
@viirya is this still the valid way to do this? In my case, we changed the sink S3 bucket name from s3a://old_path to s3a://new_path; however, now we can't batch-read the files, because the _spark_metadata still points to s3a://old_path.

Using the config spark.sql.streaming.fileStreamSink.ignoreMetadata solves this for us, but I am curious whether there is a more elegant way, for example by actually fixing the _spark_metadata folder?
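
For what it's worth, a heavily hedged sketch of "fixing" the metadata rather than ignoring it: this assumes the sink log files are plain text containing absolute s3a:// URIs, which depends on the Spark version, so verify the format first and back up _spark_metadata before touching it.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Heavily hedged sketch: assumes the sink log is plain text with absolute
// s3a:// URIs inside; this is version-dependent, so verify before relying on it.
val logFile = Paths.get("/tmp/_spark_metadata_copy/0") // hypothetical local copy of one log file
val content = new String(Files.readAllBytes(logFile), StandardCharsets.UTF_8)
Files.write(logFile,
  content.replace("s3a://old_path", "s3a://new_path").getBytes(StandardCharsets.UTF_8))
```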
