Core, Data: File Format API interfaces #12774
Conversation
liurenjie1024 left a comment:
Thanks @pvary for this PR, left some comments, generally looks great!
Resolved review threads (outdated) on:
- data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java
- data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java
- data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java
liurenjie1024 left a comment:
Thanks @pvary for this PR, LGTM!
Resolved review thread (outdated) on data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java
liurenjie1024 left a comment:
LGTM, just some nits!
Resolved review threads (outdated) on data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java
liurenjie1024 left a comment:
Thanks @pvary !
stevenzwu left a comment:
Left some initial comments on the interfaces. Will still need to take a look at the other, bigger PR to understand more of the work as a whole.
Resolved review threads (outdated) on:
- data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java
- data/src/main/java/org/apache/iceberg/data/FileWriteBuilderBase.java
- data/src/main/java/org/apache/iceberg/data/PositionDeleteWriteBuilder.java
- data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java
- data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilder.java
- data/src/main/java/org/apache/iceberg/data/FileAccessFactoryRegistry.java
@huaxingao, @pvary Could you take a look from a Comet perspective? I know you have some custom code that would be using this as well.
Originally I thought that the
…ppender<D> build()' instead '<D> FileAppender<D> build()'
Resolved review thread (outdated) on core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
```java
 * Sets the input schema accepted by the writer. If not provided, derived from the {@link
 * #schema(Schema)}.
 */
WriteBuilder inputSchema(Object schema);
```
This is the drawback of not having generic parameters.
@rdblue Should we revisit @pvary's earlier attempt to define the new interfaces with generic types?
To avoid breaking source compatibility and keep the change scope small, the existing implementations (Avro, ORC, Parquet) would use `WriteBuilder<Object, Object>`, which means the generic parameters don't bring any materialized benefits.
But at least the new interfaces are properly typed. Who knows, in the future the implementations may be updated. Also, if we add new file format support in the future, the new implementation can be properly typed.
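A minimal self-contained sketch of this idea (class names like `TypedBuilderSketch` and `UntypedWriteBuilder` are hypothetical illustrations, not the PR's actual classes): an existing format can implement the typed interface with both parameters erased to `Object`, keeping source compatibility, while new formats could use real types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a generically typed builder: D is the row type the
// appender accepts, S is the engine schema type. Illustrative names only.
public class TypedBuilderSketch {

  interface FileAppender<D> {
    void add(D row);
  }

  interface WriteBuilder<D, S> {
    WriteBuilder<D, S> inputSchema(S engineSchema); // typed, instead of Object

    FileAppender<D> build();
  }

  // An existing format could keep source compatibility by erasing both
  // parameters to Object, as Avro/ORC/Parquet would under this proposal.
  static class UntypedWriteBuilder implements WriteBuilder<Object, Object> {
    private Object engineSchema;

    @Override
    public WriteBuilder<Object, Object> inputSchema(Object engineSchema) {
      this.engineSchema = engineSchema;
      return this;
    }

    @Override
    public FileAppender<Object> build() {
      List<Object> rows = new ArrayList<>();
      return rows::add; // collects rows instead of writing a real file
    }
  }

  public static void main(String[] args) {
    WriteBuilder<Object, Object> builder = new UntypedWriteBuilder();
    builder.inputSchema("engine-schema").build().add("row");
    System.out.println("untyped builder works, but accepts any Object");
  }
}
```

The trade-off discussed above is visible here: the erased builder compiles against any `Object`, so it gains nothing from the generics, but a future format implementing `WriteBuilder<InternalRow, StructType>` would get compile-time checking for free.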
I agree with @stevenzwu, generic types would be better. There are several benefits here:
- Better readability.
- More consistent with other APIs such as `DataFileWriterBuilder`.
> In the future, the implementations may be updated. Also if we add a new file format support in the future, the new implementation can be properly typed.
+1 for this. One of the goals of this effort is to make adopting new file formats easier. Generic types would benefit other file formats.
I'm not sure why this is `Object`. I thought that the builders would be parameterized by this type. Unlike the Parquet and Avro builders, the `DataWriteBuilder` and delete builders do not expose the `writerFunc` method of building a writer tree. Instead, those builders accept the Iceberg schema and engine schema and configure the Avro and Parquet builders by calling `writerFunc`. So the new builders should have a type parameter for this, but the wrapped format-specific builders should not.
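The layering described here could be sketched roughly as follows; all names and signatures are hypothetical illustrations, not the actual Iceberg classes. The engine-facing builder carries the type parameter, while the wrapped format-specific builder stays untyped because only the wrapper calls its writer-function hook:

```java
import java.util.function.BiFunction;

// Hypothetical sketch: the engine-facing DataWriteBuilder is typed in D, the
// wrapped format builder is not, because the wrapper configures it internally.
public class WrapperSketch {

  // stand-in for a format-specific builder (Parquet/Avro style)
  static class FormatBuilder {
    BiFunction<Object, Object, Object> writerFunc;

    FormatBuilder writerFunc(BiFunction<Object, Object, Object> func) {
      this.writerFunc = func;
      return this;
    }
  }

  // engine-facing builder: typed in D, hides writerFunc from callers
  static class DataWriteBuilder<D> {
    final FormatBuilder wrapped = new FormatBuilder();

    DataWriteBuilder<D> engineSchema(Object engineSchema) {
      // the wrapper, not the caller, knows how to turn the engine schema
      // plus the file schema into a writer tree for the format builder
      wrapped.writerFunc((fileSchema, ignored) -> "writer tree for " + engineSchema);
      return this;
    }
  }

  public static void main(String[] args) {
    DataWriteBuilder<String> builder = new DataWriteBuilder<String>().engineSchema("RowType");
    System.out.println("writerFunc configured: " + (builder.wrapped.writerFunc != null));
  }
}
```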
Great that we have consensus that the new WriteBuilder interface can have generic parameters.
The `Parquet.WriteBuilder` would implement this `WriteBuilder` interface:

```java
public static class WriteBuilder
    implements InternalData.WriteBuilder, org.apache.iceberg.formats.WriteBuilder<Object, Object> {
```

Peter's earlier attempt was to use `Object` for the generic params to avoid breaking source compatibility for the existing `Parquet.WriteBuilder`. I don't know if we have agreed on how to move forward with that.
```java
 * @param outputFile destination for the written data
 * @return configured writer builder
 */
WriteBuilder writeBuilder(OutputFile outputFile);
```
Seems this should return `WriteBuilder<S>`. If it's Parquet format, then it would be `WriteBuilder<MessageType>`?
We definitely don't want to expose the internal file format types handled by the FileFormats themselves. We might be able to expose the accepted data types, and the engine-specific schemas if we decide to do so. See: #12774 (comment)
Based on the decision there, it could be:
- For Spark: `WriteBuilder<InternalRow, SchemaType>`, `WriteBuilder<InternalRow>`, or `WriteBuilder`
- For Flink: `WriteBuilder<RowData, RowType>`, `WriteBuilder<RowData>`, or `WriteBuilder`
- For Generic: `WriteBuilder<Record, Schema>`, `WriteBuilder<Record>`, or `WriteBuilder`
```java
 * @param outputFile destination for the written data
 * @return configured writer builder
 */
WriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile);
```
EncryptedOutputFile and OutputFile are mostly interchangeable—but not everywhere.
The FileWriterFactory API expects an EncryptedOutputFile as input. In FormatModelRegistry, we can obtain an OutputFile using EncryptedOutputFile.encryptingOutputFile(). However, this means we can’t pass the original EncryptedOutputFile down to Parquet.write, which looks like this:
iceberg/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java, lines 148 to 157 in 4e68ff0:

```java
public static WriteBuilder write(EncryptedOutputFile file) {
  if (file instanceof NativeEncryptionOutputFile) {
    NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
    return write(nativeFile.plainOutputFile())
        .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
        .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
  } else {
    return write(file.encryptingOutputFile());
  }
}
```
If we pass the result of `EncryptedOutputFile.encryptingOutputFile()`, the following code path is used (iceberg/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java, lines 140 to 146 in 4e68ff0):

```java
public static WriteBuilder write(OutputFile file) {
  if (file instanceof EncryptedOutputFile) {
    return write((EncryptedOutputFile) file);
  }
  return new WriteBuilder(file);
}
```
Since `EncryptedOutputFile.encryptingOutputFile()` returns a plain `OutputFile`, the file will not be encrypted.
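The pitfall can be reproduced with a tiny self-contained example (simplified hypothetical types, not Iceberg's real ones): because overload resolution and the `instanceof` check only see the static/runtime type of the file, the plain `OutputFile` returned by `encryptingOutputFile()` falls through to the unencrypted path.

```java
// Demo of the dispatch pitfall: once the encrypted file is unwrapped to a
// plain OutputFile, the write(OutputFile) overload can no longer detect it.
public class DispatchPitfall {
  interface OutputFile {}

  interface EncryptedOutputFile extends OutputFile {
    OutputFile encryptingOutputFile();
  }

  static String write(OutputFile file) {
    if (file instanceof EncryptedOutputFile) {
      return write((EncryptedOutputFile) file);
    }
    return "plain"; // encryption keys are never configured on this path
  }

  static String write(EncryptedOutputFile file) {
    return "encrypted";
  }

  public static void main(String[] args) {
    EncryptedOutputFile encrypted =
        () -> new OutputFile() {}; // unwrapping loses the Encrypted type

    System.out.println(write(encrypted)); // prints "encrypted"
    System.out.println(write(encrypted.encryptingOutputFile())); // prints "plain"
  }
}
```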
To make the FormatModel API symmetric, we would need to change:

```java
ReadBuilder<D, S> readBuilder(InputFile inputFile);
```

to

```java
ReadBuilder<D, S> readBuilder(EncryptedInputFile inputFile);
```

But that feels unnecessary and doesn't add real value.
@ggershinsky: What's your take? Will we need to push EncryptedInputFile to the file format readers sometime in the future?
Sure, I'll have a look
I thought `EncryptedOutputFile.encryptingOutputFile()` would return an encryption-wrapped `OutputFile` (iceberg/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java, line 291 in ebc6b66):

```java
public OutputFile encryptingOutputFile() {
```
I followed exactly what @stevenzwu suggested.
Unfortunately, there are some hidden requirements in the Parquet implementation for encryption to work properly.
Parquet encryption was only functioning when the target was a `HadoopOutputFile`. Using `StandardEncryptedOutputFile.encryptingOutputFile()` produced an `AesGcmOutputFile`, which resulted in corrupt files.
Since Parquet only needed the Hadoop `Configuration` object from `HadoopOutputFile`, I addressed this by introducing:
- `HasConfiguration` interface - to expose the Hadoop configuration (similar to `HadoopConfigurable`).
- `HadoopAesGcmOutputFile` - which extends `AesGcmOutputFile`, but also stores the Hadoop configuration.

With these new classes, I made the following changes:
- Implemented `HasConfiguration` in both `HadoopAesGcmOutputFile` and `HadoopOutputFile`.
- Updated the check in the `Parquet.WriteBuilder` constructor to look for `HasConfiguration` instead of `HadoopOutputFile` when extracting the configuration.
- Modified `ParquetIO.file` to check for `HasConfiguration` instead of `HadoopOutputFile` when creating the `parquet.HadoopOutputFile`. This one seems a bit risky to me; I would like to hear your thoughts.

You can review the changes here
Created a PR with a fix: #14528
There are two types of file encryption in Iceberg:
- Native Parquet encryption (PME), part of the Parquet spec/impl. Later, it will be leveraged for columnar encryption. Possibly, ORC native encryption will be used too.
- AES GCM Stream, part of the Iceberg spec, for encrypting the non-columnar Avro format (manifest, manifest list, and Avro data files).

For Parquet write, we need to pass it the original plain output file (see `NativeEncryptionOutputFile.plainOutputFile()`) and the encryption key plus AAD prefix (file ID) - see the `NativeEncryptionKeyMetadata` methods.
For Parquet read, we need to pass it the original input file (via `StandardDecryptedInputFile.encryptedInputFile()`, which does not activate AES GCM Stream decryption) and the encryption key plus AAD prefix (file ID) - see the `NativeEncryptionKeyMetadata` methods.
Parquet encryption/decryption does not use (depend on) Hadoop config or input/output files.
For AES GCM Stream write, we take the `StandardEncryptedOutputFile.encryptingOutputFile()` that produces an `AesGcmOutputFile`.
For AES GCM Stream read, we use the `StandardDecryptedInputFile` as an `InputFile` object, which produces an `AesGcmInputFile` upon e.g. a `newStream` call.
@ggershinsky: Could you please help me with unit tests which execute the different encryption/decryption methods, so I can check what is happening?
I suspect that `Parquet.write` expects and handles `NativeEncryptionOutputFile` for writes, as there are explicit `instanceof` checks in Parquet.java.
`StandardEncryptedOutputFile` also implements `NativeEncryptionOutputFile`. I guess this is why it is handled correctly.
@ggershinsky: The File Format API is designed to handle data and delete file writes. From what I understand, in this case native Parquet encryption is being used, which means the input is a `NativeEncryptionOutputFile`.
Given that, the File Format API should only accept `OutputFile` as input, not `EncryptedOutputFile`, since data and delete files are always natively encrypted.
What I don't understand is why `FileWriterFactory` expects `EncryptedOutputFile`:

```java
public interface FileWriterFactory<T> {
  DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  EqualityDeleteWriter<T> newEqualityDeleteWriter(
      EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  PositionDeleteWriter<T> newPositionDeleteWriter(
      EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
}
```

All of these should have either `NativeEncryptionOutputFile` or `OutputFile` as an input.
… and format to the ContentFileWriteBuilders
Resolved review thread (outdated) on core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
stevenzwu left a comment:
LGTM.
Left a nit comment for a log.
Force-pushed from 646d9f7 to d450ec6.
The interface part of the changes from #12298

Interfaces which have to be implemented by the File Formats:
- `ReadBuilder` - Builder for reading data from data files
- `AppenderBuilder` - Builder for writing data to data files
- `ObjectModel` - Provides ReadBuilders and AppenderBuilders for the specific data file format and object model pair

Interfaces which are used by the actual readers/writers:
- `AppenderBuilder` - Builder for writing a file
- `DataWriterBuilder` - Builder for generating a data file
- `PositionDeleteWriterBuilder` - Builder for generating a position delete file
- `EqualityDeleteWriterBuilder` - Builder for generating an equality delete file
- No separate `ReadBuilder` here - the file format reader builder is reused

Implementation classes tying them together:
- `WriterBuilder` class which implements the AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder interfaces using the AppenderBuilder provided by the File Format itself
- `ObjectModelRegistry` which stores the available `ObjectModel`s and from which users can request readers (`ReadBuilder`) and writers (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder)
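As a rough, hypothetical miniature of the registry idea described above (names, keys, and signatures are illustrative assumptions, not the PR's actual API): object models register themselves per (format, model-name) pair, and callers look up a model by that key to obtain the matching builders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of an object-model registry: register per
// (format, model-name) key, then look models up by the same key.
public class RegistrySketch {
  interface ObjectModel {
    String describe();
  }

  private static final Map<String, ObjectModel> MODELS = new HashMap<>();

  static void register(String format, String modelName, ObjectModel model) {
    MODELS.put(format + ":" + modelName, model);
  }

  static ObjectModel lookup(String format, String modelName) {
    ObjectModel model = MODELS.get(format + ":" + modelName);
    if (model == null) {
      throw new IllegalArgumentException("No object model for " + format + ":" + modelName);
    }
    return model;
  }

  public static void main(String[] args) {
    register("parquet", "generic", () -> "parquet/generic readers and writers");
    System.out.println(lookup("parquet", "generic").describe());
  }
}
```

In the real design the looked-up `ObjectModel` would hand out `ReadBuilder` and `AppenderBuilder` instances rather than a description string; the sketch only shows the registration/lookup shape.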