Abstract the generic task writers for sharing the common codes between spark and flink #1213
Conversation
I'm assuming that this one should go in before #1145 and will review this one next. If that's not the case, please let me know!

Yeah, you're right. Please help review this patch first if you have time, sir.
core/src/main/java/org/apache/iceberg/taskio/FileAppenderFactory.java (resolved)
core/src/main/java/org/apache/iceberg/taskio/BaseTaskWriter.java (resolved)
}

@Override
public List<DataFile> pollCompleteFiles() {
I don't think it's a good idea to have a poll method like this one because it leaks critical state (completedFiles) and creates an opportunity for threading issues between write and pollCompleteFiles.
Instead, I think the base implementation should use a push model, where each file is released as it is closed.
/**
 * Called when a data file is completed and no longer needed by the writer.
 */
protected abstract void completedFile(DataFile file);

Then closeCurrent would call completedFile(dataFile) and the implementation of completedFile would handle it from there.
I read the BaseWriter code again and see the difference now. For the Spark streaming writer, once we do a commit we create a new streaming writer for future records, so there is no need for a method like pollCompleteFiles() to poll newly added DataFiles continuously. In the current Iceberg Flink writer implementation, I keep using the same TaskWriter to write records even after a checkpoint happens, so I designed pollCompleteFiles to fetch all completed data files incrementally. I think it's a design difference; the state-leak and threading issues you mentioned are not a problem in the current version, but I agree it would be easy to run into them if others don't handle this carefully. I can align with the current Spark design.
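For illustration only, here is a minimal sketch (class and method names are hypothetical, not from this PR) of how the push model could still serve the Flink case described above: completed files are pushed into a buffer as they close and drained at each checkpoint, without exposing the writer's internal state.

import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.DataFile;

abstract class CheckpointAwareWriter<T> {
  private final List<DataFile> pendingFiles = new ArrayList<>();

  // Invoked by the base writer each time a data file is completed (the push model).
  protected void completedFile(DataFile file) {
    pendingFiles.add(file);
  }

  // Drained by the Flink sink when a checkpoint barrier arrives.
  protected List<DataFile> drainCompletedFiles() {
    List<DataFile> drained = new ArrayList<>(pendingFiles);
    pendingFiles.clear();
    return drained;
  }
}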
/**
 * Create a new {@link FileAppender}.
 *
 * @param outputFile indicate the file location to write.
Minor: I think the Javadoc for arguments should describe the argument's purpose, like an OutputFile used to create an output stream. If the purpose is clear from the expected type, then keeping it simple is fine, like an OutputFile.
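As a concrete rendering of that suggestion, the Javadoc might read something like this (the method signature shown here is only illustrative):

/**
 * Create a new {@link FileAppender}.
 *
 * @param outputFile an OutputFile used to create an output stream
 */
FileAppender<T> newAppender(OutputFile outputFile);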
 */

package org.apache.iceberg.spark.source;
package org.apache.iceberg.taskio;
Why not just use the existing io package? That, or maybe a tasks package.
return new WrappedFileAppender(partitionKey, outputFile, appender);
}

class WrappedFileAppender {
I don't see much value in this class. Its primary use is to keep track of whether a file is large enough to release, but it doesn't actually have any of the logic to do that. As a consequence, the code is now split across multiple places.
This also has the logic for closing an appender and converting it to a DataFile, but that could just as easily be done in a DataFile closeAppender(FileAppender appender) method.
It would make sense to keep this class if it completely encapsulated the logic of rolling new files. That would require some refactoring so that it could create new files using the file and appender factories. It would also require passing a Consumer<DataFile> so that it can release closed files. Otherwise, I think we should remove this class.
I created this class because the fanout writer will have several open writers, and when building the DataFile we need all of the information for the given FileAppender, such as the partitionKey, EncryptedOutputFile, etc. The previous Spark implementation didn't need this class because all of that context is maintained inside the PartitionedWriter (the currentXXX fields), which doesn't work for a fanout writer. It's better to have a class that holds this information for building the DataFile.
It would make sense to keep this class if it completely encapsulated the logic of rolling new files
Good point. I'll make the WrappedFileAppender handle all of the rolling logic; let me refactor this.
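A simplified, self-contained sketch of that shape, using placeholder types instead of the real Iceberg API (FileAppender, DataFile, OutputFileFactory); all names and signatures here are illustrative only. The wrapper owns the appender life cycle, rolls when the target size is reached, and pushes each completed file to a Consumer so callers never see partially written state.

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RollingWriterSketch<T> implements Closeable {

  // Stand-in for Iceberg's FileAppender: writes records and reports bytes written so far.
  interface Appender<R> extends Closeable {
    void add(R record);
    long length();
  }

  // Stand-in for the closed-file descriptor (a DataFile in Iceberg).
  static final class CompletedFile {
    final long sizeInBytes;

    CompletedFile(long sizeInBytes) {
      this.sizeInBytes = sizeInBytes;
    }
  }

  private final Supplier<Appender<T>> appenderFactory;  // opens a new appender per file
  private final Consumer<CompletedFile> onComplete;     // releases each closed file
  private final long targetFileSize;

  private Appender<T> current = null;

  RollingWriterSketch(Supplier<Appender<T>> appenderFactory,
                      Consumer<CompletedFile> onComplete,
                      long targetFileSize) {
    this.appenderFactory = appenderFactory;
    this.onComplete = onComplete;
    this.targetFileSize = targetFileSize;
  }

  void write(T record) throws IOException {
    if (current == null) {
      current = appenderFactory.get();  // open lazily, only when a record actually arrives
    }

    current.add(record);

    if (current.length() >= targetFileSize) {
      closeCurrent();  // roll: close, release the completed file, reopen on the next write
    }
  }

  private void closeCurrent() throws IOException {
    if (current != null) {
      long size = current.length();
      current.close();
      onComplete.accept(new CompletedFile(size));
      current = null;
    }
  }

  @Override
  public void close() throws IOException {
    closeCurrent();
  }
}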
}

boolean shouldRollToNewFile() {
  // TODO: ORC does not yet support reporting the target file size before the file is closed
We should consider changing the ORC appender to simply return 0 if the file isn't finished. That way this check is still valid, but the file will never be rolled.
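A rough sketch of what that could mean inside the ORC appender's length() method (the field names here are assumed, not the actual OrcFileAppender internals):

@Override
public long length() {
  // Assumption: while the file is still open, ORC cannot cheaply report an accurate size,
  // so return 0; the roll-over size check above stays valid but never triggers for ORC.
  if (!closed) {
    return 0;
  }
  return finalFileLength;  // recorded when the writer is closed
}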
Could we address the ORC issue you described in a separate issue? I think we should focus on the writer refactor here.
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
  private final Function<T, PartitionKey> keyGetter;
Instead of passing a function, I think this should be an abstract method:

/**
 * Create a PartitionKey from the values in row.
 * <p>
 * Any PartitionKey returned by this method can be reused by the implementation.
 *
 * @param row a data row
 */
protected abstract PartitionKey partition(T row);

Passing a function is good if we need to inject behavior that might need to be customized, but here the only customization that would be required is to partition the objects that this class is already parameterized by. So it will be easier just to add a method for subclasses to implement. And that puts the responsibility on the implementation instead of on the code that constructs the writer.
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.key = new PartitionKey(spec, writeSchema);
this.wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(writeSchema));
this.keyGetter = keyGetter;
Like the other partitioned writer, I think this should use an abstract method to be implemented by subclasses.
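Given the fields shown in the diff context above (key and wrapper), the Spark-side override could look roughly like this; a sketch only, not the final merged code:

@Override
protected PartitionKey partition(InternalRow row) {
  // reuse the single PartitionKey instance, as the proposed Javadoc allows
  key.partition(wrapper.wrap(row));
  return key;
}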
Ping @rdblue, I think this issue is currently the biggest blocker to moving the Flink sink connector forward. Please take a look if you have time, thanks.
// NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
PartitionKey copiedKey = partitionKey.copy();
writer = new RollingFileAppender(copiedKey);
writers.put(copiedKey, writer);
This code here is handling the case where we've not seen this partition key yet. This is especially likely to happen when users did not keyBy or otherwise pre-shuffle the data according to the partition key.
Is pre-shuffling something that the users should be doing before writing to the table (either keyBy or ORDER BY in Flink SQL)? I understand that this is specifically a PartitionedFanoutWriter, and so it makes sense that keys might not always come together (and even in the case where users did keyBy the partition key, if the number of TaskManager slots that are writing does not equal the cardinality of the partition key you'll still wind up with multiple RollingFileAppenders in a single Flink writing task and thus fanout). However, for long running streaming queries, it's possible that this TaskManager doesn't see this partition key again for days or even weeks (especially at a high enough volume to emit a complete file of the given target file size).
I guess my concern is that users wind up with a very high cardinality of keys on a single TaskManager. Either because they didn't pre-shuffle their data or perhaps they have an imbalance between the cardinality on the partition key and the parallelism at the write stage such that records might not naturally group together enough to emit an entire file. Or, as another edge case, one partition key value is simply not common enough to emit an entire file from this PartitionedFanoutWriter.
IIUC, if the PartitionedFanoutWriter does not see this partition key enough times in this TaskManager again to emit a full file for quite some time, a file containing this data won't be written until close is called. For very long running streaming jobs, this could be days or even weeks in my experience. This could also lead to small files upon close. Is this a concern that Iceberg should take into consideration or is this left to the users in their Flink query to determine when tuning their queries?
I imagine with S3, data locality of a file written much later than its timestamp of when the data was received is not a major concern, as the manifest file will tell whatever query engine reads this table which keys in their S3 bucket to grab and the locality issue is relatively abstracted away from the user, but what about if the user is using HDFS? Could this lead to performance issues (or even correctness issues) on read if records with relatively similar timestamps at their RollingFileAppender are scattered across a potentially large number of files?
I suppose this amounts to three concerns (and forgive me if these are non-issues as I am still new to the project, but not new to Flink so partially this is for helping me understand, as well as reviewing my concerns when reading this code):
- Should we be concerned that a writer won't emit a file until a streaming query is closed due to the previously mentioned case? Possibly tracking the time that each writer has existed and then emitting a file if it has been far too long (however that could be determined).
- If a record comes in at some time, and then the file containing that record isn't written for a much greater period of time (on the order of days or weeks), could this lead to correctness problems or very large performance problems when any query engine reads this table?
- Would it be beneficial to at least emit a warning or info level log to the user that it might be beneficial to pre-partition their data according to the partition key spec if perhaps the number of unique RollingFileAppender writers gets too high for one given Flink writer slot / TaskManager? Admittedly, it might be difficult to determine a heuristic of when this might be a problem vs just the natural difference in the parallelism of writing task slots vs the cardinality of the partition key.
Should we be concerned that a writer won't emit a file until a streaming query is closed due to the previously mentioned case?
I think that the intent is to close and emit all of the files at each checkpoint, rather than keeping them open. That is required to achieve exactly-once writes because the data needs to be committed to the table.
I think that also takes care of your second question because data is constantly added to the table.
Would it be beneficial to at least emit a warning or info level log to the user that it might be beneficial to pre-partition their data according to the partition key spec . . .
I think a reasonable thing to do is to limit the number of writers that are kept open, to limit the resources that are held. Then you can either fail if you go over that limit, or close and release files with an LRU policy. Failing brings the problem to the user's attention immediately and is similar to what we do on the Spark side, which doesn't allow writing new data to a partition after it is finished. That ensures that data is either clustered for the write, or the job fails.
The long-term plan for Spark is to be able to influence the logical plan that is writing to a table. That would be the equivalent of adding an automatic keyBy or rough orderBy for Flink. I think we would eventually want to do this for Flink as well, but I'm not sure what data clustering and sorting operations are supported currently.
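For illustration, a minimal, self-contained sketch of the LRU idea (the class name, the maxOpenWriters cap, and the generic Closeable writer type are all assumptions, not part of this PR): an access-ordered LinkedHashMap closes and evicts the least recently used writer once the cap is exceeded.

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class WriterLruCache<K, W extends Closeable> extends LinkedHashMap<K, W> {
  private final int maxOpenWriters;

  WriterLruCache(int maxOpenWriters) {
    super(16, 0.75f, true);  // accessOrder = true gives least-recently-used iteration order
    this.maxOpenWriters = maxOpenWriters;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, W> eldest) {
    if (size() > maxOpenWriters) {
      try {
        eldest.getValue().close();  // closing the writer releases its completed file(s)
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return true;  // evict the closed writer from the map
    }
    return false;
  }
}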
Ah ok. I hadn't realized that was the plan.
I wrote a parquet writer for flink way back when flink did not support it and outputting files on checkpoint was the only real solution that I could come up with.
It also involved forking the base parquet library, so we wound up abandoning it, as we don't really have the engineering head count to constantly update and maintain something like that. Even though Flink can now support writing Parquet files, this is why I'm interested in this project, along with the numerous other additions to the data lake that the project supports.
Thanks for the info @rdblue!
During scan planning, IIUC, an inclusive projection could possibly match a very large number of rows that might fall outside of the predicate range if the RollingFileAppender for this rarely observed predicate at this Task Manager buffers its data for a very long time before writing (say days or even weeks in a longer running streaming query).
Do you mean that the Flink streaming reader won't see buffered data that has not yet been committed to the Iceberg table? Actually, that's exactly the expected behavior. Say we have a data pipeline:
(flink-streaming-sink-job-A) -> (iceberg table) -> (flink-streaming-reader-job-B).
The upstream flink-streaming-sink-job-A appends records to the Iceberg table continuously and commits when a checkpoint happens. We need to guarantee transactional semantics, so the downstream Flink streaming reader can only see committed Iceberg data; the delta between two contiguous snapshots is the incremental data that the streaming reader should consume.
}
}

class RollingFileAppender implements Closeable {
Minor: This doesn't implement FileAppender, so maybe RollingFileWriter would make more sense?
Well, sounds great.
}

if (currentAppender == null) {
  currentAppender = new RollingFileAppender(currentKey);
It would be nice to not change the logic for opening an appender. Before, this was part of the flow of changing partitions and I don't see any value in moving it.
We've now changed to maintain the partitionKey inside the RollingFileWriter (as we discussed before, this is because the fanout writer may have multiple writers appending records), so creating the RollingFileAppender is effectively what sets the partition key. I did not open the appender here because we only need to open one when there's a real record to write (to avoid opening an appender without writing a record); all of that logic is hidden inside the RollingFileAppender.
PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                  OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
private PartitionKey currentKey = null;
private RollingFileAppender currentAppender = null;
Now that the current key is null, we will need a check before adding it to completedPartitions in the write method:

if (!key.equals(currentKey)) {
  closeCurrent();

  if (currentKey != null) {
    // if the key is null, there was no previous current key
    completedPartitions.add(currentKey);
  }
  ...
}
Nice catch.
@Override
public void write(T record) throws IOException {
  if (currentAppender == null) {
    currentAppender = new RollingFileAppender(null);
Why not initialize currentAppender in the constructor? Then we don't need an additional null check in write, which is called in a tight loop.
I refactored this part because we don't need to initialize a real writer if no records come in. Before this patch, it would open a real file writer even if there was no record to write, and in the end we would need to close that useless writer and clean up its file.
spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java (resolved)
this.close();

List<DataFile> dataFiles = complete();
return new TaskCommit(new TaskResult(dataFiles));
If complete doesn't produce TaskResult, then I'm not sure that we need it at all anymore. Could we just construct TaskCommit directly?
OK
while (iterator.hasNext()) {
  iterator.next().close();
  // Remove from the writers after closed.
  iterator.remove();
Many iterator classes don't implement remove. What about iterating over the key set separately instead?

if (!writers.isEmpty()) {
  for (PartitionKey key : writers.keySet()) {
    RollingFileAppender writer = writers.remove(key);
    writer.close();
  }
}
OK, sounds good.
private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();

public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                               OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
I think this is fine, but you might want to move this into Flink and combine it with the Flink-specific writer. There are a lot of concerns that might need to change for this class, like using a LRU cache for writers, incrementally releasing files, etc. Since this is only used by Flink, we might just want to iterate on it there instead of trying to maintain this as an independent class. We can always bring it back out when we have an additional use case.
Thanks, @openinx! The
Ping @rdblue, mind taking another look? Thanks.
<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit extends org.apache.iceberg.spark.source.TaskResult implements Serializable</h3>
<h3>Class org.apache.iceberg.spark.source.SparkBatchWrite.TaskCommit implements Serializable</h3>
The Javadoc for a release should not be modified. I think this is probably a search and replace error.
Yes, you are right. We shouldn't change the 0.9.0 Javadoc; let's revert it.
You are right, we should not change the Javadoc of the 0.9.0 release.
}

private void openCurrent() {
  if (spec.fields().size() == 0) {
Unpartitioned writers pass a null partition key. Would it make more sense to use that instead of using spec?
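A rough sketch of the null-key variant (assuming the file factory offers both an unpartitioned and a partitioned newOutputFile variant; exact signatures may differ):

private void openCurrent() {
  if (partitionKey == null) {
    // unpartitioned writers pass a null partition key, so no partition path component is needed
    this.currentFile = fileFactory.newOutputFile();
  } else {
    this.currentFile = fileFactory.newOutputFile(partitionKey);
  }
  // then open the appender for the new file, as before
}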
Yeah, it makes sense. Thanks.
Yeah, it makes sense.
spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java (resolved)
@Override
public WriterCommitMessage commit() throws IOException {
  this.close();
No need to use the prefix this for close calls, is there?
TaskCommit(TaskResult result) {
  super(result.files());
public static class TaskCommit implements WriterCommitMessage {
  private final List<DataFile> taskFiles;
Same here, this class should use an Array of data files.
closeCurrent();
}

private void closeCurrent() throws IOException {
Is this method needed? Why not merge it with close?
Yeah, its logic could just be moved to close().
core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java (resolved)
@rdblue I've addressed all of the latest comments, thanks.
Thanks, @openinx! I fixed the minor problem that caused tests to fail and merged this.
Thanks for the fix.
When I implemented PR #1145, I found that the Flink TaskWriter shares most of its code with Spark. So I did some abstraction to move the common logic into the iceberg-core module, so that both of them can share it. FYI @rdblue.