diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 6d5372b47297d..b93542faec81b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -22,7 +22,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -80,11 +79,5 @@ public I combineOnCondition(
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already deduplicated
*/
- public I deduplicateRecords(
- I records, HoodieTable table, int parallelism) {
- return deduplicateRecords(records, table.getIndex(), parallelism);
- }
-
- public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism);
+ public abstract I deduplicateRecords(I records, HoodieTable table, int parallelism);
}
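
For reference, a minimal sketch (not part of the patch) of a call site after this change — the helper now receives the whole `HoodieTable` and resolves the index and write config from it. The class name and generics below are illustrative only:

```java
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;

// Hypothetical caller: the table (not a pre-extracted HoodieIndex) is handed to the
// helper, which can now reach both the index and the write schema through it.
class DedupSketch<T extends HoodieRecordPayload> {
  @SuppressWarnings("unchecked")
  HoodieData<HoodieRecord<T>> dedup(
      HoodieData<HoodieRecord<T>> records,
      HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
      int parallelism) {
    // newInstance() returns the shared helper; the raw-type cast mirrors its singleton API
    return (HoodieData<HoodieRecord<T>>)
        HoodieWriteHelper.newInstance().deduplicateRecords(records, table, parallelism);
  }
}
```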
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index b56d39b8e3679..6251668a25099 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -26,9 +26,10 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
+import org.apache.avro.Schema;
+
public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
    HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
@@ -51,8 +52,10 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
@Override
  public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
-    boolean isIndexingGlobal = index.isGlobal();
+      HoodieData<HoodieRecord<T>> records,
+      HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+      int parallelism) {
+    boolean isIndexingGlobal = table.getIndex().isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
@@ -60,8 +63,8 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
- T reducedData = (T) rec2.getData().preCombine(rec1.getData());
- HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
+ T reducedData = (T) rec2.getData().preCombine(rec1.getData(), null, new Schema.Parser().parse(table.getConfig().getWriteSchema()));
+ HoodieKey reducedKey = rec2.getData().compareTo(rec1.getData()) < 0 ? rec1.getKey() : rec2.getKey();
return new HoodieAvroRecord<>(reducedKey, reducedData);
}, parallelism).map(Pair::getRight);
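
The reduce step above is the behavioral core of the change: `preCombine` now receives the writer schema, and the surviving key follows payload ordering rather than an `equals()` check against the reduced data. A standalone sketch of those semantics, assuming the three-argument `preCombine` and the payload `compareTo` that this PR introduces elsewhere:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

// Sketch of the new reduce semantics (illustrative class, not part of the patch):
// the schema lets the payload merge field by field, and the HoodieKey of the
// higher-ordered record survives.
class ReduceStepSketch {
  static HoodieAvroRecord<OverwriteWithLatestAvroPayload> reduce(
      HoodieAvroRecord<OverwriteWithLatestAvroPayload> rec1,
      HoodieAvroRecord<OverwriteWithLatestAvroPayload> rec2,
      Schema writeSchema) {
    OverwriteWithLatestAvroPayload reduced =
        rec2.getData().preCombine(rec1.getData(), null, writeSchema);
    // keep rec1's key when rec2 orders lower, mirroring the hunk above
    HoodieKey key = rec2.getData().compareTo(rec1.getData()) < 0 ? rec1.getKey() : rec2.getKey();
    return new HoodieAvroRecord<>(key, reduced);
  }
}
```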
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 9c17e77b91831..f6b23b9773263 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -28,10 +28,11 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.avro.Schema;
+
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -89,7 +90,9 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
@Override
  public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+      List<HoodieRecord<T>> records,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      int parallelism) {
// If index used is global, then records are expected to differ in their partitionPath
    Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayload.java
new file mode 100644
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayload.java
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.BiFunction;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+
+/**
+ * Payload that behaves like {@link OverwriteWithLatestAvroPayload}, except that records are
+ * merged field by field: a null field in the record with the higher ordering value is filled
+ * from the other record instead of overwriting it.
+ */
+public class PartialOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
+
+ public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+  public PartialOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
+ super(record); // natural order
+ }
+
+ @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+ if (recordBytes.length == 0) {
+ return Option.empty();
+ }
+
+ GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ GenericRecord currentRecord = (GenericRecord) currentValue;
+ if (isDeleteRecord(incomingRecord)) {
+ return Option.empty();
+ }
+ return Option.of(overwriteWithNonNullValue(schema, currentRecord, incomingRecord));
+ }
+
+ @Override
+ public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
+ return this.orderingVal.compareTo(oldValue.orderingVal);
+ }
+
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties, Schema schema) {
+ if (null == schema) {
+ // using default preCombine logic
+ return super.preCombine(oldValue);
+ }
+
+ try {
+      Option<IndexedRecord> incomingOption = this.getInsertValue(schema);
+      Option<IndexedRecord> oldRecordOption = oldValue.getInsertValue(schema);
+
+ if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
+ boolean inComingRecordIsLatest = this.compareTo(oldValue) >= 0;
+ // ordering two records by ordering value
+ GenericRecord firstRecord = (GenericRecord) (inComingRecordIsLatest ? oldRecordOption.get() : incomingOption.get());
+ GenericRecord secondRecord = (GenericRecord) (inComingRecordIsLatest ? incomingOption.get() : oldRecordOption.get());
+ GenericRecord mergedRecord = overwriteWithNonNullValue(schema, firstRecord, secondRecord);
+ return new PartialOverwriteWithLatestAvroPayload(mergedRecord, inComingRecordIsLatest ? this.orderingVal : oldValue.orderingVal);
+ } else {
+ return super.preCombine(oldValue);
+ }
+ } catch (IOException e) {
+ return super.preCombine(oldValue);
+ }
+ }
+
+  private GenericRecord mergeRecord(Schema schema, GenericRecord first, GenericRecord second, BiFunction<Object, Object, Object> mergeFunc) {
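+    // NOTE: merges in place: mutates the first record field by field and returns it.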
+ schema.getFields().forEach(field -> {
+ Object firstValue = first.get(field.name());
+ Object secondValue = second.get(field.name());
+ first.put(field.name(), mergeFunc.apply(firstValue, secondValue));
+ });
+ return first;
+ }
+
+  /**
+   * Merges two records: the merged value of each field adopts the field value from secondRecord
+   * if that value is not null; otherwise it adopts the field value from firstRecord.
+   *
+   * @param schema record schema whose fields are iterated
+   * @param firstRecord the base record to be updated
+   * @param secondRecord the new record providing the new field values
+   * @return the merged record
+   */
+ private GenericRecord overwriteWithNonNullValue(Schema schema, GenericRecord firstRecord, GenericRecord secondRecord) {
+ return mergeRecord(schema, firstRecord, secondRecord, (first, second) -> Objects.isNull(second) ? first : second);
+ }
+}
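
A usage sketch of the new payload (mirroring the unit tests below): the record with the higher ordering value wins, but its null fields are back-filled from the other record. The schema and values here are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.PartialOverwriteWithLatestAvroPayload;

public class PartialOverwriteDemo {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");

    GenericRecord older = new GenericData.Record(schema);
    older.put("id", "1"); older.put("city", "NY"); older.put("ts", 1L);

    GenericRecord newer = new GenericData.Record(schema);
    newer.put("id", "1"); newer.put("city", null); newer.put("ts", 2L);

    PartialOverwriteWithLatestAvroPayload low = new PartialOverwriteWithLatestAvroPayload(older, 1);
    PartialOverwriteWithLatestAvroPayload high = new PartialOverwriteWithLatestAvroPayload(newer, 2);

    // ts=2 comes from the higher-ordered record; city="NY" is back-filled from the older one
    GenericRecord merged = (GenericRecord) high.preCombine(low, null, schema)
        .getInsertValue(schema).get();
    System.out.println(merged); // {"id": "1", "city": "NY", "ts": 2}
  }
}
```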
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 23623976d915d..1efd14a37a273 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -458,7 +458,7 @@ public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf,
}
- // Create bootstrap index by partition folder if it does not exist
+    // Create bootstrap index by file id folder if it does not exist
final Path bootstrap_index_folder_by_fileids =
new Path(basePath, HoodieTableMetaClient.BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH);
if (!fs.exists(bootstrap_index_folder_by_fileids)) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index ed18736443288..b98a9b2b62063 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -146,7 +146,7 @@ protected void processNextRecord(HoodieRecord extends HoodieRecordPayload> hoo
HoodieRecord extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
- HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
+ HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, null, this.readerSchema);
    // If combinedValue is oldValue, there is no need to re-put oldRecord
if (combinedValue != oldValue) {
HoodieOperation operation = hoodieRecord.getOperation();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index d912525fe9271..ded8cf6d79d64 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -116,7 +116,7 @@ protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> inc
}
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions,
- boolean applyLayoutFilters) {
+ boolean applyLayoutFilters) {
// Filter all the filter in the metapath and include only the extensions passed and
// convert them into HoodieInstant
try {
@@ -170,7 +170,7 @@ public void createRequestedReplaceCommit(String instantTime, String actionType)
LOG.info("Creating a new instant " + instant);
// Create the request replace file
createFileInMetaPath(instant.getFileName(),
- TimelineMetadataUtils.serializeRequestedReplaceMetadata(new HoodieRequestedReplaceMetadata()), false);
+ TimelineMetadataUtils.serializeRequestedReplaceMetadata(new HoodieRequestedReplaceMetadata()), false);
} catch (IOException e) {
throw new HoodieIOException("Error create requested replace commit ", e);
}
@@ -383,7 +383,7 @@ public HoodieInstant transitionCompactionRequestedToInflight(HoodieInstant reque
* Transition Compaction State from inflight to Committed.
*
* @param inflightInstant Inflight instant
- * @param data Extra Metadata
+ * @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
@@ -408,7 +408,7 @@ private void createFileInAuxiliaryFolder(HoodieInstant instant, Option<byte[]> d
* Transition Clean State from inflight to Committed.
*
* @param inflightInstant Inflight instant
- * @param data Extra Metadata
+ * @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
@@ -424,7 +424,7 @@ public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightIns
* Transition Clean State from requested to inflight.
*
* @param requestedInstant requested instant
- * @param data Optional data to be stored
+ * @param data Optional data to be stored
* @return commit instant
*/
public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
@@ -439,7 +439,7 @@ public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedI
* Transition Rollback State from inflight to Committed.
*
* @param inflightInstant Inflight instant
- * @param data Extra Metadata
+ * @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionRollbackInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
@@ -484,7 +484,7 @@ public HoodieInstant transitionRestoreRequestedToInflight(HoodieInstant requeste
* Transition replace requested file to replace inflight.
*
* @param requestedInstant Requested instant
- * @param data Extra Metadata
+ * @param data Extra Metadata
* @return inflight instant
*/
public HoodieInstant transitionReplaceRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
@@ -500,7 +500,7 @@ public HoodieInstant transitionReplaceRequestedToInflight(HoodieInstant requeste
* Transition replace inflight to Committed.
*
* @param inflightInstant Inflight instant
- * @param data Extra Metadata
+ * @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
@@ -537,7 +537,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
}
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
- boolean allowRedundantTransitions) {
+ boolean allowRedundantTransitions) {
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
try {
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
@@ -614,7 +614,7 @@ public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
- boolean allowRedundantTransitions) {
+ boolean allowRedundantTransitions) {
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
transitionState(requested, inflight, content, allowRedundantTransitions);
@@ -731,26 +731,17 @@ private void createFileInMetaPath(String filename, Option<byte[]> content, boole
/**
* Creates a new file in timeline with overwrite set to false. This ensures
* files are created only once and never rewritten
+ *
* @param fullPath File Path
- * @param content Content to be stored
+ * @param content Content to be stored
*/
private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
- FSDataOutputStream fsout = null;
- try {
- fsout = metaClient.getFs().create(fullPath, false);
+ try (FSDataOutputStream fsout = metaClient.getFs().create(fullPath, false)) {
if (content.isPresent()) {
fsout.write(content.get());
}
} catch (IOException e) {
throw new HoodieIOException("Failed to create file " + fullPath, e);
- } finally {
- try {
- if (null != fsout) {
- fsout.close();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to close file " + fullPath, e);
- }
}
}
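
The `createImmutableFileInPath` rewrite is a straight conversion to try-with-resources; a minimal standalone sketch of the pattern (class and method names are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ImmutableFileWriteSketch {
  // try-with-resources closes the stream on every path -- normal return, a failed
  // write(), or an exception after create() succeeds -- replacing the manual
  // null-checked finally block that previously duplicated the error handling.
  static void writeOnce(FileSystem fs, Path path, byte[] content) throws IOException {
    try (FSDataOutputStream out = fs.create(path, false)) { // overwrite=false: create exactly once
      out.write(content);
    }
  }
}
```

One subtle difference in the patched method: a failure in the implicit `close()` now surfaces through the same "Failed to create file" wrapper instead of the separate "Failed to close file" message.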
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayloadTest.java b/hudi-common/src/test/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayloadTest.java
new file mode 100644
index 0000000000000..3a7b1dd8ec5c5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayloadTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PartialOverwriteWithLatestAvroPayloadTest {
+ private Schema schema;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ schema = Schema.createRecord("record", null, null, false, Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
+ new Schema.Field("partition", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", ""),
+ new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
+ new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false),
+ new Schema.Field("city", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", null),
+ new Schema.Field("child", Schema.createArray(Schema.create(Schema.Type.STRING)), "", Collections.emptyList())
+ ));
+ }
+
+ @Test
+ public void testActiveRecordsWithoutSchema() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "2");
+ record2.put("partition", "");
+ record2.put("ts", 1L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Collections.emptyList());
+
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "2");
+ record3.put("partition", "");
+ record3.put("ts", 1L);
+ record3.put("_hoodie_is_deleted", false);
+ record3.put("city", "NY0");
+ record3.put("child", Collections.emptyList());
+
+
+ PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1);
+ PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record2, 2);
+ assertEquals(payload1.preCombine(payload2), payload2);
+ assertEquals(payload2.preCombine(payload1), payload2);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertEquals(record2, payload2.getInsertValue(schema).get());
+
+ assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(), record1);
+ assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record3);
+ }
+
+ @Test
+ public void testCompareFunction() {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "1");
+ record.put("partition", "partition1");
+ record.put("ts", 0L);
+ record.put("_hoodie_is_deleted", false);
+ record.put("city", "NY0");
+ record.put("child", Arrays.asList("A"));
+
+ PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
+ PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);
+
+ assertEquals(payload1.compareTo(payload2), -1);
+ assertEquals(payload2.compareTo(payload1), 1);
+ assertEquals(payload1.compareTo(payload1), 0);
+ }
+
+ @Test
+ public void testActiveRecordsWithSchema() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", null);
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "2");
+ record2.put("partition", null);
+ record2.put("ts", 1L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", "NY");
+ record2.put("child", Collections.emptyList());
+
+ GenericRecord expectedRecord = new GenericData.Record(schema);
+ expectedRecord.put("id", "2");
+ expectedRecord.put("partition", "partition1");
+ expectedRecord.put("ts", 1L);
+ expectedRecord.put("_hoodie_is_deleted", false);
+ expectedRecord.put("city", "NY");
+ expectedRecord.put("child", Collections.emptyList());
+
+ PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1);
+ PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record2, 2);
+ PartialOverwriteWithLatestAvroPayload expectedPayload = new PartialOverwriteWithLatestAvroPayload(expectedRecord, 2);
+ assertArrayEquals(payload1.preCombine(payload2, null, schema).recordBytes, expectedPayload.recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, null, schema).recordBytes, expectedPayload.recordBytes);
+ assertEquals(payload1.preCombine(payload2, null, schema).orderingVal, expectedPayload.orderingVal);
+ assertEquals(payload2.preCombine(payload1, null, schema).orderingVal, expectedPayload.orderingVal);
+ }
+
+ @Test
+ public void testDeletedRecord() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition0");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Collections.emptyList());
+
+ GenericRecord delRecord1 = new GenericData.Record(schema);
+ delRecord1.put("id", "2");
+ delRecord1.put("partition", "partition1");
+ delRecord1.put("ts", 1L);
+ delRecord1.put("_hoodie_is_deleted", true);
+ delRecord1.put("city", "NY0");
+ delRecord1.put("child", Collections.emptyList());
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition0");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", true);
+ record2.put("city", "NY0");
+ record2.put("child", Collections.emptyList());
+
+ PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1);
+ PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(delRecord1, 2);
+
+ assertEquals(payload1.preCombine(payload2, null, schema), payload2);
+ assertEquals(payload2.preCombine(payload1, null, schema), payload2);
+
+ assertEquals(payload1.preCombine(payload2, null, null), payload2);
+ assertEquals(payload2.preCombine(payload1, null, null), payload2);
+ }
+
+}
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 9f8e29d68773f..853fd96cf37f8 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -18,7 +18,6 @@
package org.apache.hudi.examples.quickstart;
-import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java
similarity index 99%
rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java
index 56ad5a8b66c82..2e7084bea36a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi;
+package org.apache.hudi.examples.quickstart;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
diff --git a/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
index 33c085cba3eb6..7de770d709402 100644
--- a/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
+++ b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
@@ -20,7 +20,7 @@ package org.apache.hudi.examples.spark
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL}
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL}
-import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
+import org.apache.hudi.examples.quickstart.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.examples.common.{HoodieExampleDataGenerator, HoodieExampleSparkUtils}
diff --git a/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala
index 8a2c8715b30eb..980a29acc05b4 100644
--- a/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala
+++ b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala
@@ -20,7 +20,7 @@
package org.apache.hudi.examples.spark
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
+import org.apache.hudi.examples.quickstart.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieRecordPayload, HoodieTableType}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java
similarity index 97%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java
rename to hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java
index 2042249ecd788..8dc010d12cb94 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi;
+package org.apache.hudi.examples.quickstart;
import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.Assertions;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c2f54dd8aaffe..1c968dec9ac80 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -30,7 +30,6 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -420,7 +419,7 @@ private boolean flushBucket(DataBucket bucket) {
    List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+ records = FlinkWriteHelper.newInstance().deduplicateRecords(records, this.writeClient.getHoodieTable(), -1);
}
bucket.preWrite(records);
    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -455,7 +454,7 @@ private void flushRemaining(boolean endInput) {
      List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+ records = FlinkWriteHelper.newInstance().deduplicateRecords(records, this.writeClient.getHoodieTable(), -1);
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1fc8d393be6a9..4922fb3d679da 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -213,6 +213,7 @@ protected void loadRecords(String partitionPath) throws Exception {
if (!isValidFile(baseFile.getFileStatus())) {
return;
}
+
    try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
iterator.forEachRemaining(hoodieKey -> {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index c4b83bf51aace..d61dfc41ce2dd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -149,7 +149,7 @@ public void initializeState(FunctionInitializationContext context) {
if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build());
}
- indexState = context.getKeyedStateStore().getState(indexStateDesc);
+ this.indexState = context.getKeyedStateStore().getState(indexStateDesc);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 786a45cac7ac9..13543488c3a62 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,7 @@
import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.PartialOverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
@@ -1147,6 +1148,40 @@ void testParquetComplexTypes(String operation) {
assertRowsEquals(result, expected);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"upsert"})
+ void testPartialUpdateFiles(String operation) {
+ TableEnvironment tableEnv = streamTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .field("f_int int")
+ .field("version int")
+ .field("f1 varchar(10)")
+ .field("f2 varchar(10)")
+ .field("f3 varchar(10)")
+ .field("f4 array")
+ .field("f5 varchar(10)")
+ .field("f6_row row(f_row_f0 int, f_row_f1 varchar(10))")
+ .pkField("f_int")
+ .noPartition()
+ .option(FlinkOptions.PAYLOAD_CLASS_NAME, PartialOverwriteWithLatestAvroPayload.class.getCanonicalName())
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, operation)
+ .option(FlinkOptions.PRE_COMBINE, true)
+ .option(FlinkOptions.PRECOMBINE_FIELD, "version")
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, "insert into t1(f_int, version, f1, f5, f6_row) values (1, 1, 'a', 'a1', row(1, 'row1-a')), (1, 2, 'a', 'a2', row(1, 'row1-b'))");
+ execInsertSql(tableEnv, "insert into t1(f_int, version, f2, f4, f6_row) values (1, 3, 'b', array[1, 1], row(2, 'row2'))");
+ execInsertSql(tableEnv, "insert into t1(f_int, version, f1, f3) values (1, 4, 'c', 'c')");
+
+    List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "[+I[1, 4, c, b, c, [1, 1], a2, +I[2, row2]]]";
+ assertRowsEquals(result, expected);
+ }
+
@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testParquetComplexNestedRowTypes(String operation) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
new file mode 100644
index 0000000000000..d5d95872a6c2e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.sql.types.{DataType, NumericType, StringType}
+
+// TODO unify w/ DataTypeUtils
+object HoodieSparkTypeUtils {
+
+ /**
+   * Checks whether casting an expression from the [[from]] [[DataType]] to the [[to]]
+   * [[DataType]] preserves the ordering of the elements.
+ */
+ def isCastPreservingOrdering(from: DataType, to: DataType): Boolean =
+ (from, to) match {
+      // NOTE: Under the casting rules defined by Spark, casting between String and Numeric
+      //       types (in either direction) is the only cast that might break element ordering
+ case (StringType, _: NumericType) => false
+ case (_: NumericType, StringType) => false
+
+ case _ => true
+ }
+}
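
Since Scala `object` methods compile to static forwarders, the new utility is callable from Java as well; an illustrative check:

```java
import org.apache.spark.HoodieSparkTypeUtils;
import org.apache.spark.sql.types.DataTypes;

class CastOrderingCheck {
  public static void main(String[] args) {
    // String <-> numeric casts can reorder values: "10" < "9" lexicographically, 10 > 9 numerically
    System.out.println(HoodieSparkTypeUtils.isCastPreservingOrdering(
        DataTypes.StringType, DataTypes.IntegerType)); // false
    // widening numeric casts keep the order
    System.out.println(HoodieSparkTypeUtils.isCastPreservingOrdering(
        DataTypes.IntegerType, DataTypes.LongType)); // true
  }
}
```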