@@ -983,7 +983,9 @@ protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
FileSlice slice = finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
fileGroupCount));
r.unseal();
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
r.seal();
return r;
});
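
Context for the new lines above: HoodieRecord guards its setters once sealed, so the location can only be assigned between unseal() and seal(). The file-group index is a stable hash of the record key, so a given key always lands in the same file slice. An illustrative stand-in (the real implementation lives in HoodieTableMetadataUtil, and the exact hash may differ):

public class FileGroupIndexSketch {
  // Illustrative stand-in, not necessarily Hudi's exact hash: map a record
  // key deterministically onto one of N metadata-table file groups.
  static int mapRecordKeyToFileGroupIndex(String recordKey, int fileGroupCount) {
    int h = 0;
    for (int i = 0; i < recordKey.length(); i++) {
      h = 31 * h + recordKey.charAt(i);
    }
    return Math.abs(h % fileGroupCount);
  }

  public static void main(String[] args) {
    System.out.println(mapRecordKeyToFileGroupIndex("record-key-1", 10));
  }
}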

@@ -18,6 +18,10 @@

package org.apache.hudi.common.model;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.SparkAdapterSupport$;
@@ -70,51 +74,67 @@
* need to be updated (i.e. serving as an overlay layer on top of [[UnsafeRow]])</li>
* </ul>
*
*/
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements KryoSerializable {

/**
* Tracks whether the underlying InternalRow has already been copied, so that we avoid copying it twice.
*/
private boolean copy;

/**
* Use this constructor when reading an InternalRow from a file.
* A record constructed this way must only be used within the reading iterator.
* NOTE: {@code HoodieSparkRecord} is holding the schema only in cases when it would have
* to execute {@link UnsafeProjection} so that the {@link InternalRow} it's holding to
* could be projected into {@link UnsafeRow} and be efficiently serialized subsequently
* (by Kryo)
*/
public HoodieSparkRecord(InternalRow data) {
private final transient StructType schema;
Contributor: I think this is unnecessary. The schema is only used in construction.

Contributor Author: We actually no longer do the projection in the ctor; instead we only do it when serializing the HoodieSparkRecord.


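To make the deferred projection concrete, here is a minimal standalone sketch with a hypothetical schema and values. The PR routes this through HoodieInternalRowUtils.getCachedUnsafeProjection; the sketch uses Spark's plain UnsafeProjection.create instead:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class DeferredProjectionSketch {
  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("name", DataTypes.StringType);
    // A non-UnsafeRow InternalRow, e.g. as produced from a columnar batch
    InternalRow row = new GenericInternalRow(
        new Object[] {42L, UTF8String.fromString("a")});
    // Projection happens only here, at serialization time
    UnsafeProjection projection = UnsafeProjection.create(schema);
    UnsafeRow unsafe = projection.apply(row);
    System.out.println(unsafe.getLong(0)); // 42
  }
}
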
public HoodieSparkRecord(UnsafeRow data) {
this(data, null);
}

public HoodieSparkRecord(InternalRow data, StructType schema) {
super(null, data);
validateRow(data);

validateRow(data, schema);
this.copy = false;
this.schema = schema;
}

public HoodieSparkRecord(HoodieKey key, UnsafeRow data, boolean copy) {
this(key, data, null, copy);
}

public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, boolean copy) {
super(key, data);
validateRow(data);

validateRow(data, schema);
this.copy = copy;
this.schema = schema;
}

private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) {
private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
super(key, data, operation);
validateRow(data);

validateRow(data, schema);
this.copy = copy;
this.schema = schema;
}

@Override
public HoodieSparkRecord newInstance() {
return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy);
return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy);
}

@Override
public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
return new HoodieSparkRecord(key, this.data, op, this.copy);
return new HoodieSparkRecord(key, this.data, this.schema, op, this.copy);
}

@Override
public HoodieSparkRecord newInstance(HoodieKey key) {
return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
return new HoodieSparkRecord(key, this.data, this.schema, this.operation, this.copy);
}

@Override
@@ -155,7 +175,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
UnsafeProjection projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy);
return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy);
}

@Override
@@ -169,7 +189,7 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
// TODO add actual rewriting
InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy);
}

@Override
@@ -184,7 +204,7 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy);
}

@Override
@@ -199,7 +219,7 @@ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
}
});

return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy);
}

@Override
@@ -264,7 +284,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P
partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
}

@Override
@@ -299,6 +319,42 @@ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
}
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@Override
protected final void writeRecordPayload(InternalRow payload, Kryo kryo, Output output) {
// NOTE: [[payload]] could be null if record has already been deflated
UnsafeRow unsafeRow = convertToUnsafeRow(payload, schema);
Contributor: HoodieInternalRow must already hold an UnsafeRow, so does it need the unsafe projection? We use HoodieInternalRow because we want to modify the meta fields. I don't really understand why we should make sure HoodieSparkRecord only holds an UnsafeRow; HoodieInternalRow holds an UnsafeRow too, so they basically make no difference.

Contributor Author: This is done to simplify the ser/deserialization sequence:

  • We always project to UnsafeRow before serialization, therefore
  • We need to decode just the UnsafeRow when we deserialize
  • Ser/de of UnsafeRow is as simple as just writing out the bytes

(See the standalone sketch after readRecordPayload below.)


kryo.writeObjectOrNull(output, unsafeRow, UnsafeRow.class);
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@Override
protected final InternalRow readRecordPayload(Kryo kryo, Input input) {
// NOTE: After deserialization every object is allocated on the heap, therefore
// we annotate this object as being copied
this.copy = true;

return kryo.readObjectOrNull(input, UnsafeRow.class);
}
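
As the thread above notes, once the row is an UnsafeRow, ser/de reduces to its backing bytes. A standalone sketch with a hypothetical one-field row:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UnsafeRowBytesSketch {
  public static void main(String[] args) {
    UnsafeProjection projection = UnsafeProjection.create(
        new StructType().add("id", DataTypes.LongType));
    UnsafeRow unsafe = projection.apply(
        new GenericInternalRow(new Object[] {42L}));

    byte[] bytes = unsafe.getBytes();       // what effectively goes to Kryo
    UnsafeRow restored = new UnsafeRow(1);  // field count known up front
    restored.pointTo(bytes, bytes.length);  // point a fresh row at the bytes
    System.out.println(restored.getLong(0)); // 42
  }
}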

private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType schema) {
if (payload == null) {
return null;
} else if (payload instanceof UnsafeRow) {
return (UnsafeRow) payload;
}

UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema);
return unsafeProjection.apply(payload);
}

private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) {
if (data instanceof HoodieInternalRow) {
return (HoodieInternalRow) data;
@@ -351,14 +407,21 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s

HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy);
return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
}

private static void validateRow(InternalRow data) {
private static void validateRow(InternalRow data, StructType schema) {
// NOTE: [[HoodieSparkRecord]] is expected to hold either
// - Instance of [[UnsafeRow]] or
// - Instance of [[HoodieInternalRow]] or
// - Instance of [[ColumnarBatchRow]]
ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
//
// In case provided row is anything but [[UnsafeRow]], it's expected that the
// corresponding schema has to be provided as well so that it could be properly
// serialized (in case it would need to be)
boolean isValid = data instanceof UnsafeRow
|| schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));

ValidationUtils.checkState(isValid);
}
}
@@ -101,7 +101,7 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
HoodieKey hoodieKey = new HoodieKey(key, partition);
return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false);
return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, structType, false);
});
} else {
throw new UnsupportedOperationException(recordType.name());
@@ -18,6 +18,10 @@

package org.apache.hudi.common.model;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -200,4 +204,30 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Prop
public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) {
return Option.of(this);
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@SuppressWarnings("unchecked")
@Override
protected final void writeRecordPayload(IndexedRecord payload, Kryo kryo, Output output) {
// NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to serialize Avro
Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);

kryo.writeObjectOrNull(output, payload, avroSerializer);
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@SuppressWarnings("unchecked")
@Override
protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
// NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to deserialize Avro
Serializer<GenericRecord> avroSerializer = kryo.getSerializer(GenericRecord.class);

return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
}
}
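
For context, a standalone sketch (hypothetical schema) of the Avro round trip that GenericAvroSerializer performs on our behalf inside Kryo; the Spark serializer roughly wraps this same writer/reader machinery and adds schema caching:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTripSketch {
  public static void main(String[] args) throws IOException {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("key").endRecord();
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("key", "key1");

    // Write: schema-aware binary encoding
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
    new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
    encoder.flush();

    // Read: the writer schema must be available again
    BinaryDecoder decoder = DecoderFactory.get()
        .binaryDecoder(baos.toByteArray(), null);
    GenericRecord restored = new GenericDatumReader<GenericRecord>(schema)
        .read(null, decoder);
    System.out.println(restored.get("key")); // key1
  }
}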
@@ -19,6 +19,9 @@

package org.apache.hudi.common.model;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -193,4 +196,17 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Prop
return Option.empty();
}
}

@Override
protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
// NOTE: Since [[payload]] is polymorphic we have to write out its class
// to be able to properly deserialize it
kryo.writeClassAndObject(output, payload);
}

@SuppressWarnings("unchecked")
@Override
protected final T readRecordPayload(Kryo kryo, Input input) {
return (T) kryo.readClassAndObject(input);
}
}
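
A standalone sketch (hypothetical value) of the writeClassAndObject round trip: the stream carries the concrete class, so a polymorphic field can be restored without the reader knowing its type up front.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class ClassAndObjectSketch {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false); // accept unregistered classes

    Output output = new Output(256, -1); // buffer grows as needed
    Comparable<?> value = 42L;           // could be Long, String, BigDecimal, ...
    kryo.writeClassAndObject(output, value);

    Input input = new Input(output.toBytes());
    Comparable<?> restored = (Comparable<?>) kryo.readClassAndObject(input);
    System.out.println(restored); // 42
  }
}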
@@ -18,20 +18,22 @@

package org.apache.hudi.common.model;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;

import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class HoodieEmptyRecord<T> extends HoodieRecord<T> {

private final HoodieRecordType type;
private final Comparable<?> orderingVal;
private HoodieRecordType type;
private Comparable<?> orderingVal;

public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) {
super(key, null);
@@ -152,4 +154,28 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Prop
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@Override
protected final void writeRecordPayload(T payload, Kryo kryo, Output output) {
kryo.writeObject(output, type);
// NOTE: Since [[orderingVal]] is polymorphic we have to write out its class
// to be able to properly deserialize it
kryo.writeClassAndObject(output, orderingVal);
}

/**
* NOTE: This method is declared final to rule out polymorphism, so that the JIT
* compiler can perform more aggressive optimizations
*/
@Override
protected final T readRecordPayload(Kryo kryo, Input input) {
this.type = kryo.readObject(input, HoodieRecordType.class);
this.orderingVal = (Comparable<?>) kryo.readClassAndObject(input);
// NOTE: [[EmptyRecord]]'s payload is always null
return null;
}
}
@@ -18,6 +18,11 @@

package org.apache.hudi.common.model;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.Serializable;
import java.util.Objects;

@@ -27,13 +32,13 @@
* - recordKey : a recordKey that acts as primary key for a record.
* - partitionPath : the partition path of a record.
*/
public class HoodieKey implements Serializable {
public final class HoodieKey implements Serializable, KryoSerializable {

private String recordKey;
private String partitionPath;

public HoodieKey() {
}
// Required for serializer
public HoodieKey() {}

public HoodieKey(String recordKey, String partitionPath) {
this.recordKey = recordKey;
@@ -81,4 +86,16 @@ public String toString() {
sb.append('}');
return sb.toString();
}

@Override
public void write(Kryo kryo, Output output) {
output.writeString(recordKey);
output.writeString(partitionPath);
}

@Override
public void read(Kryo kryo, Input input) {
this.recordKey = input.readString();
this.partitionPath = input.readString();
}
}
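
A minimal usage sketch with hypothetical key values; Kryo dispatches writeObject/readObject to the KryoSerializable hooks above:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.common.model.HoodieKey;

public class HoodieKeyKryoSketch {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false);

    HoodieKey key = new HoodieKey("uuid-1", "2023/01/01");
    Output output = new Output(128, -1);
    kryo.writeObject(output, key); // dispatches to HoodieKey.write(...)

    Input input = new Input(output.toBytes());
    HoodieKey restored = kryo.readObject(input, HoodieKey.class); // read(...)
    System.out.println(restored.equals(key)); // true
  }
}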