diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml
index 4eee515578126..00fc0c3513ca2 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -43,6 +43,12 @@
      <version>${project.parent.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.parquet</groupId>
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoProvider.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
similarity index 63%
rename from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoProvider.scala
rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index 9dcb911971c8b..3894065d8090b 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoProvider.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -19,11 +19,12 @@
package org.apache.spark
import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.hudi.client.model.HoodieInternalRow
-import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
-import org.apache.hudi.common.util.HoodieCommonKryoProvider
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.model.HoodieSparkRecord
+import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.serializer.KryoRegistrator
/**
@@ -42,22 +43,31 @@ import org.apache.spark.serializer.KryoRegistrator
* or renamed (w/o correspondingly updating such usages)
*
*/
-class HoodieSparkKryoProvider extends HoodieCommonKryoProvider {
- override def registerClasses(): Array[Class[_]] = {
+class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
+ override def registerClasses(kryo: Kryo): Unit = {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////
- val classes = super[HoodieCommonKryoProvider].registerClasses()
- classes ++ Array(
- classOf[HoodieWriteConfig],
- classOf[HoodieSparkRecord],
- classOf[HoodieInternalRow]
- )
+ super[HoodieCommonKryoRegistrar].registerClasses(kryo)
+
+ kryo.register(classOf[HoodieWriteConfig])
+
+ kryo.register(classOf[HoodieSparkRecord])
+ kryo.register(classOf[HoodieInternalRow])
+
+    // NOTE: Hadoop's [[Configuration]] is not serializable by itself, hence we're relying
+    //       on the [[SerializableConfiguration]] wrapper to work around that
+ kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
}
}
-object HoodieSparkKryoProvider {
+object HoodieSparkKryoRegistrar {
+
+  // NOTE: We're copying the definition of the config introduced in Spark 3.0
+  //       (to stay compatible w/ Spark 2.4)
+ private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"
+
def register(conf: SparkConf): SparkConf = {
- conf.registerKryoClasses(new HoodieSparkKryoProvider().registerClasses())
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
}
}
\ No newline at end of file
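
For reference, the companion object's `register(conf)` helper is now just sugar over setting `spark.kryo.registrator`. A minimal sketch of wiring it up on a standalone driver (app name and master are hypothetical, not part of this change):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}

object RegistrarWiringSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("registrar-wiring-sketch") // hypothetical app name
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Equivalent to conf.set("spark.kryo.registrator", ...): Spark instantiates
    // the registrar reflectively and calls registerClasses(kryo) for every Kryo
    // instance it creates on the driver and executors
    HoodieSparkKryoRegistrar.register(conf)

    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      // Anything shuffled or cached from here on serializes through the
      // registrations made above
      spark.sparkContext.parallelize(1 to 4).map(_ * 2).collect().foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```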
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index f4ddd82a5e914..cdf762db0ac64 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.spark.HoodieSparkKryoProvider$;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -139,7 +139,7 @@ public synchronized void runBeforeEach() throws Exception {
initialized = spark != null && hdfsTestService != null;
if (!initialized) {
SparkConf sparkConf = conf();
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
+ HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index f2e630eaa36d1..609fdb0bd5c0e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.testutils;
+import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -92,10 +93,18 @@ public class HoodieClientTestUtils {
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
+ .setMaster("local[4]")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+ .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.set("spark.sql.shuffle.partitions", "4")
.set("spark.default.parallelism", "4");
+ if (HoodieSparkUtils.gteqSpark3_2()) {
+ sparkConf.set("spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
+ }
+
String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
if (evlogDir != null) {
sparkConf.set("spark.eventLog.enabled", "true");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 0b4fc38dfed9e..8664cf7865d74 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -60,7 +60,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.HoodieSparkKryoProvider$;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -186,7 +186,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
+ HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
index 92b1f76ac4024..3a8bb1a300f1d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
@@ -47,6 +47,7 @@ default SparkConf conf(Map<String, String> overwritingConfigs) {
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
overwritingConfigs.forEach(sparkConf::set);
return sparkConf;
}
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 200b759ed234d..10cbc7fe7d081 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -229,10 +229,10 @@
      <scope>test</scope>
    </dependency>
+
    <dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo-shaded</artifactId>
-      <version>4.0.2</version>
    </dependency>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
index 408cb178af068..23a22e018220c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
@@ -31,6 +31,7 @@
public class SerializableConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
+
private transient Configuration configuration;
public SerializableConfiguration(Configuration configuration) {
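
Registering this wrapper with Kryo's `JavaSerializer` (as the Spark registrar above now does) makes Kryo delegate to the wrapper's Java-serialization hooks instead of trying to serialize the transient `Configuration` field by field. A round-trip sketch under that registration; the config key is purely illustrative:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.config.SerializableConfiguration

object ConfigRoundTripSketch extends App {
  val kryo = new Kryo()
  // Route the wrapper through Java serialization, mirroring the registrar
  kryo.register(classOf[SerializableConfiguration], new JavaSerializer())

  val hadoopConf = new Configuration(false)
  hadoopConf.set("hoodie.sketch.key", "value") // hypothetical key

  val out = new Output(4096, -1)
  kryo.writeObject(out, new SerializableConfiguration(hadoopConf))
  out.close()

  val copy = kryo.readObject(new Input(out.toBytes), classOf[SerializableConfiguration])
  println(copy.get().get("hoodie.sketch.key")) // prints "value"
}
```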
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoProvider.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
similarity index 93%
rename from hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoProvider.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
index b25efb632582a..5db7c641ac246 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoProvider.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.util;
+import com.esotericsoftware.kryo.Kryo;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -36,6 +37,8 @@
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.metadata.HoodieMetadataPayload;
+import java.util.Arrays;
+
/**
* NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
*
@@ -52,14 +55,13 @@
* or renamed (w/o correspondingly updating such usages)
*
*/
-public class HoodieCommonKryoProvider {
+public class HoodieCommonKryoRegistrar {
-  public Class<?>[] registerClasses() {
+ public void registerClasses(Kryo kryo) {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////
-
-    return new Class<?>[] {
+    Arrays.stream(new Class<?>[] {
HoodieAvroRecord.class,
HoodieAvroIndexedRecord.class,
HoodieEmptyRecord.class,
@@ -81,7 +83,8 @@ public Class<?>[] registerClasses() {
HoodieRecordLocation.class,
HoodieRecordGlobalLocation.class
- };
+ })
+ .forEachOrdered(kryo::register);
}
}
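
The "DO NOT REORDER" banner exists because Kryo assigns numeric class IDs sequentially as `register` is called, and those IDs go onto the wire; reordering registrations silently changes the IDs and breaks reads of previously serialized data. A small sketch with stand-in classes (not Hudi's) makes the mechanism visible:

```scala
import com.esotericsoftware.kryo.Kryo

object RegistrationOrderSketch extends App {
  // Stand-in payload classes, not actual Hudi model classes
  case class A(x: Int)
  case class B(y: String)

  val kryo = new Kryo()
  val idA = kryo.register(classOf[A]).getId // first registration gets the lower id
  val idB = kryo.register(classOf[B]).getId // next registration gets idA + 1
  println(s"A -> $idA, B -> $idB")
  // Swapping the two register() calls swaps the ids, so bytes written under one
  // ordering cannot be decoded under the other -- hence the banner above
}
```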
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 5962cd6e3c734..d31e648c00e75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -28,7 +28,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Arrays;
/**
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / deserializing objects.
@@ -119,8 +118,7 @@ public Kryo newKryo() {
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
// Register Hudi's classes
- Arrays.stream(new HoodieCommonKryoProvider().registerClasses())
- .forEach(kryo::register);
+ new HoodieCommonKryoRegistrar().registerClasses(kryo);
// Register serializers
kryo.register(Utf8.class, new AvroUtf8Serializer());
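
After this change there is a single registration path: Spark goes through `HoodieSparkKryoRegistrar` (whose `super` call lands here), while Hudi-internal serialization passes its own `Kryo` instance directly. A sketch of the non-Spark path, assuming unregistered classes remain allowed as `newKryo()` configures:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hudi.common.model.HoodieRecordLocation
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar

object CommonRegistrarSketch extends App {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false) // assumption: mirrors SerializationUtils.newKryo()
  new HoodieCommonKryoRegistrar().registerClasses(kryo)

  // Round-trip one of the registered model classes
  val loc = new HoodieRecordLocation("001", "file-1")
  val out = new Output(4096, -1)
  kryo.writeClassAndObject(out, loc)
  out.close()

  val copy = kryo.readClassAndObject(new Input(out.toBytes))
  println(copy) // same instantTime/fileId as loc
}
```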
diff --git a/hudi-examples/bin/hudi-delta-streamer b/hudi-examples/bin/hudi-delta-streamer
index a1e9ee18804f0..352d2a515ec8a 100755
--- a/hudi-examples/bin/hudi-delta-streamer
+++ b/hudi-examples/bin/hudi-delta-streamer
@@ -29,6 +29,8 @@ fi
exec "${SPARK_HOME}"/bin/spark-submit \
--master ${SPARK_MASTER} \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
+--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.kryoserializer.buffer.max=128m \
--conf spark.yarn.queue=root.default \
--conf spark.yarn.submit.waitAppCompletion=false \
diff --git a/hudi-examples/hudi-examples-common/pom.xml b/hudi-examples/hudi-examples-common/pom.xml
index cf6c9748341dc..dac3d391e8563 100644
--- a/hudi-examples/hudi-examples-common/pom.xml
+++ b/hudi-examples/hudi-examples-common/pom.xml
@@ -95,6 +95,12 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.avro</groupId>
diff --git a/hudi-examples/hudi-examples-java/pom.xml b/hudi-examples/hudi-examples-java/pom.xml
index 03a74a6137cb5..48dcf1d9d5cbb 100644
--- a/hudi-examples/hudi-examples-java/pom.xml
+++ b/hudi-examples/hudi-examples-java/pom.xml
@@ -121,6 +121,12 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-client-common</artifactId>
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java
index fcdc2a813ab66..14f026a64b074 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java
@@ -32,8 +32,9 @@ public class HoodieExampleSparkUtils {
  private static Map<String, String> defaultConf() {
    Map<String, String> additionalConfigs = new HashMap<>();
additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
+ additionalConfigs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+ additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
return additionalConfigs;
}
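
The example utilities keep these settings in a plain map so each sample app can fold them onto its session builder. A sketch of that overlay pattern (app name and master are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object DefaultConfSketch extends App {
  // Same shape as HoodieExampleSparkUtils.defaultConf()
  val defaults = Map(
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrator" -> "org.apache.spark.HoodieSparkKryoRegistrar",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    "spark.kryoserializer.buffer.max" -> "512m")

  val builder = SparkSession.builder().appName("defaults-sketch").master("local[2]")
  defaults.foreach { case (k, v) => builder.config(k, v) }

  val spark = builder.getOrCreate()
  spark.stop()
}
```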
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 4c7a02cc8419d..96c73dd240bc3 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -22,7 +22,7 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.testutils.providers.SparkProvider;
-import org.apache.spark.HoodieSparkKryoProvider$;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -84,7 +84,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
+ HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
diff --git a/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py b/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
index c3be6a176c9b7..c1303f6365d20 100644
--- a/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
+++ b/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
@@ -255,6 +255,7 @@ def insertOverwrite(self):
.builder \
.appName("Hudi Spark basic example") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
+ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.kryoserializer.buffer.max", "512m") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.getOrCreate()
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index 68cb50290238c..3701998bbaa6e 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -132,12 +132,32 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>${flink.streaming.java.artifactId}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.esotericsoftware.kryo</groupId>
+          <artifactId>kryo</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.esotericsoftware.minlog</groupId>
+          <artifactId>minlog</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.objenesis</groupId>
+          <artifactId>objenesis</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
@@ -152,6 +172,10 @@
        <exclusion>
          <groupId>com.esotericsoftware.minlog</groupId>
          <artifactId>minlog</artifactId>
        </exclusion>
+        <exclusion>
+          <groupId>org.objenesis</groupId>
+          <artifactId>objenesis</artifactId>
+        </exclusion>
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 047f040943871..a358e720a2e18 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -43,6 +43,12 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.avro</groupId>
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index ac62e61e03e85..c64a1b12f4ebc 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -205,6 +205,7 @@ spark-submit \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
@@ -251,6 +252,7 @@ spark-submit \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
@@ -443,6 +445,7 @@ spark-submit \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
@@ -571,6 +574,7 @@ Sample spark-submit command to test one delta streamer and a spark data source w
--conf spark.task.maxFailures=100 --conf spark.memory.fraction=0.4 \
--conf spark.rdd.compress=true --conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 --conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false --conf spark.driver.maxResultSize=12g \
--conf spark.executor.heartbeatInterval=120s --conf spark.network.timeout=600s \
@@ -605,6 +609,7 @@ Sample spark-submit command to test one delta streamer and a spark data source w
--conf spark.task.maxFailures=100 --conf spark.memory.fraction=0.4 \
--conf spark.rdd.compress=true --conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 --conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false --conf spark.driver.maxResultSize=12g \
--conf spark.executor.heartbeatInterval=120s --conf spark.network.timeout=600s \
@@ -663,6 +668,7 @@ Here is the full command:
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index 7a0bf7ac07c97..8b233fb21e748 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -130,6 +130,13 @@
      <artifactId>hudi-flink</artifactId>
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 858d914d7b57c..9eda3e8be1001 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.ReflectionUtils.loadClass
import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.catalog.{CatalogUtils, HoodieCatalogTable}
@@ -54,18 +55,18 @@ object HoodieAnalysis {
if (HoodieSparkUtils.gteqSpark3_2) {
val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
val dataSourceV2ToV1Fallback: RuleBuilder =
- session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]]
+ session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
val spark3Analysis: RuleBuilder =
- session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
+ session => instantiateKlass(spark3AnalysisClass, session)
val resolveAlterTableCommandsClass =
if (HoodieSparkUtils.gteqSpark3_3)
"org.apache.spark.sql.hudi.Spark33ResolveHudiAlterTableCommand"
else "org.apache.spark.sql.hudi.Spark32ResolveHudiAlterTableCommand"
val resolveAlterTableCommands: RuleBuilder =
- session => ReflectionUtils.loadClass(resolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
+ session => instantiateKlass(resolveAlterTableCommandsClass, session)
// NOTE: PLEASE READ CAREFULLY
//
@@ -76,7 +77,7 @@ object HoodieAnalysis {
} else if (HoodieSparkUtils.gteqSpark3_1) {
val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark31ResolveHudiAlterTableCommand"
val spark31ResolveAlterTableCommands: RuleBuilder =
- session => ReflectionUtils.loadClass(spark31ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
+ session => instantiateKlass(spark31ResolveAlterTableCommandsClass, session)
rules ++= Seq(spark31ResolveAlterTableCommands)
}
@@ -93,7 +94,7 @@ object HoodieAnalysis {
if (HoodieSparkUtils.gteqSpark3_2) {
val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
val spark3PostHocResolution: RuleBuilder =
- session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]
+ session => instantiateKlass(spark3PostHocResolutionClass, session)
rules += spark3PostHocResolution
}
@@ -114,7 +115,7 @@ object HoodieAnalysis {
"org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
}
- val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
+ val nestedSchemaPruningRule = instantiateKlass(nestedSchemaPruningClass)
// TODO(HUDI-5443) re-enable
//optimizerRules += (_ => nestedSchemaPruningRule)
}
@@ -137,6 +138,16 @@ object HoodieAnalysis {
// CBO is only supported in Spark >= 3.1.x
def customPreCBORules: Seq[RuleBuilder] = Seq()
*/
+ private def instantiateKlass(klass: String): Rule[LogicalPlan] = {
+ loadClass(klass).asInstanceOf[Rule[LogicalPlan]]
+ }
+
+ private def instantiateKlass(klass: String, session: SparkSession): Rule[LogicalPlan] = {
+    // NOTE: We have to cast the session to [[SparkSession]] so that the reflection
+    //       lookup can find the appropriate constructor in the target class
+ loadClass(klass, Array(classOf[SparkSession]).asInstanceOf[Array[Class[_]]], session)
+ .asInstanceOf[Rule[LogicalPlan]]
+ }
}
/**
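
The two `instantiateKlass` overloads centralize how rule classes are constructed reflectively; the session-taking variant passes `classOf[SparkSession]` as the declared parameter type so the lookup resolves the one-argument constructor. Since `ReflectionUtils.loadClass` is Hudi-internal, here is the equivalent in plain Java reflection, as a sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object RuleLoaderSketch {
  // Resolve a Rule[LogicalPlan] subclass by name and invoke its
  // SparkSession constructor, like instantiateKlass(klass, session)
  def instantiate(className: String, session: SparkSession): Rule[LogicalPlan] = {
    Class.forName(className)
      .getConstructor(classOf[SparkSession])
      .newInstance(session)
      .asInstanceOf[Rule[LogicalPlan]]
  }
}
```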
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 54f31ee281d74..762b3bf602898 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -56,6 +56,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
/**
* Sample program that writes & reads hoodie tables via the Spark datasource.
@@ -112,10 +113,10 @@ public static void main(String[] args) throws Exception {
}
public void run() throws Exception {
-    // Spark session setup..
-    SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+    SparkSession spark = SparkSession.builder()
+        .config(getSparkConfForTest("Hoodie Spark APP"))
+        .getOrCreate();
JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
spark.sparkContext().setLogLevel("WARN");
FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
index 491e164b9572d..ff1f12df3afbf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -52,6 +52,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
public class HoodieJavaGenerateApp {
@Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
@@ -109,9 +110,10 @@ public static void main(String[] args) throws Exception {
}
private SparkSession getOrCreateSparkSession() {
- // Spark session setup..
- SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+ SparkSession spark = SparkSession.builder()
+ .config(getSparkConfForTest("Hoodie Spark APP"))
+ .getOrCreate();
+
spark.sparkContext().setLogLevel("WARN");
return spark;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 3f26b3d036cfd..9bb94b4e84021 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -59,6 +59,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
/**
* Sample program that writes & reads hoodie tables via the Spark datasource streaming.
@@ -136,9 +137,10 @@ public static void main(String[] args) throws Exception {
* @throws Exception
*/
public void run() throws Exception {
- // Spark session setup..
- SparkSession spark = SparkSession.builder().appName("Hoodie Spark Streaming APP")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+ SparkSession spark = SparkSession.builder()
+ .config(getSparkConfForTest("Hoodie Spark Streaming APP"))
+ .getOrCreate();
+
JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
// folder path clean up and creation, preparing the environment
@@ -205,8 +207,11 @@ public void run() throws Exception {
// Deletes Stream
// Need to restart application to ensure spark does not assume there are multiple streams active.
spark.close();
- SparkSession newSpark = SparkSession.builder().appName("Hoodie Spark Streaming APP")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+
+ SparkSession newSpark = SparkSession.builder()
+ .config(getSparkConfForTest("Hoodie Spark Streaming APP"))
+ .getOrCreate();
+
jssc = new JavaSparkContext(newSpark.sparkContext());
String ckptPath2 = streamingCheckpointingPath + "/stream2";
String srcPath2 = srcPath + "/stream2";
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 071b954a17f75..5fca2b8bffea5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -36,10 +36,11 @@
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
@@ -58,22 +59,15 @@ public void setUp() {
}
private void initSparkContexts(String appName) {
- SparkConf sparkConf = new SparkConf();
- if (HoodieSparkUtils.gteqSpark3_2()) {
- sparkConf.set("spark.sql.catalog.spark_catalog",
- "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
- }
- sparkSession = SparkSession.builder().appName(appName)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .withExtensions(new HoodieSparkSessionExtension())
- .config("hoodie.insert.shuffle.parallelism", "4")
- .config("hoodie.upsert.shuffle.parallelism", "4")
- .config("hoodie.delete.shuffle.parallelism", "4")
+ SparkConf sparkConf = getSparkConfForTest(appName);
+
+ sparkSession = SparkSession.builder()
.config("hoodie.support.write.lock", "false")
.config("spark.sql.session.timeZone", "CTT")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config(sparkConf)
- .master("local[1]").getOrCreate();
+ .getOrCreate();
+
sparkSession.sparkContext().setLogLevel("ERROR");
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 212f6ad0fb1f5..657c7762c380e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -35,6 +35,7 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
@@ -95,20 +96,17 @@ class TestHoodieSparkSqlWriter {
/**
* Utility method for initializing the spark context.
+ *
+ * TODO rebase this onto existing base class to avoid duplication
*/
def initSparkContext(): Unit = {
- val sparkConf = new SparkConf()
- if (HoodieSparkUtils.gteqSpark3_2) {
- sparkConf.set("spark.sql.catalog.spark_catalog",
- "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
- }
+ val sparkConf = getSparkConfForTest(getClass.getSimpleName)
+
spark = SparkSession.builder()
- .appName(hoodieFooTableName)
- .master("local[2]")
.withExtensions(new HoodieSparkSessionExtension)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(sparkConf)
.getOrCreate()
+
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark.sqlContext
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index 89d8faaf2e702..51682119d23f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -91,6 +91,8 @@ class TestHoodieSparkUtils {
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -127,6 +129,8 @@ class TestHoodieSparkUtils {
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate
val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
index 3258c7536d1c3..d9d5b59c8d762 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -80,10 +81,7 @@ class TestTableSchemaResolverWithSparkSQL {
*/
def initSparkContext(): Unit = {
spark = SparkSession.builder()
- .appName(hoodieFooTableName)
- .master("local[2]")
- .withExtensions(new HoodieSparkSessionExtension)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(getSparkConfForTest(hoodieFooTableName))
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index a2b9d29009fd8..8d57a29e42a40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.testutils.HoodieClientTestUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
@@ -65,12 +66,14 @@ class TestDataSourceForBootstrap {
val originalVerificationVal: String = "driver_0"
val updatedVerificationVal: String = "driver_update"
- @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
- spark = SparkSession.builder
- .appName("Hoodie Datasource test")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate
+ /**
+ * TODO rebase onto existing test base-class to avoid duplication
+ */
+ @BeforeEach
+ def initialize(@TempDir tempDir: java.nio.file.Path) {
+ val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
+
+ spark = SparkSession.builder.config(sparkConf).getOrCreate
basePath = tempDir.toAbsolutePath.toString + "/base"
srcPath = tempDir.toAbsolutePath.toString + "/src"
fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 84cc741b1d2c3..dede9fad45b91 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -42,7 +42,10 @@ class TestStreamingSource extends StreamTest {
org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN)
override protected def sparkConf = {
- super.sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ super.sparkConf
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+ .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
}
test("test cow stream source") {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
index 8ba983246e600..eba322c16cf7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
@@ -39,6 +39,7 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
.appName(this.getClass.getCanonicalName)
.withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.config("spark.sql.session.timeZone", "CTT")
.config(sparkConf())
.getOrCreate()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CowTableReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CowTableReadBenchmark.scala
index ef926658ad652..13d1746fd51a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CowTableReadBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CowTableReadBenchmark.scala
@@ -39,6 +39,7 @@ object CowTableReadBenchmark extends HoodieBenchmarkBase {
.appName(this.getClass.getCanonicalName)
.withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.config("hoodie.insert.shuffle.parallelism", "2")
.config("hoodie.upsert.shuffle.parallelism", "2")
.config("hoodie.delete.shuffle.parallelism", "2")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
index 582a6b4cf2012..ef7e606077f4d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
@@ -45,6 +45,7 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
.config("spark.driver.memory", "4G")
.config("spark.executor.memory", "4G")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.config("hoodie.insert.shuffle.parallelism", "2")
.config("hoodie.upsert.shuffle.parallelism", "2")
.config("hoodie.delete.shuffle.parallelism", "2")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 6beed5cfc39a1..059f692514c88 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -54,27 +55,18 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
DateTimeZone.setDefault(DateTimeZone.UTC)
TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
protected lazy val spark: SparkSession = SparkSession.builder()
- .master("local[1]")
- .appName("hoodie sql test")
- .withExtensions(new HoodieSparkSessionExtension)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
+ .config("spark.sql.session.timeZone", "UTC")
.config("hoodie.insert.shuffle.parallelism", "4")
.config("hoodie.upsert.shuffle.parallelism", "4")
.config("hoodie.delete.shuffle.parallelism", "4")
- .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
- .config("spark.sql.session.timeZone", "UTC")
.config(sparkConf())
.getOrCreate()
private var tableId = 0
def sparkConf(): SparkConf = {
- val sparkConf = new SparkConf()
- if (HoodieSparkUtils.gteqSpark3_2) {
- sparkConf.set("spark.sql.catalog.spark_catalog",
- "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
- }
- sparkConf
+ getSparkConfForTest("Hoodie SQL Test")
}
protected def withTempDir(f: File => Unit): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index 9ac6d83f4eaf1..766dff82801fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -51,7 +51,6 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}
spark.sql("set hoodie.bootstrap.parallelism = 20")
- // run bootstrap
checkAnswer(
s"""call run_bootstrap(
|table => '$tableName',
@@ -81,7 +80,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
// show bootstrap's index mapping
result = spark.sql(
s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
- assertResult(3) {
+ assertResult(10) {
result.length
}
}
diff --git a/hudi-sync/hudi-sync-common/pom.xml b/hudi-sync/hudi-sync-common/pom.xml
index df31e860ca64a..a5ac2e88d762f 100644
--- a/hudi-sync/hudi-sync-common/pom.xml
+++ b/hudi-sync/hudi-sync-common/pom.xml
@@ -45,6 +45,12 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
diff --git a/hudi-timeline-service/pom.xml b/hudi-timeline-service/pom.xml
index 668ee4e279f69..88eebaa2bd8ae 100644
--- a/hudi-timeline-service/pom.xml
+++ b/hudi-timeline-service/pom.xml
@@ -81,6 +81,12 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 402b380a00e08..1f6cac0aacb9d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -52,6 +52,8 @@
import scala.Tuple2;
+import static org.apache.hudi.utilities.UtilHelpers.buildSparkConf;
+
/**
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
*
@@ -183,8 +185,7 @@ public static void main(String[] args) throws IOException {
cfg.outputPath));
// Create a spark job to do the snapshot copy
- SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
- sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ SparkConf sparkConf = buildSparkConf("Hoodie-snapshot-copier", "local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
LOG.info("Initializing spark job.");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index ca0a495e31213..c8878c2d66497 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -66,6 +66,8 @@
import scala.collection.JavaConversions;
+import static org.apache.hudi.utilities.UtilHelpers.buildSparkConf;
+
/**
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
*/
@@ -282,8 +284,7 @@ public static void main(String[] args) throws IOException {
final Config cfg = new Config();
new JCommander(cfg, null, args);
- SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-exporter");
- sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ SparkConf sparkConf = buildSparkConf("Hoodie-snapshot-exporter", "local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
LOG.info("Initializing spark job.");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 88e78077ee44c..d159fee0be4c2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -64,7 +64,6 @@
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -275,6 +274,8 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
sparkConf.set("spark.ui.port", "8090");
sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
+ sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
@@ -282,7 +283,6 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Ma
sparkConf.set("spark.driver.allowMultipleContexts", "true");
additionalConfigs.forEach(sparkConf::set);
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
return sparkConf;
}
@@ -291,13 +291,14 @@ private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
sparkConf.set("spark.ui.port", "8090");
sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
+ sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
additionalConfigs.forEach(sparkConf::set);
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
return sparkConf;
}
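
With the registrator and SQL extensions folded into `buildSparkConf`, utility entry points (the snapshot copier/exporter above) no longer register anything programmatically. A sketch of what a caller now gets, with a hypothetical app name:

```scala
import org.apache.hudi.utilities.UtilHelpers

object UtilitySparkConfSketch extends App {
  val conf = UtilHelpers.buildSparkConf("hoodie-sketch-app", "local[*]") // hypothetical app name
  println(conf.get("spark.serializer"))       // org.apache.spark.serializer.KryoSerializer
  println(conf.get("spark.kryo.registrator")) // org.apache.spark.HoodieSparkKryoRegistrar
  println(conf.get("spark.sql.extensions"))   // org.apache.spark.sql.hudi.HoodieSparkSessionExtension
}
```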
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
index 4238439294908..bac24f3484b0e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -35,7 +35,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.HoodieSparkKryoProvider$;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -93,7 +93,7 @@ public void initWithCleanState() throws IOException {
boolean initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
- HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
+ HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
diff --git a/packaging/bundle-validation/conf/spark-defaults.conf b/packaging/bundle-validation/conf/spark-defaults.conf
index 07575134c735b..a53d141bc85c6 100644
--- a/packaging/bundle-validation/conf/spark-defaults.conf
+++ b/packaging/bundle-validation/conf/spark-defaults.conf
@@ -16,6 +16,7 @@
#
spark.serializer org.apache.spark.serializer.KryoSerializer
+spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.warehouse.dir file:///tmp/hudi-bundles/hive/warehouse
spark.default.parallelism 8
diff --git a/packaging/hudi-cli-bundle/pom.xml b/packaging/hudi-cli-bundle/pom.xml
index 981ec7405026e..e0ed9027fa02d 100644
--- a/packaging/hudi-cli-bundle/pom.xml
+++ b/packaging/hudi-cli-bundle/pom.xml
@@ -91,6 +91,12 @@
                  <include>org.apache.hudi:hudi-cli</include>
                  <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>com.fasterxml:classmate</include>
                  <include>com.fasterxml.woodstox:woodstox-core</include>
@@ -117,6 +123,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>com.google.code.gson.</pattern>
                    <shadedPattern>org.apache.hudi.com.google.code.gson.</shadedPattern>
@@ -158,6 +182,15 @@
      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index a24751c3c1d0e..63310604cb5d9 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -86,6 +86,11 @@
                  <include>org.apache.hudi:hudi-timeline-service</include>
                  <include>org.apache.hudi:hudi-aws</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>com.yammer.metrics:metrics-core</include>
                  <include>com.beust:jcommander</include>
                  <include>io.javalin:javalin</include>
@@ -162,6 +167,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>javax.servlet.</pattern>
                    <shadedPattern>${flink.bundle.shade.prefix}javax.servlet.</shadedPattern>
@@ -406,6 +429,15 @@
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 4a501e72b8453..5ab88990a3b36 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -68,6 +68,12 @@
                  <include>org.apache.hudi:hudi-common</include>
                  <include>org.apache.hudi:hudi-hadoop-mr</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>org.apache.parquet:parquet-avro</include>
                  <include>org.apache.avro:avro</include>
                  <include>org.apache.hbase:hbase-common</include>
@@ -90,6 +96,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>com.yammer.metrics.</pattern>
                    <shadedPattern>org.apache.hudi.com.yammer.metrics.</shadedPattern>
@@ -241,6 +265,14 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.parquet</groupId>
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 2a4d754993208..c89e779204ac9 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -87,6 +87,11 @@
                  <include>org.apache.hudi:hudi-timeline-service</include>
                  <include>org.apache.hudi:hudi-integ-test</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>org.apache.hbase:hbase-common</include>
                  <include>org.apache.hbase:hbase-client</include>
                  <include>org.apache.hbase:hbase-hadoop-compat</include>
@@ -186,6 +191,25 @@
                  <relocation>
                    <pattern>org.apache.spark.sql.avro.</pattern>
                    <shadedPattern>org.apache.hudi.org.apache.spark.sql.avro.</shadedPattern>
                  </relocation>
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>com.beust.jcommander.</pattern>
                    <shadedPattern>org.apache.hudi.com.beust.jcommander.</shadedPattern>
@@ -458,6 +482,14 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index 6321ee3c0e97d..33c7c9ca5f043 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -90,6 +90,11 @@
                  <include>org.apache.flink:flink-core</include>
                  <include>${flink.hadoop.compatibility.artifactId}</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>com.lmax:disruptor</include>
                  <include>com.github.davidmoten:guava-mini</include>
                  <include>com.github.davidmoten:hilbert-curve</include>
@@ -130,6 +135,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>com.google.protobuf.</pattern>
                    <shadedPattern>${kafka.connect.bundle.shade.prefix}com.google.protobuf.</shadedPattern>
@@ -321,6 +344,14 @@
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.avro</groupId>
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml
index 6c093b70187d3..e08c83fe913c1 100644
--- a/packaging/hudi-presto-bundle/pom.xml
+++ b/packaging/hudi-presto-bundle/pom.xml
@@ -69,6 +69,11 @@
                  <include>org.apache.hudi:hudi-common</include>
                  <include>org.apache.hudi:hudi-hadoop-mr</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>org.apache.parquet:parquet-avro</include>
                  <include>org.apache.avro:avro</include>
                  <include>com.github.ben-manes.caffeine:caffeine</include>
@@ -96,6 +101,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>org.apache.parquet.avro.</pattern>
                    <shadedPattern>org.apache.hudi.org.apache.parquet.avro.</shadedPattern>
@@ -264,6 +287,14 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.parquet</groupId>
diff --git a/packaging/hudi-trino-bundle/pom.xml b/packaging/hudi-trino-bundle/pom.xml
index 91b79c901b739..e8073407db440 100644
--- a/packaging/hudi-trino-bundle/pom.xml
+++ b/packaging/hudi-trino-bundle/pom.xml
@@ -70,6 +70,11 @@
                  <include>org.apache.hudi:hudi-common</include>
                  <include>org.apache.hudi:hudi-hadoop-mr</include>
+
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
+
                  <include>org.apache.parquet:parquet-avro</include>
                  <include>org.apache.avro:avro</include>
                  <include>org.codehaus.jackson:*</include>
@@ -95,6 +100,24 @@
+
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.reflectasm.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.minlog.</pattern>
+                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>org.objenesis.</pattern>
+                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                  </relocation>
+
                  <relocation>
                    <pattern>org.apache.parquet.avro.</pattern>
                    <shadedPattern>org.apache.hudi.org.apache.parquet.avro.</shadedPattern>
@@ -249,6 +272,14 @@
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.parquet</groupId>
diff --git a/packaging/hudi-utilities-slim-bundle/README.md b/packaging/hudi-utilities-slim-bundle/README.md
index 60ee739153fdd..f9125749b958a 100644
--- a/packaging/hudi-utilities-slim-bundle/README.md
+++ b/packaging/hudi-utilities-slim-bundle/README.md
@@ -29,6 +29,7 @@ hudi-utilities-bundle solely introduces problems for a specific Spark version.
bin/spark-submit \
--driver-memory 4g --executor-memory 2g --num-executors 3 --executor-cores 1 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.sql.catalogImplementation=hive \
--conf spark.driver.maxResultSize=1g \
--conf spark.ui.port=6679 \
@@ -57,6 +58,8 @@ bin/spark-submit \
bin/spark-submit \
--driver-memory 4g --executor-memory 2g --num-executors 3 --executor-cores 1 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
+ --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalogImplementation=hive \
--conf spark.driver.maxResultSize=1g \
--conf spark.ui.port=6679 \
@@ -85,6 +88,8 @@ bin/spark-submit \
bin/spark-submit \
--driver-memory 4g --executor-memory 2g --num-executors 3 --executor-cores 1 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
+ --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalogImplementation=hive \
--conf spark.driver.maxResultSize=1g \
--conf spark.ui.port=6679 \
diff --git a/pom.xml b/pom.xml
index 0dff6d4d863d0..62719abddd0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
    0.37.0
    <java.version>1.8</java.version>
+    <kryo.shaded.version>4.0.2</kryo.shaded.version>
    <fasterxml.version>2.6.7</fasterxml.version>
    <fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>
    <fasterxml.jackson.module.scala.version>2.6.7.1</fasterxml.jackson.module.scala.version>
@@ -440,10 +441,6 @@
-
-                  <include>com.esotericsoftware:kryo-shaded</include>
-                  <include>com.esotericsoftware:minlog</include>
-                  <include>org.objenesis:objenesis</include>
                  <include>org.apache.httpcomponents:httpclient</include>
                  <include>org.apache.httpcomponents:httpcore</include>
@@ -451,23 +448,6 @@
-
-                  <relocation>
-                    <pattern>com.esotericsoftware.kryo.</pattern>
-                    <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
-                  </relocation>
-                  <relocation>
-                    <pattern>com.esotericsoftware.reflectasm.</pattern>
-                    <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
-                  </relocation>
-                  <relocation>
-                    <pattern>com.esotericsoftware.minlog.</pattern>
-                    <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
-                  </relocation>
-                  <relocation>
-                    <pattern>org.objenesis.</pattern>
-                    <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
-                  </relocation>
                  <relocation>
                    <pattern>org.apache.http.</pattern>
@@ -1524,6 +1504,14 @@
        <scope>test</scope>
      </dependency>
+
+      <dependency>
+        <groupId>com.esotericsoftware</groupId>
+        <artifactId>kryo-shaded</artifactId>
+        <version>${kryo.shaded.version}</version>
+        <scope>provided</scope>
+      </dependency>
+
      <dependency>
        <groupId>com.esotericsoftware</groupId>