Allow writers to control size of files generated #432
Conversation
@xabriel, do you know how large the min/max values are for each row group in the file? I'm curious whether this is caused by the number of row groups or by the stats in the row group metadata.
Test failures seem unrelated, all around Hive Metastore.
@xabriel, I pushed a fix for the flaky Java test this morning. And we're looking into the python test.
Force-pushed 31419ad to 560230a
(Rebased to pick up the Java test fixes.)
Travis CI still unhappy with the following unrelated test: |
|
The fix in #450 indeed takes care of the test failure. Thanks @rdblue. @rdblue, @aokolnychyi: this PR is now ready for further review.
```java
@Override
public WriterCommitMessage commit() throws IOException {
  Preconditions.checkArgument(currentAppender != null, "Commit called on a closed writer: %s", this);
```
These precondition checks on currentAppender, which were only done in UnpartitionedWriter, are no longer done in BaseWriter since they make PartitionedWriter fail.
aokolnychyi left a comment:
I did a quick pass, LGTM.
Sorry that I haven't reviewed this yet! I was out for ApacheCon this week and I'm going to be out until Wednesday next week. I'll have a look when I'm back.
```java
  this.fileCount = 0;
}

private synchronized String generateFilename() {
```
I don't think this should be synchronized unless fileCount is volatile. It doesn't need to be synchronized anyway, because each write is single-threaded. I would just remove it to make this a bit simpler.
Agreed.
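A minimal sketch of the simplification being discussed. The class and constructor arguments here are hypothetical stand-ins (the real method lives in Writer.java and includes more identifiers in the name); the point is that a plain int counter is enough when each writer is used by a single task thread:

```java
// Hypothetical sketch: each DataWriter instance is driven by a single task
// thread, so fileCount needs no synchronization or volatile qualifier.
class FilenameGenerator {
    private final String uuid;
    private final int partitionId;
    private int fileCount = 0;

    FilenameGenerator(String uuid, int partitionId) {
        this.uuid = uuid;
        this.partitionId = partitionId;
    }

    String generateFilename() {
        // fileCount makes each successive file written by this task unique
        return String.format("%05d-%s-%05d", partitionId, uuid, fileCount++);
    }
}
```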
```java
protected PartitionKey currentKey = null;
protected FileAppender<InternalRow> currentAppender = null;
protected EncryptedOutputFile currentFile = null;
protected long currentRows;
```
Nit: all fields should be initialized.
It is also strange that this is only used in child classes.
Can this class provide a writeInternal method that updates currentRows, writes to the appender, and checks when to open and close the current appender? That would be cleaner and would no longer require all the protected fields. Does performance really degrade when the subclasses are a bit more separated from this base class?
I'd still need the currentKey field to be accessible, since it is used in openCurrent() and closeCurrent() in the base class, but also in PartitionedWriter's write(). So I can either:
- only make currentKey protected, or
- add a getter/setter for this particular field.

WDYT?
> Does performance really degrade when the subclasses are a bit more separated from this base class?

A method call is more expensive than a field access, although I admit that the JIT compiler should pick this up right away. So I will fix.
I'd say to use the getter/setter.
I think this commit is almost ready, except for the encapsulation in this class. If we think the JIT compiler will handle this, then let's go ahead and make this base class handle the important parts.
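A sketch of the shape suggested above, with the base class owning the write path and the roll-over check. ROWS_DIVISOR follows the diff; RollingWriter and StubAppender are illustrative names only (the real code uses FileAppender and the Spark DataWriter API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for Iceberg's FileAppender; tracks bytes "written"
// so the writer can probe the current file size.
class StubAppender {
    private long length = 0;
    void add(long recordBytes) { length += recordBytes; }
    long length() { return length; }
}

// Hypothetical shape: the base class updates the row count, writes to the
// appender, and rolls to a new file when the target size is reached.
class RollingWriter {
    static final int ROWS_DIVISOR = 1000;
    private final long targetFileSize;
    private StubAppender currentAppender = null;
    private long currentRows = 0;
    final List<Long> closedFileSizes = new ArrayList<>();

    RollingWriter(long targetFileSize) { this.targetFileSize = targetFileSize; }

    void writeInternal(long recordBytes) {
        if (currentAppender == null) {
            currentAppender = new StubAppender();
            currentRows = 0;
        }
        currentAppender.add(recordBytes);
        currentRows++;
        // length() can be costly, so only probe it every ROWS_DIVISOR rows.
        if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
            closeCurrent();
        }
    }

    void closeCurrent() {
        if (currentAppender != null) {
            closedFileSizes.add(currentAppender.length());
            currentAppender = null;
        }
    }
}
```

With this shape the subclasses call writeInternal and closeCurrent instead of touching the appender and row count directly, which removes most of the protected fields.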
```java
private abstract static class BaseWriter implements DataWriter<InternalRow> {
  protected static final int ROWS_DIVISOR = 1000;

  protected final Set<PartitionKey> completedPartitions = Sets.newHashSet();
```
This is only used in the partitioned case. Can this be a field of the partitioned writer instead?
Will fix.
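For context, a simplified sketch of how completedPartitions enforces the ordered-data semantics in the partitioned case (hypothetical class and key type; the real writer keys on PartitionKey and writes rows to files):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch: the partitioned writer assumes input is clustered by
// partition key, so revisiting a completed partition means the input was
// not ordered and the write must fail.
class PartitionTracker {
    private final Set<String> completedPartitions = new HashSet<>();
    private String currentKey = null;

    void write(String key) {
        if (!key.equals(currentKey)) {
            if (completedPartitions.contains(key)) {
                throw new IllegalStateException("Already closed files for partition: " + key);
            }
            if (currentKey != null) {
                completedPartitions.add(currentKey);
            }
            currentKey = key;
        }
        // ... append the row to the current partition's open file ...
    }
}
```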
```java
  }
}

private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
```
I don't think the OutputFileFactory interface is needed; this is its only implementation. The class could also be named OutputFileFactory and not mention encryption, because the files may not actually be encrypted if the plaintext encryption manager is used.
Agreed.
(Test failures around …)
Thanks for fixing this, @xabriel! I'll merge it.
For big jobs where the generated Parquet files reach 10 GB or more, we have seen read latency tied to reading the Parquet footer. For our data and tech stack, we observe that it takes about 1 second per 10 GB of file size.

To avoid this, this PR allows Iceberg writers to close the current file and open a new one once a target file size is reached. The semantics of having at most one file open per writer are unchanged, and in the PartitionedWriter case, the semantics of failing when the data is not ordered are kept as well. With this PR, now we can do:
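The example that followed "now we can do:" did not survive extraction. A plausible sketch, assuming the knob introduced here is the write.target-file-size-bytes table property (its name in Iceberg's TableProperties):

```
# hypothetical: byte threshold at which writers roll to a new file
write.target-file-size-bytes=536870912
```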