Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Dec 17, 2019

What changes were proposed in this pull request?

This patch renews the verification logic of the archive path for FileStreamSource, as we found the existing logic doesn't take the partitioned/recursive options into account.

Before the patch, it only required the archive path to have a depth greater than 2 (two subdirectories from the root), leveraging the fact that FileStreamSource normally reads files whose parent directory matches the pattern, or which match the pattern themselves. Given that the 'archive' operation moves files to the base archive path while retaining their full path, an archive path tends to be safe if its depth is greater than 2, meaning FileStreamSource doesn't re-read archived files as new source files.
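For illustration, a minimal Python sketch of how the 'archive' operation derives the destination path (the helper name is hypothetical, not Spark's API):

```python
def archived_path(archive_dir: str, source_file: str) -> str:
    """Hypothetical helper: 'archive' prepends the base archive dir
    to the file's full source path, retaining the original path."""
    return archive_dir.rstrip("/") + "/" + source_file.lstrip("/")

# Archiving /data/in/part-0 under /archived/here lands it at
# /archived/here/data/in/part-0: the archive dir's depth is added on
# top of the file's own path, which is why a depth greater than 2 was
# previously considered safe for non-recursive reads.
print(archived_path("/archived/here", "/data/in/part-0"))
```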

With the partitioned/recursive options, that assumption no longer holds, as FileStreamSource can read files at any depth of subdirectories under the source pattern. To handle this correctly, we have to renew the verification logic; it may not be intuitive or simple, but it works for all cases.

The new verification logic prevents both cases:

  1. The archive path matches the source pattern as a "prefix" (the depth of the archive path > the depth of the source pattern)

e.g.

  • source pattern: /hello*/spar?
  • archive path: /hello/spark/structured/streaming

Any file under the archive path will match the source pattern when the recursive option is enabled.

  2. The source pattern matches the archive path as a "prefix" (the depth of the source pattern > the depth of the archive path)

e.g.

  • source pattern: /hello*/spar?/structured/hello2*
  • archive path: /hello/spark/structured

Some archived files will not match the source pattern, e.g. file path /hello/spark/structured/hello2 gives the final archived path /hello/spark/structured/hello/spark/structured/hello2.

But other archived files will still match the source pattern, e.g. file path /hello2/spark/structured/hello2 gives the final archived path /hello/spark/structured/hello2/spark/structured/hello2, which matches the source pattern when recursive is enabled.

Implicitly, it also prevents the archive path from fully matching the source pattern (same depth).

We want to prevent any archived file from being picked up again as a new source file, so the patch takes the most restrictive approach to rule out all of these cases.
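To make the combined rule concrete, here is a minimal Python sketch of the depth-prefix check; it mirrors the idea described above, not Spark's actual implementation, and `fnmatch` only approximates Hadoop glob semantics:

```python
from fnmatch import fnmatchcase

def archive_dir_conflicts(source_pattern: str, archive_dir: str) -> bool:
    """Reject the archive dir when, compared segment by segment up to
    the shallower of the two depths, every archive segment matches the
    corresponding pattern segment -- covering prefix matches in either
    direction as well as the equal-depth full match."""
    pattern = [p for p in source_pattern.split("/") if p]
    archive = [p for p in archive_dir.split("/") if p]
    depth = min(len(pattern), len(archive))
    return all(fnmatchcase(a, p)
               for p, a in zip(pattern[:depth], archive[:depth]))

# Case 1 above: archive path deeper than the pattern -- rejected.
print(archive_dir_conflicts("/hello*/spar?",
                            "/hello/spark/structured/streaming"))  # True
# Case 2 above: pattern deeper than the archive path -- rejected.
print(archive_dir_conflicts("/hello*/spar?/structured/hello2*",
                            "/hello/spark/structured"))            # True
# A non-matching archive dir is accepted.
print(archive_dir_conflicts("/hello*/spar?", "/archived/here"))    # False
```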

Why are the changes needed?

Without this patch, archived files can be picked up as new source files when the partitioned/recursive option is enabled, since the current condition doesn't take these options into account.

Does this PR introduce any user-facing change?

Only for Spark 3.0.0-preview (only preview 1 for now, but possibly preview 2 as well): end users are required to provide an archive path satisfying a somewhat more complicated condition, instead of simply a depth greater than 2.

How was this patch tested?

New unit tests.

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115432 has finished for PR 26920 at commit 4b99e61.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115433 has finished for PR 26920 at commit e779edc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115442 has finished for PR 26920 at commit e779edc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

cc. @zsxwing @vanzin @gaborgsomogyi

Contributor

@vanzin vanzin left a comment


Code looks ok. This is basically the check I had suggested in the original PR...

<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" should ensure some condition to guarantee archived files are never included as new source files:
Contributor


Code looks ok but the documentation is kinda hard to follow.

First, the whole "should ensure some condition" part is redundant since there is a single condition. Just replace it with the following sentence.

The following sentence can be reworded a bit to be clearer, too:

The value of <code>sourceArchiveDir</code> must not match the source pattern, when considering just the prefix of the paths that match in subdirectory depth. Otherwise archived files would be considered new source files.

(It's kinda hard to explain the depth thing with words in documentation. It always sounds a bit confusing. An example would be much clearer.)

Contributor Author

@HeartSaVioR HeartSaVioR Dec 17, 2019


Actually, that made me want to stick with the simple condition as it is now (I also felt the rule might not be easy for end users to follow), but unfortunately we found cases where we can no longer do that.

I tried to follow the reworded sentence, but it seems to lead to confusion because:

  1. "Otherwise archived files would be considered new source files." reads to me as if violating the rule is allowed and this is merely the consequence, whereas the goal is that violating the rule simply isn't allowed.

  2. The point of the condition is that we check the match at the same depth, taking the minimum, for the reasons explained in the PR description. While we'd like to skip elaborating on why, I think we still need to clarify it in the doc; I'm not sure only mentioning prefix/subdirectory conveys the point.

I'll try to add an example after the original sentence.

@SparkQA

SparkQA commented Dec 18, 2019

Test build #115472 has finished for PR 26920 at commit be988df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Kind reminder.

@vanzin
Contributor

vanzin commented Jan 7, 2020

retest this please

@SparkQA

SparkQA commented Jan 8, 2020

Test build #116265 has finished for PR 26920 at commit be988df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jan 8, 2020

Merging to master.

@vanzin vanzin closed this in bd7510b Jan 8, 2020
@HeartSaVioR
Contributor Author

Thanks for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-30281 branch January 8, 2020 23:23