Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
Expand Down Expand Up @@ -116,7 +116,7 @@ public static JavaSparkContext initJavaSparkContext(String name, Option<String>
}

public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package org.apache.hudi.cli.functional;

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
Expand Down Expand Up @@ -100,7 +100,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

Expand Down Expand Up @@ -97,17 +96,6 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC
super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
}

/**
 * Adds the core Hudi classes to the set of classes Spark registers with Kryo.
 *
 * @param conf the {@link SparkConf} to update
 * @return the same {@code conf} instance that was passed in
 */
public static SparkConf registerClasses(SparkConf conf) {
  final Class<?>[] hudiClasses = {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class};
  conf.registerKryoClasses(hudiClasses);
  return conf;
}

@Override
protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.commmon.model.HoodieSparkRecord
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.serializer.KryoRegistrator

/**
 * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
 *
 * Registers the Hudi components that are frequently serialized by Kryo (for example, during
 * Spark's shuffle operations), so that Kryo does not have to write the full class name for
 * every object, which would quickly add up to a considerable amount of overhead.
 *
 * Please note the following:
 * <ol>
 *   <li>The ordering of the registrations MUST NOT change, as it directly determines the
 *   class ids assigned on the Kryo side</li>
 *   <li>This class might be loaded via reflection and therefore should not be relocated
 *   or renamed (without updating such usages accordingly)</li>
 * </ol>
 */
class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {

  /**
   * Registers the common Hudi classes first (through [[HoodieCommonKryoRegistrar]]) and then
   * the Spark-specific ones, always in the same fixed order.
   *
   * @param kryo Kryo instance to register the classes with
   */
  override def registerClasses(kryo: Kryo): Unit = {
    ///////////////////////////////////////////////////////////////////////////
    // NOTE: DO NOT REORDER REGISTRATIONS
    ///////////////////////////////////////////////////////////////////////////
    super[HoodieCommonKryoRegistrar].registerClasses(kryo)

    val sparkSpecificTypes: Seq[Class[_]] = Seq(
      classOf[HoodieWriteConfig],
      classOf[HoodieSparkRecord],
      classOf[HoodieInternalRow]
    )

    sparkSpecificTypes.foreach(tpe => kryo.register(tpe))
  }
}

object HoodieSparkKryoRegistrar {

  // NOTE: We're copying definition of the config introduced in Spark 3.0
  //       (to stay compatible w/ Spark 2.4)
  private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"

