diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 564d63ba77542..2ef02b1dae54c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; @@ -215,18 +216,16 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { // If the format can not record the operation field, nullify the DELETE payload manually. boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord)); - Option finalRecord = Option.empty(); - if (!nullifyPayload && !hoodieRecord.isDelete(tableSchema, recordProperties)) { - if (hoodieRecord.shouldIgnore(tableSchema, recordProperties)) { - return Option.of(hoodieRecord); + Option finalRecord = nullifyPayload ? 
Option.empty() : Option.of(hoodieRecord); + // Check for delete + if (finalRecord.isPresent() && !finalRecord.get().isDelete(tableSchema, recordProperties)) { + // Check for ignore ExpressionPayload + if (finalRecord.get().shouldIgnore(tableSchema, recordProperties)) { + return finalRecord; } // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - HoodieRecord rewrittenRecord; - if (schemaOnReadEnabled) { - rewrittenRecord = hoodieRecord.rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields); - } else { - rewrittenRecord = hoodieRecord.rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields); - } + HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields) + : finalRecord.get().rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields); HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord, writeSchemaWithMetaFields, recordProperties); finalRecord = Option.of(populatedRecord); if (isUpdateRecord) { @@ -236,6 +235,7 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { } recordsWritten++; } else { + finalRecord = Option.empty(); recordsDeleted++; } @@ -364,7 +364,9 @@ private void processAppendResult(AppendResult result, List recordL updateWriteStatus(stat, result); } - if (config.isMetadataColumnStatsIndexEnabled()) { + // TODO MetadataColumnStatsIndex for spark record + // https://issues.apache.org/jira/browse/HUDI-5249 + if (config.isMetadataColumnStatsIndexEnabled() && recordMerger.getRecordType() == HoodieRecordType.AVRO) { final List fieldsToIndex; // If column stats index is enabled but columns not configured then we assume that // all columns should be indexed @@ -511,7 +513,7 @@ private void writeToBuffer(HoodieRecord record) { record.seal(); } // fetch the ordering val first in case the record was deflated. 
- final Comparable orderingVal = record.getOrderingValue(tableSchema, config.getProps()); + final Comparable orderingVal = record.getOrderingValue(tableSchema, recordProperties); Option indexedRecord = prepareRecord(record); if (indexedRecord.isPresent()) { // Skip the ignored record. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 276b318890138..4e5370f10898e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -347,7 +347,6 @@ public void write(HoodieRecord oldRecord) { Option> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props); Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null); Option combinedRecord = mergeResult.map(Pair::getLeft); - if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) { // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record. 
copyOldRecord = true; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java index 43000d19647cd..19b8cb5c658f4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; @@ -122,6 +123,19 @@ private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Ho this.schema = schema; } + public HoodieSparkRecord( + HoodieKey key, + InternalRow data, + StructType schema, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation, + boolean copy) { + super(key, data, operation, currentLocation, newLocation); + this.copy = copy; + this.schema = schema; + } + @Override public HoodieSparkRecord newInstance() { return new HoodieSparkRecord(this.key, this.data, this.schema, this.operation, this.copy); @@ -175,7 +189,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData()); UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType); - return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), targetStructType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ 
-189,7 +203,7 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema // TODO add actual rewriting InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields); - return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -204,7 +218,7 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols); HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields); - return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -219,7 +233,7 @@ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, } }); - return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override @@ -244,11 +258,7 @@ public boolean isDelete(Schema recordSchema, Properties props) throws IOExceptio @Override public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { - if (data != null && data.equals(SENTINEL)) { - return true; - } else { - return false; - } + return false; } @Override @@ -284,7 +294,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString(); } HoodieKey hoodieKey = new HoodieKey(key, partition); - return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy); + return new 
HoodieSparkRecord(hoodieKey, data, structType, getOperation(), this.currentLocation, this.newLocation, copy); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index cd3a95e6bf786..aaafe61abff9a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -21,9 +21,11 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import java.io.Serializable; +import java.util.Properties; /** * Base class for all AVRO record based payloads, that can be ordered based on a field. @@ -32,12 +34,14 @@ public abstract class BaseAvroPayload implements Serializable { /** * Avro data extracted from the source converted to bytes. */ - public final byte[] recordBytes; + protected final byte[] recordBytes; /** * For purposes of preCombining. */ - public final Comparable orderingVal; + protected final Comparable orderingVal; + + protected final boolean isDeletedRecord; /** * Instantiate {@link BaseAvroPayload}. @@ -48,8 +52,46 @@ public abstract class BaseAvroPayload implements Serializable { public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0]; this.orderingVal = orderingVal; + this.isDeletedRecord = record == null || isDeleteRecord(record); + if (orderingVal == null) { throw new HoodieException("Ordering value is null for record: " + record); } } + + public Comparable getOrderingVal() { + return orderingVal; + } + + /** + * Defines whether this implementation of {@link HoodieRecordPayload} is deleted. + * We will not do deserialization in this method. 
+ */ + public boolean isDeleted(Schema schema, Properties props) { + return isDeletedRecord; + } + + /** + * Defines whether this implementation of {@link HoodieRecordPayload} could produce + * {@link HoodieRecord#SENTINEL} + */ + public boolean canProduceSentinel() { + return false; + } + + /** + * @param genericRecord instance of {@link GenericRecord} of interest. + * @return {@code true} if record represents a delete record, {@code false} otherwise. + */ + protected static boolean isDeleteRecord(GenericRecord genericRecord) { + final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. + if (genericRecord.getSchema().getField(isDeleteKey) == null) { + return false; + } + Object deleteMarker = genericRecord.get(isDeleteKey); + return (deleteMarker instanceof Boolean && (boolean) deleteMarker); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 5a588eafa5f3f..a218e9dc33dd7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -51,7 +51,7 @@ public DefaultHoodieRecordPayload(Option record) { @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } @@ -71,18 +71,18 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord) ? 
Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } @Override public Option getInsertValue(Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); eventTime = updateEventTime(incomingRecord, properties); - return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } private static Option updateEventTime(GenericRecord record, Properties properties) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java index 7c8efb66e5cb6..b750cffb6aee0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java @@ -46,7 +46,7 @@ public EventTimeAvroPayload(Option record) { @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } @@ -61,17 +61,16 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /* * Now check if the incoming record is a delete record. */ - return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + return Option.of(incomingRecord); } @Override public Option getInsertValue(Schema schema, Properties properties) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } - GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); - return isDeleteRecord(incomingRecord) ? 
Option.empty() : Option.of(incomingRecord); + return Option.of(bytesToAvro(recordBytes, schema)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index de653054cdfe2..a1318c462c1ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -51,6 +51,15 @@ public HoodieAvroRecord(HoodieRecord record) { super(record); } + public HoodieAvroRecord( + HoodieKey key, + T data, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation) { + super(key, data, operation, currentLocation, newLocation); + } + public HoodieAvroRecord() { } @@ -113,14 +122,14 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema Option avroRecordPayloadOpt = getData().getInsertValue(recordSchema, props); GenericRecord avroPayloadInNewSchema = HoodieAvroUtils.rewriteRecord((GenericRecord) avroRecordPayloadOpt.get(), targetSchema); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), this.currentLocation, this.newLocation); } @Override public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation); } @Override @@ 
-133,30 +142,36 @@ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, } }); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation); } @Override public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException { GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation()); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation); } @Override public boolean isDelete(Schema recordSchema, Properties props) throws IOException { - return !getData().getInsertValue(recordSchema, props).isPresent(); + if (this.data instanceof BaseAvroPayload) { + return ((BaseAvroPayload) this.data).isDeleted(recordSchema, props); + } else { + return !this.data.getInsertValue(recordSchema, props).isPresent(); + } } @Override public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { - Option insertRecord = getData().getInsertValue(recordSchema, props); - // just skip the ignored record - if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) { - return true; - } else { - return false; + HoodieRecordPayload recordPayload = getData(); + // NOTE: Currently only records borne by [[ExpressionPayload]] can currently be ignored, + // as such, we limit exposure of this method only to such payloads + if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload) recordPayload).canProduceSentinel()) { + Option insertRecord = recordPayload.getInsertValue(recordSchema, props); + return 
insertRecord.isPresent() && insertRecord.get().equals(SENTINEL); } + + return false; } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 778186d4bc871..255a2b2a10fe7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -124,12 +124,12 @@ public String getFieldName() { /** * Current location of record on storage. Filled in by looking up index */ - private HoodieRecordLocation currentLocation; + protected HoodieRecordLocation currentLocation; /** * New location of record on storage, after written. */ - private HoodieRecordLocation newLocation; + protected HoodieRecordLocation newLocation; /** * Indicates whether the object is sealed. @@ -154,6 +154,19 @@ public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) { this.operation = operation; } + public HoodieRecord( + HoodieKey key, + T data, + HoodieOperation operation, + HoodieRecordLocation currentLocation, + HoodieRecordLocation newLocation) { + this.key = key; + this.data = data; + this.currentLocation = currentLocation; + this.newLocation = newLocation; + this.operation = operation; + } + public HoodieRecord(HoodieRecord record) { this(record.key, record.data, record.operation); this.currentLocation = record.currentLocation; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 5268d76281394..a99e3005f1f53 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -69,31 +69,11 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue @Override public Option 
getInsertValue(Schema schema) throws IOException { - if (recordBytes.length == 0) { + if (recordBytes.length == 0 || isDeletedRecord) { return Option.empty(); } - IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); - if (isDeleteRecord((GenericRecord) indexedRecord)) { - return Option.empty(); - } else { - return Option.of(indexedRecord); - } - } - /** - * @param genericRecord instance of {@link GenericRecord} of interest. - * @returns {@code true} if record represents a delete record. {@code false} otherwise. - */ - protected boolean isDeleteRecord(GenericRecord genericRecord) { - final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD; - // Modify to be compatible with new version Avro. - // The new version Avro throws for GenericRecord.get if the field name - // does not exist in the schema. - if (genericRecord.getSchema().getField(isDeleteKey) == null) { - return false; - } - Object deleteMarker = genericRecord.get(isDeleteKey); - return (deleteMarker instanceof Boolean && (boolean) deleteMarker); + return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index daa40acc76404..7871e4515634a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -143,7 +143,6 @@ private Option mergeOldRecord(IndexedRecord oldRecord, Schema schema, boolean isOldRecordNewer) throws IOException { Option recordOption = getInsertValue(schema); - if (!recordOption.isPresent()) { // use natural order for delete record return Option.empty(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index fb850bace7d48..b7756a490bf31 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -86,7 +86,7 @@ public HoodieRecordPayload createPayload(GenericRecord record) throws Excepti public HoodieRecordPayload createDeletePayload(BaseAvroPayload payload) throws Exception { if (shouldCombine) { - return (HoodieRecordPayload) constructor.newInstance(null, payload.orderingVal); + return (HoodieRecordPayload) constructor.newInstance(null, payload.getOrderingVal()); } else { return (HoodieRecordPayload) this.constructor.newInstance(Option.empty()); } diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 453cbb4e748ac..59674b928fdfa 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -18,7 +18,9 @@ package org.apache.hudi; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -26,10 +28,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import java.io.IOException; @@ -239,8 +237,8 @@ public void close() { private static Option 
convertToString(HoodieRecord record) { try { - String str = HoodieAvroUtils - .bytesToAvro(((OverwriteWithLatestAvroPayload) record.getData()).recordBytes, DataGenerator.avroSchema) + String str = ((OverwriteWithLatestAvroPayload) record.getData()) + .getInsertValue(DataGenerator.avroSchema).get() /* unwrap so toString() renders the record, not the Option wrapper */ .toString(); str = "{" + str.substring(str.indexOf("\"ts\":")); return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 0f11cbf954374..5d8e224477aa1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -24,10 +24,10 @@ import org.apache.hudi.AvroConversionUtils import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro +import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.io.HoodieWriteHandle import org.apache.hudi.sql.IExpressionEvaluator import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.Expression @@ -35,8 +35,8 @@ import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema, setWriteSchema} import org.apache.spark.sql.types.{StructField, StructType} -import java.util.{Base64, Properties} import 
java.util.function.Function +import java.util.{Base64, Properties} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -46,7 +46,7 @@ import scala.collection.mutable.ArrayBuffer * match and not-match actions and compute the final record to write. * * If there is no condition match the record, ExpressionPayload will return - * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record. + * a [[HoodieRecord.SENTINEL]], and the write handles will ignore this record. */ class ExpressionPayload(record: GenericRecord, orderingVal: Comparable[_]) @@ -77,11 +77,14 @@ class ExpressionPayload(record: GenericRecord, processMatchedRecord(joinSqlRecord, Some(targetRecord), properties) } + override def canProduceSentinel: Boolean = true + /** * Process the matched record. Firstly test if the record matched any of the update-conditions, * if matched, return the update assignments result. Secondly, test if the record matched * delete-condition, if matched then return a delete record. Finally if no condition matched, - * return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * return a [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle. + * * @param inputRecord The input record to process. * @param targetRecord The origin exist record. * @param properties The properties. @@ -140,7 +143,7 @@ class ExpressionPayload(record: GenericRecord, /** * Process the not-matched record. Test if the record matched any of insert-conditions, * if matched then return the result of insert-assignment. Or else return a - * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle. * * @param inputRecord The input record to process. * @param properties The properties. 
@@ -173,6 +176,16 @@ class ExpressionPayload(record: GenericRecord, } } + override def isDeleted(schema: Schema, props: Properties): Boolean = { + // Check the precomputed flag first: it short-circuits before getInsertValue is ever called. + isDeletedRecord || { + val deleteConditionText = props.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) + val isUpdateRecord = props.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, "false").toBoolean + // Only consult getInsertValue for update records when a delete condition is configured. + isUpdateRecord && deleteConditionText != null && !getInsertValue(schema, props).isPresent + } + } + override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = { val incomingRecord = bytesToAvro(recordBytes, schema) if (isDeleteRecord(incomingRecord)) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala index eb1339ad2f0f0..02cb46721c27d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala @@ -110,7 +110,7 @@ class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness { val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord) Seq( - (legacyRecord, 527), + (legacyRecord, 528), (avroIndexedRecord, 389) ) foreach { case (record, expectedSize) => routine(record, expectedSize) } }