4 changes: 3 additions & 1 deletion README.md
@@ -1,3 +1,4 @@

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -67,7 +68,8 @@ mvn clean package -DskipTests
# Start command
spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
--jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-*.*.*-SNAPSHOT.jar` \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
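Once the shell is up, a quick sanity check that the registrar was picked up (a hypothetical verification, not part of this PR):

```scala
// Inside the spark-shell started above: confs passed via --conf are
// visible on the SparkContext's configuration.
sc.getConf.get("spark.kryo.registrator")
// res0: String = org.apache.spark.HoodieSparkKryoRegistrar
```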

To build for integration tests that include `hudi-integ-test-bundle`, use `-Dintegration-tests`.
2 changes: 2 additions & 0 deletions docker/demo/config/spark-defaults.conf
@@ -22,5 +22,7 @@
spark.master local[3]
spark.eventLog.dir hdfs://namenode:8020/tmp/spark-events
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar

#spark.executor.memory 4g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
@@ -22,6 +22,7 @@ spark-submit \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
@@ -18,6 +18,7 @@

package org.apache.hudi.cli.utils;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,8 +44,8 @@ public class SparkTempViewProvider implements TempViewProvider {

public SparkTempViewProvider(String appName) {
try {
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
SparkConf sparkConf = SparkUtil.getDefaultConf(appName, Option.of("local[8]"));

jsc = new JavaSparkContext(sparkConf);
sqlContext = new SQLContext(jsc);
} catch (Throwable ex) {
@@ -25,7 +25,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
@@ -92,6 +91,7 @@ public static SparkConf getDefaultConf(final String appName, final Option<String
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");

// Configure hadoop conf
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
@@ -116,7 +116,6 @@ public static JavaSparkContext initJavaSparkContext(String name, Option<String>
}

public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
@@ -27,7 +27,7 @@
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -100,7 +100,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
6 changes: 6 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
@@ -59,6 +59,12 @@
<artifactId>joda-time</artifactId>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
6 changes: 6 additions & 0 deletions hudi-client/hudi-java-client/pom.xml
@@ -43,6 +43,12 @@
<version>${project.parent.version}</version>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -19,11 +19,12 @@
package org.apache.spark

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
import org.apache.hudi.common.util.HoodieCommonKryoProvider
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.model.HoodieSparkRecord
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.serializer.KryoRegistrator

/**
@@ -42,22 +43,31 @@ import org.apache.spark.serializer.KryoRegistrator
* or renamed (w/o correspondingly updating such usages)</li>
* </ol>
*/
class HoodieSparkKryoProvider extends HoodieCommonKryoProvider {
override def registerClasses(): Array[Class[_]] = {
class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////
val classes = super[HoodieCommonKryoProvider].registerClasses()
classes ++ Array(
classOf[HoodieWriteConfig],
classOf[HoodieSparkRecord],
classOf[HoodieInternalRow]
)
super[HoodieCommonKryoRegistrar].registerClasses(kryo)

kryo.register(classOf[HoodieWriteConfig])

kryo.register(classOf[HoodieSparkRecord])
kryo.register(classOf[HoodieInternalRow])

// NOTE: Hadoop's configuration is not a serializable object by itself, and hence
// we're relying on [[SerializableConfiguration]] wrapper to work it around
kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
}
}

object HoodieSparkKryoProvider {
object HoodieSparkKryoRegistrar {

// NOTE: We're copying definition of the config introduced in Spark 3.0
// (to stay compatible w/ Spark 2.4)
private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"

Contributor: Guess we can make it public so that there is no need to hard-code the option key spark.kryo.registrator everywhere.

Contributor Author: We actually won't be able to use it everywhere, so I'd rather stick with the Spark option for consistency (which is the way we handle every other option as well).

def register(conf: SparkConf): SparkConf = {
conf.registerKryoClasses(new HoodieSparkKryoProvider().registerClasses())
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
}
Contributor: Does .mkString(",") make sense here?

Contributor Author: We need to convert it to a string, so I kept it generic so that we can drop in one more class. Not strictly necessary though.

}
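For context, here is a minimal sketch (not part of this PR) of how the registrar gets wired up on a SparkConf. The comma-joined value mirrors the `.mkString(",")` above, which leaves room to chain additional registrators later:

```scala
import org.apache.spark.SparkConf

// Hypothetical standalone setup, assuming the Hudi Spark bundle is on the
// classpath. Spark splits this value on commas and calls registerClasses(kryo)
// on each listed KryoRegistrator whenever it creates a Kryo instance.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
```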
@@ -38,7 +38,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -139,7 +139,7 @@ public synchronized void runBeforeEach() throws Exception {
initialized = spark != null && hdfsTestService != null;
if (!initialized) {
SparkConf sparkConf = conf();
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
@@ -18,6 +18,7 @@

package org.apache.hudi.testutils;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -92,10 +93,18 @@ public class HoodieClientTestUtils {
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
.setMaster("local[4]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
Contributor: Can we also move these common options into a tool method?

Contributor Author: This is exactly the method you're referring to (used in tests).

.set("spark.sql.shuffle.partitions", "4")
.set("spark.default.parallelism", "4");

if (HoodieSparkUtils.gteqSpark3_2()) {
sparkConf.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog");
}

String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
if (evlogDir != null) {
sparkConf.set("spark.eventLog.enabled", "true");
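A sketch of how a test might consume the helper above (hypothetical caller; the method itself is from the diff):

```scala
import org.apache.hudi.testutils.HoodieClientTestUtils
import org.apache.spark.sql.SparkSession

// Builds a local[4] SparkSession with the Kryo registrar and Hudi SQL
// extensions pre-configured, as set up by getSparkConfForTest.
val spark = SparkSession.builder()
  .config(HoodieClientTestUtils.getSparkConfForTest("unit-test"))
  .getOrCreate()
```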
@@ -60,7 +60,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -186,7 +186,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
@@ -47,6 +47,7 @@ default SparkConf conf(Map<String, String> overwritingConfigs) {
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
overwritingConfigs.forEach(sparkConf::set);
return sparkConf;
}
2 changes: 1 addition & 1 deletion hudi-common/pom.xml
@@ -229,10 +229,10 @@
<scope>test</scope>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
@@ -31,6 +31,7 @@
public class SerializableConfiguration implements Serializable {

private static final long serialVersionUID = 1L;

private transient Configuration configuration;

public SerializableConfiguration(Configuration configuration) {
@@ -18,6 +18,7 @@

package org.apache.hudi.common.util;

import com.esotericsoftware.kryo.Kryo;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -36,6 +37,8 @@
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.metadata.HoodieMetadataPayload;

import java.util.Arrays;

/**
* NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
*
@@ -52,14 +55,13 @@
* or renamed (w/o correspondingly updating such usages)</li>
* </ol>
*/
public class HoodieCommonKryoProvider {
public class HoodieCommonKryoRegistrar {

public Class<?>[] registerClasses() {
public void registerClasses(Kryo kryo) {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////

return new Class<?>[] {
Arrays.stream(new Class<?>[] {
HoodieAvroRecord.class,
HoodieAvroIndexedRecord.class,
HoodieEmptyRecord.class,
Expand All @@ -81,7 +83,8 @@ public Class<?>[] registerClasses() {

HoodieRecordLocation.class,
HoodieRecordGlobalLocation.class
};
})
.forEachOrdered(kryo::register);
Contributor: A stateless function (one that has no side effects) is always a better choice, especially for a tool method; personally I prefer the old way we handled this.

Contributor Author: Agree in principle, but here we're actually aligning it with the interface of KryoRegistrator.

}

}
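To make the new, side-effecting shape concrete (and the trade-off discussed above), a hedged sketch of direct, non-Spark usage, essentially what SerializationUtils does in the next file:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar

// The registrar now mutates the Kryo instance directly instead of returning
// an array of classes for the caller to register.
val kryo = new Kryo()
new HoodieCommonKryoRegistrar().registerClasses(kryo)
```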
@@ -28,7 +28,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;

/**
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / deserializing objects.
@@ -119,8 +118,7 @@ public Kryo newKryo() {
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

// Register Hudi's classes
Arrays.stream(new HoodieCommonKryoProvider().registerClasses())
.forEach(kryo::register);
new HoodieCommonKryoRegistrar().registerClasses(kryo);

// Register serializers
kryo.register(Utf8.class, new AvroUtf8Serializer());
2 changes: 2 additions & 0 deletions hudi-examples/bin/hudi-delta-streamer
@@ -29,6 +29,8 @@ fi
exec "${SPARK_HOME}"/bin/spark-submit \
--master ${SPARK_MASTER} \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.kryoserializer.buffer.max=128m \
--conf spark.yarn.queue=root.default \
--conf spark.yarn.submit.waitAppCompletion=false \
6 changes: 6 additions & 0 deletions hudi-examples/hudi-examples-common/pom.xml
@@ -95,6 +95,12 @@
<version>${project.version}</version>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>

<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
6 changes: 6 additions & 0 deletions hudi-examples/hudi-examples-java/pom.xml
@@ -121,6 +121,12 @@
<version>${project.version}</version>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
@@ -32,8 +32,9 @@ public class HoodieExampleSparkUtils {
private static Map<String, String> defaultConf() {
Map<String, String> additionalConfigs = new HashMap<>();
additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
additionalConfigs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
return additionalConfigs;
}

@@ -22,7 +22,7 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.testutils.providers.SparkProvider;

import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -84,7 +84,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();