  /**
   * Configures `conf` so that Spark uses [[HoodieSparkKryoRegistrar]] as a Kryo registrator.
   *
   * Registrators already present under `spark.kryo.registrator` (a comma-separated list of
   * class names) are preserved rather than overwritten. When none are configured, the resulting
   * value is exactly the Hudi registrar's class name.
   *
   * @param conf Spark configuration to update
   * @return the value returned by `conf.set(...)`
   */
  def register(conf: SparkConf): SparkConf = {
    val hoodieRegistrar = classOf[HoodieSparkKryoRegistrar].getName

    // Keep whatever the user configured, dropping blanks and an already-present Hudi entry
    // so that repeated calls do not produce duplicates
    val userRegistrars = conf.get(KRYO_USER_REGISTRATORS, "")
      .split(",")
      .map(_.trim)
      .filter(name => name.nonEmpty && name != hoodieRegistrar)

    // NOTE(review): Hudi's registrar is listed first, presumably keeping the class ids of its
    //               registrations independent of the other registrators -- confirm against
    //               Spark's KryoSerializer
    conf.set(KRYO_USER_REGISTRATORS, (Seq(hoodieRegistrar) ++ userRegistrars).mkString(","))
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -458,7 +459,10 @@ private void testInsertAndCleanByVersions(

/**
* Test Clean-By-Commits using insert/upsert API.
*
* TODO reenable test after rebasing on master
*/
@Disabled
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
Expand Down Expand Up @@ -138,7 +139,7 @@ public synchronized void runBeforeEach() throws Exception {
initialized = spark != null && hdfsTestService != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -184,7 +185,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util;

import com.esotericsoftware.kryo.Kryo;
import org.apache.avro.util.Utf8;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.metadata.HoodieMetadataPayload;

/**
 * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
 *
 * Registers the Hudi components that are frequently serialized by Kryo (for example, during
 * Spark's shuffle operations), so that Kryo does not have to write the full class name for
 * every object, which would quickly add up to a considerable amount of overhead.
 *
 * Please note the following:
 * <ol>
 *   <li>The ordering of the registrations MUST NOT change, as it directly determines the
 *   class ids assigned on the Kryo side</li>
 *   <li>This class might be loaded via reflection and therefore should not be relocated
 *   or renamed (without updating such usages accordingly)</li>
 * </ol>
 */
public class HoodieCommonKryoRegistrar {

  /**
   * Registers Hudi's common model classes with the given {@link Kryo} instance, always in
   * the same fixed order, followed by a dedicated serializer for Avro's {@link Utf8}.
   *
   * @param kryo Kryo instance to register the classes with
   */
  public void registerClasses(Kryo kryo) {
    ///////////////////////////////////////////////////////////////////////////
    // NOTE: DO NOT REORDER REGISTRATIONS
    ///////////////////////////////////////////////////////////////////////////
    final Class<?>[] orderedTypes = {
        // Records
        HoodieAvroRecord.class,
        HoodieAvroIndexedRecord.class,
        HoodieEmptyRecord.class,

        // Payloads
        OverwriteWithLatestAvroPayload.class,
        DefaultHoodieRecordPayload.class,
        OverwriteNonDefaultsWithLatestAvroPayload.class,
        RewriteAvroPayload.class,
        EventTimeAvroPayload.class,
        PartialUpdateAvroPayload.class,
        MySqlDebeziumAvroPayload.class,
        PostgresDebeziumAvroPayload.class,
        // TODO need to relocate to hudi-common
        //BootstrapRecordPayload.class,
        AWSDmsAvroPayload.class,
        HoodieAvroPayload.class,
        HoodieJsonPayload.class,
        HoodieMetadataPayload.class,

        // Record locations
        HoodieRecordLocation.class,
        HoodieRecordGlobalLocation.class
    };

    for (Class<?> type : orderedTypes) {
      kryo.register(type);
    }

    // Utf8 is registered last, with an explicit serializer
    kryo.register(Utf8.class, new SerializationUtils.AvroUtf8Serializer());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.hudi.common.util;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayOutputStream;
Expand All @@ -36,9 +38,6 @@ public class SerializationUtils {
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
ThreadLocal.withInitial(KryoSerializerInstance::new);

// Serialize
// -----------------------------------------------------------------------

/**
* <p>
* Serializes an {@code Object} to a byte array for storage/serialization.
Expand All @@ -52,9 +51,6 @@ public static byte[] serialize(final Object obj) throws IOException {
return SERIALIZER_REF.get().serialize(obj);
}

// Deserialize
// -----------------------------------------------------------------------

/**
* <p>
* Deserializes a single {@code Object} from an array of bytes.
Expand Down Expand Up @@ -112,17 +108,42 @@ Object deserialize(byte[] objectData) {
private static class KryoInstantiator implements Serializable {

  /**
   * Creates and configures a new {@link Kryo} instance with Hudi's classes registered.
   *
   * @return a newly created, fully configured {@link Kryo}
   */
  public Kryo newKryo() {
    final Kryo instance = new Kryo();

    // Resolve classes through the current thread's context classloader, to cope with
    // unusual classloader setups (such as libjars for hadoop)
    instance.setClassLoader(Thread.currentThread().getContextClassLoader());
    // Last-resort strategy for instantiating objects when nothing else works out
    instance.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
    // Classes that were not registered up-front must not make Kryo fail
    instance.setRegistrationRequired(false);

    // Hudi's own classes are registered once the instance has been configured
    HoodieCommonKryoRegistrar hudiRegistrar = new HoodieCommonKryoRegistrar();
    hudiRegistrar.registerClasses(instance);

    return instance;
  }

}

/**
 * Kryo {@link Serializer} for Avro's {@link Utf8}, which writes the string as a plain byte array
 * using Kryo's default {@code byte[]} serializer.
 *
 * NOTE: This {@link Serializer} could deserialize instance of {@link Utf8} serialized
 *       by implicitly generated Kryo serializer (based on
 *       {@link com.esotericsoftware.kryo.serializers.FieldSerializer})
 */
public static class AvroUtf8Serializer extends Serializer<Utf8> {

  /**
   * Writes the UTF-8 bytes of {@code utf8String} to {@code output}.
   *
   * @param kryo       Kryo instance providing the default {@code byte[]} serializer
   * @param output     target output
   * @param utf8String string to serialize
   */
  @SuppressWarnings("unchecked")
  @Override
  public void write(Kryo kryo, Output output, Utf8 utf8String) {
    Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);

    // Utf8#getBytes exposes the backing buffer, which is only valid through getByteLength()
    // and may be larger than the string itself (for ex, when the Utf8 instance was re-used);
    // writing the whole buffer would append stale bytes to the deserialized string
    byte[] backingBytes = utf8String.getBytes();
    int byteLength = utf8String.getByteLength();
    byte[] payload = backingBytes.length == byteLength
        ? backingBytes
        : java.util.Arrays.copyOf(backingBytes, byteLength);

    bytesSerializer.write(kryo, output, payload);
  }

  /**
   * Reads a byte array from {@code input} and wraps it into a new {@link Utf8}.
   *
   * @param kryo  Kryo instance providing the default {@code byte[]} serializer
   * @param input source input
   * @param type  target type (always {@link Utf8})
   * @return the deserialized {@link Utf8}
   */
  @SuppressWarnings("unchecked")
  @Override
  public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
    Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
    byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
    return new Utf8(bytes);
  }
}
}
Loading