[SPARK-14678][SQL]Add a file sink log to support versioning and compaction #12435
Conversation
Test build #55977 has finished for PR 12435 at commit
    import org.apache.spark.sql.internal.SQLConf

    /**
     * @param path the file path
nit: Add basic doc string on what this class represents.
Overall looks quite good. Just a few nits on naming and docs.
@tdas FYI, I changed
Test build #56150 has finished for PR 12435 at commit
    if (isCompactionBatch(batchId, 3)) {
      // Since batchId is a compaction batch, the batch log file should contain all logs
      assert(sinkLog.get(batchId).getOrElse(Nil) === (0 to batchId).map {
        id => SinkFileStatus("/a/b/" + id, 100L, FileStreamSinkLog.ADD_ACTION)
nit: this and line 147 can be deduped
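As an aside, here is a minimal sketch of the compaction-batch predicate the test exercises. It is inferred from the test above (0-based batch ids, interval 3), not necessarily the PR's exact code:

```scala
// Sketch, assuming batch ids start at 0 and compaction runs every
// `compactInterval` batches: with interval 3, batches 2, 5, 8, ... compact.
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

// For example: isCompactionBatch(2, 3) == true; isCompactionBatch(3, 3) == false
```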
     * Returns all files except the deleted ones.
     */
    - def allLogs(): Array[FileLog] = {
    + def allFiles(): Array[SinkFileStatus] = {
Can you make this allSinkFile() so that it's not ambiguous with the log files?
Test build #56197 has finished for PR 12435 at commit
     * should set a reasonable `fileCleanupDelayMs`. We will wait until then so that the compaction
     * file is guaranteed to be visible for all readers
     */
    private val fileCleanupDelayMs = sqlContext.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
All AWS S3 endpoints now implement create consistency: if a new object is created, a GET made directly on it will return that object.

What can take time to appear is the aggregate file in an ls of the parent "directory", which is really a wildcard match on the path. If the processes can determine the final name of the compaction file, they can look for that file directly (getFileStatus() should suffice; open() is even better). If the compact file isn't found, they can fall back to the non-aggregate files. All that should be required is that the aggregate file is fully written (with a close() at the end of the output operation that doesn't discard any raised exception) before the original files are deleted. Adding a minor delay is a low-harm feature, but a direct check for the aggregate file is something that should be done first.
@steveloughran thanks for pointing it out. I updated the code. Now it will try to access the next compaction/aggregate file directly. However, a cleanup delay is still helpful to avoid a livelock.
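A rough sketch of the "access the compaction file directly" approach described here; the file-naming scheme and the fallback behavior are assumptions for illustration, not the PR's actual code:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Sketch: look for the known compaction file name directly instead of listing
// the directory, falling back to the per-batch files if it is not yet visible.
def readCompactedBatch(fs: FileSystem, logDir: Path, batchId: Long): Option[Seq[String]] = {
  val compactFile = new Path(logDir, s"$batchId.compact") // hypothetical naming scheme
  if (fs.exists(compactFile)) {
    val in = fs.open(compactFile)
    try Some(Source.fromInputStream(in, "UTF-8").getLines().toList)
    finally in.close()
  } else {
    None // caller falls back to reading the individual batch log files
  }
}
```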
Test build #56242 has finished for PR 12435 at commit
    val FILE_SINK_LOG_CLEANUP_DELAY =
      SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
        .internal()
        .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
Why do we need this? I thought the plan was to use optimistic concurrency control (i.e. just retry if there is a FileNotFoundException).
> Why do we need this? I thought the plan was to use optimistic concurrency control (i.e. just retry if there is a FileNotFoundException).

See my comments here: https://github.com/apache/spark/pull/12435/files#diff-e529f046ee04b9926e8dd88e131134e5R61
In addition, old Hadoop releases' open method isn't guaranteed to throw FileNotFoundException. E.g., https://github.com/apache/hadoop/blob/release-2.4.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java#L181
Ignore S3; look at S3N in Hadoop 2.4. Sadly, it doesn't either; I didn't fix that till 2.5 and HADOOP-9361/HADOOP-9597. Hadoop 2.4's s3n is broken in other ways too; look at HADOOP-10457.

To summarise: don't use s3n in Hadoop 2.4; it was the first update to a later Jets3t library and was under-tested. 2.5 fixed it, and 2.6.0 added s3a, though that's not ready for use in 2.7.

Best to do a check for existence up front (getFileStatus()), which works everywhere.
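A small sketch of the up-front existence check suggested here, relying on getFileStatus() rather than on open() throwing FileNotFoundException (which pre-2.5 s3n did not guarantee). The helper name is mine, not from the PR:

```scala
import java.io.{FileNotFoundException, InputStream}
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: probe with getFileStatus() first, whose documented contract is to
// throw FileNotFoundException when the path is absent, then open the file.
def openIfExists(fs: FileSystem, path: Path): Option[InputStream] = {
  try {
    fs.getFileStatus(path) // existence check that works across filesystems
    Some(fs.open(path))
  } catch {
    case _: FileNotFoundException => None
  }
}
```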
Test build #56289 has finished for PR 12435 at commit

Test build #56380 has finished for PR 12435 at commit

Thanks, merging to master!
      throw new IllegalStateException("Incomplete log file")
    }
    val version = lines(0)
    if (version != VERSION) {
Should this be 'version > VERSION'?
> Should this be 'version > VERSION'?

It doesn't matter now. This is the first version. We will update the logic here when we add a new format in the future.
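For illustration, a sketch of what the forward-compatible check ('version > VERSION') could look like once multiple format versions exist. The "v1"-style encoding is an assumption, not confirmed by the PR:

```scala
// Sketch: reject only log files written by a NEWER format version than this
// reader understands; known older versions would get dedicated read paths.
val VERSION = "v1" // assumed encoding of the version line

def validateVersion(versionLine: String): Unit = {
  val read = versionLine.stripPrefix("v").toInt
  val supported = VERSION.stripPrefix("v").toInt
  if (read > supported) {
    throw new IllegalStateException(
      s"Unsupported log version: $versionLine (max supported: $VERSION)")
  }
}
```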
@zsxwing if you have questions about the quirks of the s3* APIs and endpoints, feel free to email me directly: stevel @ hortonworks.
What changes were proposed in this pull request?
This PR adds a special log for FileStreamSink for two purposes:

1. FileStreamSinkLog uses a new log format instead of the Java serialization format. It writes one log file for each batch. The first line of the log file is the version number, followed by multiple JSON lines; each JSON line is the JSON serialization of a FileLog (an illustrative example is sketched below).

2. FileStreamSinkLog compacts log files into one big file every "spark.sql.sink.file.log.compactLen" batches. When compacting, it reads all history logs and merges them with the new batch. During the compaction it also drops the entries for files that have been deleted (marked by FileLog.action). When a reader uses allLogs to list all files, the method returns only the visible files (dropping the deleted ones).
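To make the format concrete, here is a hypothetical example of one batch's log file. The JSON field names are illustrative assumptions, not taken from the PR:

```
v1
{"path":"/a/b/0","size":100,"action":"add"}
{"path":"/a/b/1","size":100,"action":"add"}
{"path":"/a/b/0","size":100,"action":"delete"}
```

The version line lets a future reader detect files written by a newer format, and one JSON object per line keeps each batch log human-readable and easy to merge during compaction.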
How was this patch tested?
FileStreamSinkLogSuite