Spark: Encapsulate parquet objects for Comet #13786
Conversation
@huaxingao, @anuragmantri please take a look.
@shangxinli thank you for taking a look. I found that there are more changes needed to get around Parquet shading; let me address your comments along with those.
Force-pushed from 0119c24 to a148a5b
@huaxingao, @shangxinli, @hsiang-c this is ready for final review.
@huaxingao any comments you would like me to address?
@parthchandra Apologies for the delay. I've skimmed the changes and would like more time for a deeper review. Because we're adding several classes to the Parquet module, Comet code is no longer confined to the Spark module. Given the broader scope, it would be ideal if we can get one more committer to review and sign off.
Thanks @huaxingao. Who would you recommend we ask for a second review?
cc @aokolnychyi @flyrain @szehon-ho @stevenzwu I'd appreciate it if you could take a look too. Thanks a lot!
What about something like this: pvary@4b6f7a5
// If no explicit factory is set and reader type is COMET, use the default Comet factory
if (factoryClassName == null && readerType == ParquetReaderType.COMET) {
  factoryClassName =
      org.apache.iceberg.spark.SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
Why not import SparkSQLProperties?
Fails checkstyle -
Task :iceberg-spark:iceberg-spark-4.0_2.13:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /iceberg/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:21:58: Using a static member import should be avoided - org.apache.iceberg.spark.SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS. [AvoidStaticImport]
But you can import SparkSQLProperties and reference the constant as SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS.
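To illustrate the distinction, here is a minimal sketch of the checkstyle-friendly form. The SparkSQLProperties class here is a hypothetical, trimmed-down stand-in, and the property value string is a placeholder, not the real key:

```java
// Hypothetical stand-in for org.apache.iceberg.spark.SparkSQLProperties,
// reduced to the one constant under discussion (placeholder value).
class SparkSQLProperties {
  static final String COMET_VECTORIZED_READER_FACTORY_CLASS =
      "spark.sql.iceberg.parquet.comet.reader-factory-class";
}

class SparkBatchSketch {
  // Checkstyle's AvoidStaticImport rule rejects
  //   import static ...SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
  // but a regular class import plus qualified access passes.
  String factoryProperty() {
    return SparkSQLProperties.COMET_VECTORIZED_READER_FACTORY_CLASS;
  }
}
```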
@pvary I'm not sure what you're asking here. Would you like this PR to be changed to build on top of the proposed API?
@pvary @huaxingao I've rebased on the latest and resolved the conflicts. PTAL.
}

/** Convenience method to enable comet */
public ReadBuilder enableComet(boolean enableComet) {
where is this method used? Can we just use vectorizedReaderFactory?
if (enableComet) {
  this.properties.put(
      VECTORIZED_READER_FACTORY,
      "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory");
How to keep this consistent with that in SparkSQLProperties? Can we put it in a common module?
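One way to address this, sketched below as an illustration only: hoist the factory class name into a constants holder that both modules can reference, so SparkSQLProperties and the ReadBuilder no longer duplicate the string literal. The class and constant names here are hypothetical:

```java
// Hypothetical common-module holder for the Comet factory class name, so the
// Spark module and the Parquet module reference one definition instead of
// duplicating the literal in two places.
final class CometReaderConstants {
  static final String COMET_VECTORIZED_READER_FACTORY_IMPL =
      "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory";

  private CometReaderConstants() {} // non-instantiable constants holder
}
```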
public class Parquet {
  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
  private static final String VECTORIZED_READER_FACTORY = "read.parquet.vectorized-reader.factory";
Do we have a unit test for changes in this class?
I've added a bunch of tests for this. For the Comet-specific code, there is a plan to add a diff file that can be applied to change the reader for all the existing tests, and we can add a CI test pipeline that verifies that nothing is broken.
if (batchedReaderFunc != null) {
  // Try to load custom vectorized reader factory from properties
  String readerName = properties.get(VECTORIZED_READER_FACTORY);
isn't this factoryName?
Class<?> factoryClass = Class.forName(className);
if (!VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
  LOG.warn("Class {} does not implement VectorizedParquetReaderFactory interface", className);
  return null;
Rather than return null for multiple failure cases, how about:

VectorizedParquetReaderFactory factory = null;
try {
  Class<?> factoryClass = Class.forName(className);
  if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
    factory = (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
  } else {
    // log warning
  }
} catch (...) {
  // log warning
}
return factory;
}
We lose a little bit of helpful text in the error, but made the change.
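A runnable sketch of the single-exit loader is below. The interface is a simplified stand-in for the real VectorizedParquetReaderFactory, and System.err stands in for the SLF4J logger; including the caught exception in the warning keeps the helpful error text mentioned above:

```java
import java.lang.reflect.InvocationTargetException;

// Simplified stand-in for the project's factory interface.
interface VectorizedParquetReaderFactory {}

class FactoryLoader {
  // Single-exit loader: every failure path logs a warning and falls through
  // to return the (possibly null) factory.
  static VectorizedParquetReaderFactory load(String className) {
    VectorizedParquetReaderFactory factory = null;
    try {
      Class<?> factoryClass = Class.forName(className);
      if (VectorizedParquetReaderFactory.class.isAssignableFrom(factoryClass)) {
        factory =
            (VectorizedParquetReaderFactory) factoryClass.getDeclaredConstructor().newInstance();
      } else {
        System.err.printf(
            "Class %s does not implement VectorizedParquetReaderFactory%n", className);
      }
    } catch (ClassNotFoundException
        | NoSuchMethodException
        | InstantiationException
        | IllegalAccessException
        | InvocationTargetException e) {
      // Logging the exception preserves the detail of which step failed.
      System.err.printf("Cannot instantiate %s: %s%n", className, e);
    }
    return factory;
  }
}
```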
String factoryClassName = readConf.parquetVectorizedReaderFactory();

// If no explicit factory is set and reader type is COMET, use the default Comet factory
if (factoryClassName == null && readerType == ParquetReaderType.COMET) {
This can be handled inside readConf.parquetVectorizedReaderFactory() and moved to right before the check if (factoryClassName != null).
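As an illustration of that suggestion, folding the COMET default into the accessor might look like the sketch below; the enum and field names are simplified stand-ins for the real SparkReadConf plumbing, not the actual implementation:

```java
// Simplified stand-in for the reader-type enum.
enum ParquetReaderType { ICEBERG, COMET }

class ReadConfSketch {
  private final String explicitFactory; // explicitly configured factory, may be null
  private final ParquetReaderType readerType;

  ReadConfSketch(String explicitFactory, ParquetReaderType readerType) {
    this.explicitFactory = explicitFactory;
    this.readerType = readerType;
  }

  // Defaulting lives inside the accessor, so callers only have to check
  // whether the returned name is non-null.
  String parquetVectorizedReaderFactory() {
    if (explicitFactory != null) {
      return explicitFactory;
    }
    return readerType == ParquetReaderType.COMET
        ? "org.apache.iceberg.spark.parquet.CometVectorizedParquetReaderFactory"
        : null;
  }
}
```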
    reuseContainers,
    caseSensitive,
    batchSize);
this.conf = readConf.copy();
why do we need to immediately copy the readConf?
TBH, I don't know myself. I based it on what is being done in VectorizedParquetReader.
private ReadConf conf = null;

private ReadConf init() {
  if (conf == null) {
Is this thread safe?
I don't think it needs to be. This is pretty much what is done in VectorizedParquetReader
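The pattern being discussed reduces to the minimal sketch below. As in VectorizedParquetReader, there is no volatile field or locking: the reader is assumed to stay on a single task thread, so the lazy init is deliberately not thread safe. The class and field names here are illustrative only:

```java
// Minimal sketch of the lazy init() pattern: safe only under the
// single-threaded access assumption, exactly as in VectorizedParquetReader.
class LazyConfHolder {
  private String conf = null; // stands in for the ReadConf field
  private int initCount = 0;  // instrumentation to show init runs once

  private String init() {
    if (conf == null) { // unsynchronized check: fine on one thread only
      conf = "read-conf";
      initCount++;
    }
    return conf;
  }

  String conf() {
    return init();
  }

  int initCount() {
    return initCount;
  }
}
```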
@parthchandra Can we target Spark 4.0 first to make review and revision easier? Meanwhile, can you add more tests for the changes?
Force-pushed from 27fe44d to efb0f9e
@manuzhang appreciate your feedback and time spent on this PR. We started with just one Spark version in #13378 but that got no attention! If it helps, the Spark 3.4 and Spark 3.5 implementations are (nearly) identical to Spark 4.0. If the review focuses only on Spark 4.0, I can make sure the same changes are made for the other Spark version branches.
@parthchandra Targeting multiple Spark versions in one PR doesn't make it faster. Usually, once a PR targeting Spark 4.0 is merged, the back-port PR can be reviewed and merged quickly if the changes are identical. On the other hand, reviewers can get distracted if a PR targets multiple Spark versions.
I raised the thread to remove Spark 3.4 support for the upcoming 1.11 release. We could delay the removal until 1.12 based on feedback. I would like to remove it if possible and have just one Spark 3.x version.
Generally, I do find it easier to review a PR focused on a single Spark version, such as Spark 4.0. Once we get that PR merged, we can add follow-up PRs that just copy/paste (hopefully) the same code to other Spark versions. If you look through the repo, this is what we often do as "backport" PRs. Focusing on just the Spark 4 integration might cut down the number of files to review by 1/3 :)
Fixes issues in #13378. This makes all the remaining changes to Iceberg needed to address issues with Comet/Parquet shading. The corresponding changes to Comet are already merged into Comet 0.10.1.
Additionally, the Comet dependency of the iceberg-parquet module has been removed because it created an iceberg-flink dependency on comet-spark. The code is changed to use a reflection-based CometBridge class to access the Comet (Parquet) FileReader. This PR also removes the commit to enable Comet execution (moving it to a follow-up PR: cc @hsiang-c).
This PR makes the changes for Spark 3.4, 3.5, and 4.0.
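The reflection-bridge idea behind the CometBridge class mentioned above can be sketched as follows. This is an illustration only: the target class here is java.lang.String purely so the sketch is self-contained, whereas the real bridge resolves Comet's shaded Parquet FileReader. The caller names the target class as a string, so no compile-time dependency on Comet leaks into modules such as iceberg-flink:

```java
import java.lang.reflect.Method;

// Illustrative reflection bridge: the calling module never imports the target
// class, so it carries no compile-time dependency on it.
class ReflectionBridge {
  static Object invokeNoArg(String className, String methodName, Object target) {
    try {
      Class<?> clazz = Class.forName(className);
      Method method = clazz.getMethod(methodName);
      return method.invoke(target);
    } catch (ReflectiveOperationException e) {
      // Wrap so callers aren't forced to handle checked reflection exceptions.
      throw new IllegalStateException("Bridge call failed: " + methodName, e);
    }
  }
}
```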