@@ -35,6 +35,7 @@
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
@@ -215,18 +216,16 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
// If the format can not record the operation field, nullify the DELETE payload manually.
boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
Option<HoodieRecord> finalRecord = Option.empty();
if (!nullifyPayload && !hoodieRecord.isDelete(tableSchema, recordProperties)) {
if (hoodieRecord.shouldIgnore(tableSchema, recordProperties)) {
return Option.of(hoodieRecord);
Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
// Check for delete
if (finalRecord.isPresent() && !finalRecord.get().isDelete(tableSchema, recordProperties)) {
// Check whether the record should be ignored (e.g. an ExpressionPayload sentinel)
if (finalRecord.get().shouldIgnore(tableSchema, recordProperties)) {
return finalRecord;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
HoodieRecord rewrittenRecord;
if (schemaOnReadEnabled) {
rewrittenRecord = hoodieRecord.rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields);
} else {
rewrittenRecord = hoodieRecord.rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields);
}
HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields)
: finalRecord.get().rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields);
HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord, writeSchemaWithMetaFields, recordProperties);
finalRecord = Option.of(populatedRecord);
if (isUpdateRecord) {
@@ -236,6 +235,7 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
}
recordsWritten++;
} else {
finalRecord = Option.empty();
recordsDeleted++;
}
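The refactor above replaces the mutable `finalRecord` bookkeeping with a single Option that starts out empty only for nullified DELETEs, and then funnels every record through the same delete / ignore / rewrite decision chain. A rough, self-contained sketch of that chain follows; it uses java.util.Optional and hypothetical stand-in predicates instead of the real Hudi types, so read it as an illustration of the control flow, not the actual implementation.

import java.util.Optional;

// Hypothetical stand-ins for hoodieRecord.isDelete(...) / shouldIgnore(...) and the
// rewrite step; only the shape of the decision chain mirrors prepareRecord.
final class PrepareRecordSketch {

  static Optional<String> prepare(String record, boolean nullifyPayload,
                                  boolean isDelete, boolean shouldIgnore) {
    Optional<String> finalRecord = nullifyPayload ? Optional.empty() : Optional.of(record);
    if (finalRecord.isPresent() && !isDelete) {
      if (shouldIgnore) {
        return finalRecord;                            // sentinel passes through untouched
      }
      return Optional.of(finalRecord.get() + "+meta"); // rewrite with metadata fields
    }
    return Optional.empty();                           // delete: nothing is written
  }

  public static void main(String[] args) {
    System.out.println(prepare("r1", false, false, false)); // Optional[r1+meta]
    System.out.println(prepare("r2", false, true, false));  // Optional.empty (counted as deleted)
    System.out.println(prepare("r3", false, false, true));  // Optional[r3] (ignored record)
  }
}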

@@ -364,7 +364,9 @@ private void processAppendResult(AppendResult result, List<HoodieRecord> recordL
updateWriteStatus(stat, result);
}

if (config.isMetadataColumnStatsIndexEnabled()) {
// TODO MetadataColumnStatsIndex for spark record
// https://issues.apache.org/jira/browse/HUDI-5249
if (config.isMetadataColumnStatsIndexEnabled() && recordMerger.getRecordType() == HoodieRecordType.AVRO) {
final List<Schema.Field> fieldsToIndex;
// If column stats index is enabled but columns not configured then we assume that
// all columns should be indexed
@@ -511,7 +513,7 @@ private void writeToBuffer(HoodieRecord<T> record) {
record.seal();
}
// fetch the ordering val first in case the record was deflated.
final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, config.getProps());
final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, recordProperties);
Option<HoodieRecord> indexedRecord = prepareRecord(record);
if (indexedRecord.isPresent()) {
// Skip the ignored record.
@@ -347,7 +346,6 @@ public void write(HoodieRecord<T> oldRecord) {
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);

if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
@@ -30,6 +30,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -122,6 +123,19 @@ private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Ho
this.schema = schema;
}

public HoodieSparkRecord(
HoodieKey key,
InternalRow data,
StructType schema,
HoodieOperation operation,
HoodieRecordLocation currentLocation,
HoodieRecordLocation newLocation,
boolean copy) {
super(key, data, operation, currentLocation, newLocation);
this.copy = copy;
this.schema = schema;
}

@Override
public HoodieSparkRecord newInstance() {
return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy);
@@ -175,7 +189,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
UnsafeProjection projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy);
return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), this.currentLocation, this.newLocation, copy);
}

@Override
@@ -189,7 +203,7 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
// TODO add actual rewriting
InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy);
return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, copy);
}

@Override
@@ -204,7 +218,7 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy);
return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, copy);
}

@Override
@@ -219,7 +233,7 @@ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
}
});

return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy);
return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), this.currentLocation, this.newLocation, copy);
}

@Override
@@ -244,11 +258,7 @@ public boolean isDelete(Schema recordSchema, Properties props) throws IOExceptio

@Override
public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
if (data != null && data.equals(SENTINEL)) {
return true;
} else {
return false;
}
return false;
}

@Override
@@ -284,7 +294,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P
partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), this.currentLocation, this.newLocation, copy);
}

@Override
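The new seven-argument HoodieSparkRecord constructor shown earlier carries `currentLocation` and `newLocation` through, and `joinWith`, `rewriteRecord`, `rewriteRecordWithNewSchema`, `updateMetadataValues` and `wrapIntoHoodieRecordPayloadWithKeyGen` now call it, so a record's known file location is no longer dropped when the row is rewritten. A minimal construction sketch follows, assuming only the constructor signature shown in this diff plus standard Spark and Hudi types; the schema, key and location values are made up for illustration.

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
// NOTE: add the HoodieSparkRecord import for your Hudi version; its package is omitted here.

public class SparkRecordLocationSketch {
  public static void main(String[] args) {
    StructType schema = new StructType().add("name", DataTypes.StringType);
    InternalRow row = new GenericInternalRow(new Object[] {UTF8String.fromString("alice")});

    // Location the record was tagged with during indexing (illustrative values).
    HoodieRecordLocation current = new HoodieRecordLocation("20230101000000", "file-group-1");

    HoodieSparkRecord record = new HoodieSparkRecord(
        new HoodieKey("alice", "2023/01/01"), row, schema,
        HoodieOperation.INSERT, current, /* newLocation */ null, /* copy */ false);

    // The location survives rewrites; the copy-style constructors previously dropped it.
    System.out.println(record.getCurrentLocation());
  }
}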
@@ -21,9 +21,11 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.io.Serializable;
import java.util.Properties;

/**
* Base class for all AVRO record based payloads, that can be ordered based on a field.
@@ -32,12 +34,14 @@ public abstract class BaseAvroPayload implements Serializable {
/**
* Avro data extracted from the source converted to bytes.
*/
public final byte[] recordBytes;
protected final byte[] recordBytes;

/**
* For purposes of preCombining.
*/
public final Comparable orderingVal;
protected final Comparable orderingVal;

protected final boolean isDeletedRecord;

/**
* Instantiate {@link BaseAvroPayload}.
@@ -48,8 +52,46 @@ public abstract class BaseAvroPayload implements Serializable {
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
this.orderingVal = orderingVal;
this.isDeletedRecord = record == null || isDeleteRecord(record);

if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " + record);
}
}

public Comparable getOrderingVal() {
return orderingVal;
}

/**
* Returns whether this payload represents a deleted record.
* No deserialization is performed in this method.
*/
public boolean isDeleted(Schema schema, Properties props) {
return isDeletedRecord;
}

/**
* Defines whether this implementation of {@link HoodieRecordPayload} could produce
* {@link HoodieRecord#SENTINEL}.
*/
public boolean canProduceSentinel() {
return false;
}

/**
* @param genericRecord instance of {@link GenericRecord} of interest.
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
protected static boolean isDeleteRecord(GenericRecord genericRecord) {
final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
// Check the schema first to stay compatible with newer Avro versions,
// where GenericRecord.get throws if the field name does not exist in the schema.
if (genericRecord.getSchema().getField(isDeleteKey) == null) {
return false;
}
Object deleteMarker = genericRecord.get(isDeleteKey);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
}
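Two behavioural points in the BaseAvroPayload changes above: the delete check is now evaluated once, at construction time, into `isDeletedRecord` (and a null ordering value fails fast with a HoodieException), and the check itself keys off the `_hoodie_is_deleted` marker field referenced by `HoodieRecord.HOODIE_IS_DELETED_FIELD`. Below is a small, self-contained sketch of that marker-field convention using plain Avro; the field name is hard-coded here purely for illustration.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class DeleteMarkerSketch {
  // Same convention as BaseAvroPayload.isDeleteRecord: a boolean field that,
  // when present and true, marks the record as a delete.
  static final String IS_DELETED_FIELD = "_hoodie_is_deleted";

  static boolean isDeleteRecord(GenericRecord record) {
    if (record.getSchema().getField(IS_DELETED_FIELD) == null) {
      return false; // schema without the marker: never a delete
    }
    Object marker = record.get(IS_DELETED_FIELD);
    return marker instanceof Boolean && (boolean) marker;
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .optionalBoolean(IS_DELETED_FIELD)
        .endRecord();

    GenericRecord upsert = new GenericData.Record(schema);
    upsert.put("id", "k1");
    upsert.put(IS_DELETED_FIELD, false);

    GenericRecord delete = new GenericData.Record(schema);
    delete.put("id", "k1");
    delete.put(IS_DELETED_FIELD, true);

    System.out.println(isDeleteRecord(upsert)); // false
    System.out.println(isDeleteRecord(delete)); // true
  }
}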
@@ -51,7 +51,7 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> record) {

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}

@@ -71,18 +71,18 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* Now check if the incoming record is a delete record.
*/
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
return Option.of(incomingRecord);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);

return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
return Option.of(incomingRecord);
}

private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
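With `isDeletedRecord` computed up front, `combineAndGetUpdateValue` and `getInsertValue` above can short-circuit a tombstone without deserializing `recordBytes` at all, and the trailing `isDeleteRecord(incomingRecord)` re-check becomes unnecessary. A usage sketch under the signatures shown in this diff; the schema and values are illustrative.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;

public class DeletePayloadSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .requiredLong("ts")
        .optionalBoolean("_hoodie_is_deleted")
        .endRecord();

    GenericRecord tombstone = new GenericData.Record(schema);
    tombstone.put("id", "k1");
    tombstone.put("ts", 2L);
    tombstone.put("_hoodie_is_deleted", true);

    // isDeletedRecord is evaluated once in the BaseAvroPayload constructor, so
    // getInsertValue can return empty without deserializing recordBytes.
    DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(tombstone, 2L);
    System.out.println(payload.getInsertValue(schema, new Properties()).isPresent()); // false
  }
}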
@@ -46,7 +46,7 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}

@@ -61,17 +61,16 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* Now check if the incoming record is a delete record.
*/
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
return Option.of(incomingRecord);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
return Option.of(bytesToAvro(recordBytes, schema));
}

@Override
@@ -51,6 +51,15 @@ public HoodieAvroRecord(HoodieRecord<T> record) {
super(record);
}

public HoodieAvroRecord(
HoodieKey key,
T data,
HoodieOperation operation,
HoodieRecordLocation currentLocation,
HoodieRecordLocation newLocation) {
super(key, data, operation, currentLocation, newLocation);
}

public HoodieAvroRecord() {
}

@@ -113,14 +122,14 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
Option<IndexedRecord> avroRecordPayloadOpt = getData().getInsertValue(recordSchema, props);
GenericRecord avroPayloadInNewSchema =
HoodieAvroUtils.rewriteRecord((GenericRecord) avroRecordPayloadOpt.get(), targetSchema);
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation());
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), this.currentLocation, this.newLocation);
}

@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation());
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation);
}

@Override
@@ -133,30 +142,36 @@ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
}
});

return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation());
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation);
}

@Override
public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException {
GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING);
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation());
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation);
}

@Override
public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
return !getData().getInsertValue(recordSchema, props).isPresent();
if (this.data instanceof BaseAvroPayload) {
return ((BaseAvroPayload) this.data).isDeleted(recordSchema, props);
} else {
return !this.data.getInsertValue(recordSchema, props).isPresent();
}
}

@Override
public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
Option<IndexedRecord> insertRecord = getData().getInsertValue(recordSchema, props);
// just skip the ignored record
if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) {
return true;
} else {
return false;
HoodieRecordPayload<?> recordPayload = getData();
// NOTE: Currently only records borne by [[ExpressionPayload]] can be ignored,
// as such, we limit exposure of this method only to such payloads
if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload) recordPayload).canProduceSentinel()) {
Option<IndexedRecord> insertRecord = recordPayload.getInsertValue(recordSchema, props);
return insertRecord.isPresent() && insertRecord.get().equals(SENTINEL);
}

return false;
}

@Override
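The reworked `shouldIgnore` only pays the `getInsertValue` deserialization cost for payloads that declare they can emit `HoodieRecord.SENTINEL`; with `canProduceSentinel()` defaulting to false in BaseAvroPayload, ordinary payloads are never deserialized just to be compared against the sentinel. A hypothetical payload that opts in would simply override the flag, as sketched below (ExpressionPayload is the real opt-in case referenced in the code comment above; this class is illustrative only).

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

// Hypothetical payload that can emit HoodieRecord.SENTINEL, e.g. from a MERGE INTO
// style condition that decides "leave the existing row alone". Overriding
// canProduceSentinel() is what lets HoodieAvroRecord.shouldIgnore pay the
// deserialization cost only for payloads like this one.
public class SentinelAwarePayload extends OverwriteWithLatestAvroPayload {

  public SentinelAwarePayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public boolean canProduceSentinel() {
    return true;
  }
}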