[HUDI-5579] Fixing Kryo registration to be properly wired into Spark sessions #7702
Conversation
Force-pushed from 70fcf7c to 11cceab
```scala
// NOTE: We're copying definition of the config introduced in Spark 3.0
// (to stay compatible w/ Spark 2.4)
private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"
```
Guess we can make it public so that there is no need to hard-code the option key `spark.kryo.registrator` everywhere.
We actually won't be able to use it everywhere, so I'd rather stick w/ the Spark option for consistency (which is the way we handle every other option as well)
```scala
def register(conf: SparkConf): SparkConf = {
  conf.registerKryoClasses(new HoodieSparkKryoProvider().registerClasses())
  conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
}
```
Does `.mkString(",")` make sense here?
We need to convert it to a string, so I kept it generic so that we can drop in one more class. Not strictly necessary though
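For context, Spark's `spark.kryo.registrator` option accepts a comma-separated list of registrator class names, which is why joining with `","` leaves the door open for additional registrators later. A minimal, Spark-free sketch of building such a value (the nested classes below are hypothetical stand-ins, not Hudi's actual registrators):

```java
import java.util.List;
import java.util.stream.Collectors;

public class RegistratorConfDemo {
    // Hypothetical marker classes standing in for Kryo registrator implementations
    static class HoodieSparkKryoRegistrar {}
    static class ExtraRegistrar {}

    // Builds the value for "spark.kryo.registrator": joining class names with ","
    // generalizes from one registrator to many without changing the call site.
    static String registratorConfValue(List<Class<?>> registrators) {
        return registrators.stream()
                .map(Class::getName)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(registratorConfValue(
                List.of(HoodieSparkKryoRegistrar.class, ExtraRegistrar.class)));
    }
}
```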
```scala
.setMaster("local[4]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
```
Can we also move these common options into a tool method?
This is exactly the method you're referring to (used in tests)
```java
      HoodieRecordGlobalLocation.class
    };
  })
  .forEachOrdered(kryo::register);
```
A stateless function (a function that does not have any side effects) is always a better choice, especially for a tool method; personally I prefer the old way we handled this.
Agree in principle, but here we're actually aligning it w/ the interface of `KryoRegistrator`
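To illustrate the point about the interface: Spark's `KryoRegistrator` hands the registrar a mutable `Kryo` instance, so registration is inherently side-effecting, and registration order determines the class ids. A self-contained analogue (the `Registry` class below is a hypothetical stand-in for Kryo, not Spark's actual API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OrderedRegistrationDemo {
    // Minimal stand-in for Kryo: ids are assigned in registration order, which is
    // why registration must be deterministic and ordered on every JVM.
    static class Registry {
        final List<String> registered = new ArrayList<>();
        void register(Class<?> klass) { registered.add(klass.getName()); }
        int idOf(Class<?> klass) { return registered.indexOf(klass.getName()); }
    }

    // Analogue of KryoRegistrator#registerClasses(kryo): the interface hands us a
    // mutable registry, so a side-effecting style matches the contract.
    static void registerClasses(Registry registry) {
        Stream.of(String.class, Integer.class, Long.class)
              .forEachOrdered(registry::register);
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registerClasses(registry);
        // Ids follow registration order: String=0, Integer=1, Long=2
        System.out.println(registry.idOf(Integer.class));
    }
}
```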
```scala
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("hoodie.insert.shuffle.parallelism", "4")
.config("hoodie.upsert.shuffle.parallelism", "4")
.config("hoodie.delete.shuffle.parallelism", "4")
```
Not sure whether we can remove these parallelism options.
These are not removed -- they are replaced w/ options set in `getSparkConfForTest`
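A rough sketch of what such a shared test-config helper could look like: a plain Java map stands in for `SparkConf`, and while the helper name mirrors `getSparkConfForTest`, this is an illustrative assumption rather than the actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TestConfDemo {
    // Hypothetical getSparkConfForTest-style helper: the common options
    // (serializer, registrator, shuffle parallelism) live in one place so
    // individual tests no longer repeat them.
    static Map<String, String> sparkConfForTest() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        conf.put("hoodie.insert.shuffle.parallelism", "4");
        conf.put("hoodie.upsert.shuffle.parallelism", "4");
        conf.put("hoodie.delete.shuffle.parallelism", "4");
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(sparkConfForTest().get("spark.serializer"));
    }
}
```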
```xml
<relocation>
  <pattern>com.esotericsoftware.kryo.</pattern>
  <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
</relocation>
```
What is the purpose of moving the common bundle dependencies to each bundle's pom files?
We actually move it just to the bundles that will have Kryo included (the Spark and Flink bundles won't have Kryo included)
Force-pushed from e96bbfc to 61e6215
xushiyan
left a comment
lgtm
but this is a very broad usability change. we should have brought this up for highlighting earlier.
Agreed, not ideal, but unavoidable unfortunately -- w/o it we'd be passing around ~20-30% more dead-weight data. And in some cases it would actually lead to failures as well.
@hudi-bot run azure
Force-pushed from 1b075e2 to 8218fde
- For engines shipping Kryo (Spark, Flink) we'll not be adding it at all (and no shading therefore)
- For other engines we'll be shipping shaded Kryo (as we do today)
Force-pushed from ffe3e3b to c372fe7
…o Spark sessions (apache#7702)" This reverts commit a70355f.

Change Logs
Due to RFC-46 the profile of the data being serialized by Hudi has changed considerably: previously we were mostly passing around Avro payloads, while now we hold our own internal `HoodieRecord` implementations.

When classes are not explicitly registered w/ Kryo, it has to serialize the class's fully qualified name (FQN) as the id every time an object is serialized, which carries a lot of unnecessary overhead.

To work around this, #7026 added `HoodieSparkKryoRegistrar`, registering some of the commonly serialized Hudi classes. However, during rebasing/merging of the RFC-46 feature branch these changes were partially reverted, and this PR takes a stab at reinstating them.

On top of that we had to revisit our current approach of bundling and shading Kryo universally for all bundles. Instead:

- For engines providing Kryo (Spark, Flink) we're not bundling it at all
- For other bundles still requiring it we bundle and shade it (same way we do it today)
Impact
This will improve performance of ser/de during shuffles, since no FQNs will need to be serialized.
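To make the overhead concrete: an unregistered class costs roughly the UTF-8 length of its FQN per serialized object, while a registered class costs only a small varint id (Kryo writes registered class ids as varints). A quick back-of-the-envelope comparison, assuming `HoodieRecordGlobalLocation` lives under `org.apache.hudi.common.model`:

```java
import java.nio.charset.StandardCharsets;

public class FqnOverheadDemo {
    // UTF-8 byte length of a string, i.e. the approximate per-object cost of
    // writing the class name when the class is NOT registered with Kryo.
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // 55 bytes of FQN per object vs ~2 bytes for a varint class id
        // once the class is registered.
        String fqn = "org.apache.hudi.common.model.HoodieRecordGlobalLocation";
        System.out.println(utf8Length(fqn) + " bytes vs ~2 bytes");
    }
}
```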
Risk level

Low: our bundle validation and testing should uncover issues w/ the packaging, if any
Documentation Update

A documentation update is required, specifying that the `--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar` property will be mandatory.

Contributor's checklist