
Conversation

@mikedias

What changes were proposed in this pull request?

The current behavior only checks the filename to determine whether a file should be processed. I propose adding an option to also test whether the file timestamp is greater than the timestamp recorded the last time the file was processed, as an indication that it has been modified and has different content.

This is useful when the source producer occasionally overwrites files with new content.
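
A minimal usage sketch of what this would look like from the user side. The option name includeModifiedFiles is the one proposed in this PR and discussed below; the schema and paths are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("modified-files-example").getOrCreate()

// Streaming file sources require an explicit schema; this one is illustrative.
val salesSchema = new StructType()
  .add("id", LongType)
  .add("amount", DoubleType)

val sales = spark.readStream
  .schema(salesSchema)
  .format("csv")
  // Proposed option: also treat an already-seen filename with a newer
  // timestamp as a new file (default false, preserving current behavior).
  .option("includeModifiedFiles", "true")
  .load("s3a://my-bucket/source-folder/")
```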

How was this patch tested?

Added unit tests.

@HeartSaVioR
Contributor

First of all, I agree this would be one of the valid use cases.

I'm just thinking out loud about an edge case (maybe that's why Spark restricts this): when the timestamp of a file changes for any reason (content being appended, some unintended modification, etc.), all of the file's contents are reprocessed (as the UT in this patch demonstrates). That breaks not only end-to-end exactly-once but also stateful exactly-once, because state will not be rolled back. So the option falls into "at-least-once" semantics for such cases, while end users would expect at least stateful exactly-once. This needs to be warned about.

@HeartSaVioR
Contributor

Ah, the title of the PR talks about processing a "modified file" rather than a new file being written over the same file name. Then what I stated is not an edge case but exactly the case this PR addresses.

@mikedias
Author

Yes, that's why it is important to keep this option false by default, preserving the end-to-end exactly-once guarantee. I'll add a note clarifying that in case of modification the whole file content will be processed again.

I like "modified file" because that's just what it is. If I changed it to "overwritten files", it might be misunderstood as a content comparison, which is harder to address.

@HeartSaVioR
Contributor

The patch looks simple and clear: it seems to be just a matter of policy, allowing or disallowing possibly unsafe behavior via an option.

Personally I'm a bit worried there might be cases where the last-modified timestamp of a file is changed unintentionally, and then things get messed up. Even when end users intend to enable this option, they might complain when they encounter a reprocessed file and broken semantics due to an unintended reason. I thought briefly about ways to mitigate this, mostly around tracking a file offset, but a newly overwritten file could have the same length, and we would also need to store the offset, so that doesn't seem to be a good option as of now.

Maybe I'm worrying too much, so I'd like to hear other voices as well. (I'm just one of the contributors anyway and the decision will be taken by committers, so it's just my 2 cents.)

@mikedias
Author

Users always have to understand their use cases before activating non-default options. Take the fileNameOnly option, for example: if it gets activated in the wrong use case, it leads to data loss.

Same here; we just have to be clear in the docs about what will happen when the option is activated.

@gaborgsomogyi
Contributor

gaborgsomogyi commented Feb 14, 2019

The question is: why would a producer generate the same file again?

From the data source perspective I see mainly two approaches actually implemented:

  • Atomic move into a directory (several engines do that, but Spark does it differently because S3, for example, implements a move as a copy)
  • Write the file in a non-atomic way but update a metadata file with the name of each properly written file. Here the available files come from the metadata, and all others are considered junk.

+1 @HeartSaVioR, and I'm worried about this patch as well.

  • Take any filesystem, append to a file 10k times, and then close it. Is it guaranteed that the timestamp is updated only after the last append and that no internal OS flush touches it? If there is no such guarantee, random exceptions will be thrown by the SQL engine because maybe only half of a row has been written out.
  • Take S3 as another example. Even with S3Guard, when the file is modified the metadata shows the file is there, but because overwrites are only eventually consistent the file content can be:
    • The original one
    • The new one
    • An empty file

This change may make this behaviour more likely.

All in all, with my current understanding, I would change the producer instead.

@mikedias
Author

Some producers do not care much about the uniqueness of filenames, which leads to files frequently being overwritten. The motivation of this patch is exactly the case where you can't change the producer's behavior 😄

In my view, this option is a good complement to #22952, where we would be able to archive/delete processed files. Without this option, if we upload a file with the same name as a previously processed and deleted one, it wouldn't get processed, leading to non-intuitive behavior.

Addressing your concerns:

  • No random exception will be introduced by the option. It only changes whether a file is considered for processing in each micro-batch. The possible race condition you mention can happen even with a brand new file being written while it is processed; it is not related to the patch.
  • Again, the patch does not change anything about how the files are processed. It just introduces another criterion, besides the filename, for deciding which files were already processed. When enabled, it basically treats an already processed file with a new timestamp as a new file again.

@gaborgsomogyi
Contributor

you can't change the producer's behavior

and

No random exception will be introduced

are contradictory to me.
How would you deal with the simplest situation: Spark is archiving a file while the producer is uploading the same one?

@mikedias
Author

Maybe it's not clear that the patch does not change any processing behavior; it only adds an option to the decision "is this file new and should it be included in the micro-batch?"

This is the current FileStreamSource workflow:

  1. When it starts, it reads the checkpoint data to get the list of previously processed files and puts them in a map called seenFiles, where the key is the filename and the value is the timestamp.
  2. For each micro-batch, it lists all files in the directory and checks whether the seenFiles map contains the filename to determine if it is a new file or not. This is where I'm proposing the change (see the sketch after this list).
  3. With the list of new files, it creates a DataSource instance that handles the correct file format and codecs and sets up a Dataset. These are the classes responsible for reading the file; nothing changes here.
  4. The micro-batch gets the Dataset and executes it. When it finishes, it updates the checkpoint data with the processed filenames. Then it goes back to step 1.
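
A simplified sketch of the check in step 2, using the names from this thread (seenFiles, includeModifiedFiles). It is not the exact code in FileStreamSource, just the idea:

```scala
import scala.collection.mutable

// Simplified model of the "is this file new?" decision described in step 2 above.
case class FileEntry(path: String, timestamp: Long)

class SeenFilesMap(includeModifiedFiles: Boolean) {
  // filename -> timestamp recorded when the file was last processed
  private val seenFiles = mutable.Map.empty[String, Long]

  /** A file is "new" if its name was never seen, or, when the option is on,
    * if its timestamp is newer than the one recorded at processing time. */
  def isNewFile(file: FileEntry): Boolean = seenFiles.get(file.path) match {
    case None                => true
    case Some(seenTimestamp) => includeModifiedFiles && file.timestamp > seenTimestamp
  }

  def markProcessed(file: FileEntry): Unit = seenFiles(file.path) = file.timestamp
}
```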

Answering your question:

How would you deal with the simplest situation: Spark is archiving and producer is uploading the same?

  • includeModifiedFiles=true: the file content will be processed in the next micro-batch.
  • includeModifiedFiles=false: the file content will be ignored.

Regarding possible race conditions, nothing changes. Spark will deal with them using the DataSource's current mechanisms.

@mikedias mikedias changed the title [SPARK-26875][SQL] Add an option on FileStreamSource to include modified files [SPARK-26875][SS] Add an option on FileStreamSource to include modified files Feb 15, 2019
@HeartSaVioR
Contributor

HeartSaVioR commented Feb 15, 2019

Maybe @gaborgsomogyi is considering more than what exists as of now. Currently #22952 is implemented as a synchronous approach, but @gaborgsomogyi voiced a preference for making archiving/deletion asynchronous in #22952 (we talked about dealing with that in a later TODO), and that is based on the assumption that we never process a file again, even when the same file is added as new.

When the option is turned on, it breaks that assumption and things change very differently, including the race condition @gaborgsomogyi stated. When we are hit by the race condition it would produce an unintended result (maybe the worst scenario would be archiving/deleting a new file before processing it, which end users expect to get processed).
(EDIT: Looks like things will be the same even without the asynchronous approach; maybe @gaborgsomogyi was considering the current state.)

The race condition between Spark and the filesystem still occurs even without this feature (just with #22952), but that's not something Spark can control, and Spark should just do its best to archive/delete files quickly before others overwrite/delete them. It still doesn't break the query semantics or output even if we are hit by the race condition.

@mikedias
Author

I think the archive/delete race condition can be addressed by checking the file timestamp before archiving/deleting. If it is the same as the one recorded when the file was processed, proceed; if not, skip. This extra step could be enabled only when includeModifiedFiles is enabled, which signals that files can be overwritten.
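
A rough sketch of that mitigation, assuming the cleanup code has a Hadoop FileSystem handle and the timestamp recorded when the file was processed; the names here are illustrative:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Archive a processed file only if it has not been overwritten since processing.
def archiveIfUnchanged(
    fs: FileSystem,
    file: Path,
    processedTimestamp: Long,
    archiveDir: Path): Boolean = {
  val currentTimestamp = fs.getFileStatus(file).getModificationTime
  if (currentTimestamp == processedTimestamp) {
    // Unchanged since processing: safe to move it out of the source folder.
    fs.rename(file, new Path(archiveDir, file.getName))
  } else {
    // Overwritten in the meantime: skip, so the next micro-batch can pick it up.
    false
  }
}
```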

Talking about end-user expectations, if they upload a file and it gets deleted/archived, they probably expect a new file with the same name to be processed as well when it is uploaded again. Not processing the file is not intuitive, and it is also hard to debug which filenames were processed in the past. "Why is my file not getting processed?" could become a frequently asked question.

I totally understand the implications of files being unintentionally modified, as pointed out by @HeartSaVioR, and that's why the option is false by default. But I do think we need to provide an option to cover more use cases and give a solution to users who understand that their files can be overwritten.

@AmplabJenkins

Can one of the admins verify this patch?

@zsxwing
Member

zsxwing commented Nov 13, 2019

Thanks a lot for your contribution. However, I think overwriting an existing file is an anti-pattern. Most storage systems cannot handle it properly:

  • S3: overwriting is eventually consistent. There is no guarantee which version we will get.
  • Azure blob storage and data lake: reading a file that's being modified may throw a conflict error.
  • HDFS: I don't know whether it supports reading a file that's being modified.

Generally, the file stream source requires that files appear in the directory atomically so that we don't need to handle the case where Spark reads a file that is still being written. Overwriting an existing file breaks this assumption.

@mikedias
Author

Hi @zsxwing, thanks for taking the time to share your thoughts. The idea of this configuration is to add another condition for deciding whether a file should be processed. It does not make any assumption about concurrently modified files or anything else. Everything remains the same.

The scenario that I'm trying to solve here is:

  • User uploads the file latest_sales.csv to the source folder
  • Spark processes it
  • File latest_sales.csv gets deleted (manually or via the new configuration from [SPARK-20568][SS] Provide option to clean up completed files in streaming query #22952)
  • User uploads the file latest_sales.csv to the source folder again
  • Spark does not process it because it has already processed the filename latest_sales.csv
  • User gets confused. Even if this is explained/documented, there is no way to tell which filenames were already processed.

What this PR simply proposes is: if enabled, instead of only checking the filename to determine whether a file was already processed, check the file timestamp as well. Race conditions, filesystem specifics, stream semantics, and everything else remain the same.

@zsxwing
Member

zsxwing commented Nov 18, 2019

@mikedias Thanks for describing the scenario. Yep, I understand this could be helpful. However, I would like to understand more about the use case. How does a user upload latest_sales.csv to the source folder?

  • Writing directly. This is not recommended as it can potentially break the streaming query if it sees a partial file.
  • Rename into the source folder. Then, since a rename operation is involved, why not add a UUID to the file name so that overwriting a file is totally unnecessary (see the sketch below).
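
A minimal sketch of that alternative: give each uploaded file a unique name during the rename, so the same logical file never reuses a name. Paths and names here are illustrative:

```scala
import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, Path}

// Move a fully written staging file into the source folder under a unique name,
// so a re-upload never overwrites a file the query has already seen.
def publish(fs: FileSystem, staged: Path, sourceDir: Path): Path = {
  val target = new Path(sourceDir, s"latest_sales-${UUID.randomUUID()}.csv")
  fs.rename(staged, target)
  target
}
```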

As I mentioned previously, overwriting a file makes everything complicated. The user has to think about when it is safe to overwrite a file. Adding this option may make users think Spark can handle file overwriting correctly; however, we don't and cannot handle the race conditions internally in Spark.

@HeartSaVioR
Contributor

HeartSaVioR commented Nov 19, 2019

File latest_sales.csv gets deleted (manually or via the new configuration #22952)

I don't think it is safe to delete the file manually outside of Spark, as it cannot be known whether the file has been processed and added to the commit log. Checking whether the file has been processed is not sufficient; we need to ensure the file will never be accessed again.

Assuming the file is deleted/archived by Spark, I see the possible confusion from the user's side as well, since it's no longer an "overwrite" if the file has been processed and cleaned up. So that seems to be a valid use case, though we may need to think about how to let Spark differentiate an "overwrite" from "adding a new file that was deleted or moved". Overwriting a file still shouldn't be allowed, as there are plenty of comments here explaining why it's harmful.

@mikedias
Author

Thank you @zsxwing @HeartSaVioR for taking time and reviewing this PR. Glad to see activity here :)

@zsxwing The files are uploaded directly to the source folder. I don't see a need for an intermediate step that moves files around.

@HeartSaVioR A common use case for deleting files outside Spark is removing old files sitting in the source folder that impact the performance of the ListObjects operation. We use S3 lifecycle policies to delete the files after 15 days (giving Spark plenty of time to process them).

I agree that having an option with "modified file" in its name might suggest that Spark provides extra guarantees around race conditions. But what if we rename it to something like fileUniqueness: filename (default) and fileUniqueness: filename+timestamp? That would be more accurate to what the PR is trying to achieve and would not set wrong expectations.

@HeartSaVioR
Contributor

HeartSaVioR commented Nov 19, 2019

@mikedias

A common use case for deleting files outside Spark is removing old files sitting in the source folder that impact the performance of the ListObjects operation. We use S3 lifecycle policies to delete the files after 15 days (giving Spark plenty of time to process them).

I agree that some manual operations have to be taken for such a case, but since Spark has no idea about the current status of files if they can be modified, unfortunately that is at your own risk. There's an option, spark.files.ignoreMissingFiles, which helps tolerate file deletion, but for "overwrite" there's no option that helps tolerate it, and @zsxwing already explained in depth how hard it is to do it right.

SPARK-20568 officially provides a way to remove/archive processed files in a safe manner, so I would agree there's a valid case if end users look at the folder and confirm the file doesn't exist (so they are NOT overwriting an existing file) and put the file there, while another file with the same path was actually processed and removed/archived. I guess that might be considered if we all agree this is a valid use case.

@mikedias
Author

mikedias commented Nov 19, 2019

There's an option, spark.files.ignoreMissingFiles, which helps tolerate file deletion, but for "overwrite" there's no option that helps tolerate it

In fact, there's the option spark.sql.files.ignoreCorruptFiles, which helps deal with partial/corrupted files. We've been using it along with spark.sql.files.ignoreMissingFiles successfully in our streaming queries.
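
For reference, the two configs mentioned here can be enabled on an existing SparkSession like this:

```scala
// `spark` is an existing SparkSession.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
```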

@HeartSaVioR
Contributor

HeartSaVioR commented Nov 20, 2019

In fact, there's the option spark.sql.files.ignoreCorruptFiles, which helps deal with partial/corrupted files.

Ignoring the input file is not what we want even if we support overwriting files, right? And there's another possible unintended behavior: if the partial file happens to be readable, then the query will never read the rest of the content. That option is not really intended for overwritten files.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 29, 2020
@github-actions github-actions bot closed this Mar 1, 2020
@murali9704

@mikedias Any reason this PR wasn't merged and was closed for inactivity? Were there any issues discovered in testing, or was it just lack of time to follow through?

Can we have this re-opened, reviewed again, and merged, since it solves an interesting problem for us too?

Thanks again for the solution here.

@HeartSaVioR
Contributor

See the above comments; I think we already explained why it is not a good idea.

@murali9704

@HeartSaVioR If you don't mind, can you summarize the main concern here? I read all the comments but found it hard to follow them and understand why this was not approved.

@HeartSaVioR
Contributor

#23782 (comment)

This comment explains everything.

Also, I do not agree that spark.sql.files.ignoreCorruptFiles is a rescue, as I commented above. If you ever require Spark to provide at-least-once fault tolerance, there must never be a change to the source on replay. If the input file is somehow overwritten between a batch failure and the reprocessing of the same batch, fault tolerance is going to be broken. It's a hard problem, not a trivial one.
