Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.hudi.common.util;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayOutputStream;
Expand All @@ -36,9 +38,6 @@ public class SerializationUtils {
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
ThreadLocal.withInitial(KryoSerializerInstance::new);

// Serialize
// -----------------------------------------------------------------------

/**
* <p>
* Serializes an {@code Object} to a byte array for storage/serialization.
Expand All @@ -52,9 +51,6 @@ public static byte[] serialize(final Object obj) throws IOException {
return SERIALIZER_REF.get().serialize(obj);
}

// Deserialize
// -----------------------------------------------------------------------

/**
* <p>
* Deserializes a single {@code Object} from an array of bytes.
Expand Down Expand Up @@ -112,17 +108,42 @@ Object deserialize(byte[] objectData) {
private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new Kryo();
// ensure that kryo doesn't fail if classes are not registered with kryo.

// This instance of Kryo should not require prior registration of classes
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

// Register serializers
kryo.register(Utf8.class, new AvroUtf8Serializer());

return kryo;
}

}

/**
* NOTE: This {@link Serializer} could deserialize instance of {@link Utf8} serialized
* by implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer}
*/
private static class AvroUtf8Serializer extends Serializer<Utf8> {

@SuppressWarnings("unchecked")
@Override
public void write(Kryo kryo, Output output, Utf8 utf8String) {
Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
bytesSerializer.write(kryo, output, utf8String.getBytes());
}

@SuppressWarnings("unchecked")
@Override
public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
return new Utf8(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
package org.apache.hudi.common.util;

import org.apache.avro.util.Utf8;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -52,12 +58,33 @@ public void testSerDeser() throws IOException {
verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
}

@Test
public void testAvroUtf8SerDe() throws IOException {
byte[] firstBytes = SerializationUtils.serialize(new Utf8("test"));
// 4 byte string + 3 bytes length (Kryo uses variable-length encoding)
assertEquals(7, firstBytes.length);
}

@Test
public void testClassFullyQualifiedNameSerialization() throws IOException {
DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap());

byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
byte[] secondBytes = SerializationUtils.serialize(deleteBlock);

assertNotSame(firstBytes, secondBytes);
// NOTE: Here we assert that Kryo doesn't optimize out the fully-qualified class-name
// and always writes it out
assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this throwing exception before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't. This is a litmus test to make sure this doesn't happen

}

private <T> void verifyObject(T expectedValue) throws IOException {
byte[] serializedObject = SerializationUtils.serialize(expectedValue);
assertNotNull(serializedObject);
assertTrue(serializedObject.length > 0);

final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
final T deserializedValue = SerializationUtils.deserialize(serializedObject);
if (expectedValue == null) {
assertNull(deserializedValue);
} else {
Expand Down