diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 1c13a11ee5990..27e744c4925b6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -87,6 +87,33 @@ * id ts name price * 1 2 name_1 price_1 * + * + *
Gotchas: + *
In cases where a batch of records is preCombine before combineAndGetUpdateValue with the underlying records to be updated located in parquet files, the end states of records might not be as how + * one will expect when applying a straightforward partial update. + * + *
Gotchas-Example: + *
+ * -- Insertion order of records: + * INSERT INTO t1 VALUES (1, 'a1', 10, 1000); -- (1) + * INSERT INTO t1 VALUES (1, 'a1', 11, 999), (1, 'a1_0', null, 1001); -- (2) + * + * SELECT id, name, price, _ts FROM t1; + * -- One would the results to return: + * -- 1 a1_0 10.0 1001 + + * -- However, the results returned are: + * -- 1 a1_0 11.0 1001 + * + * -- This occurs as preCombine is applied on (2) first to return: + * -- 1 a1_0 11.0 1001 + * + * -- And this then combineAndGetUpdateValue with the existing oldValue: + * -- 1 a1_0 10.0 1000 + * + * -- To return: + * -- 1 a1_0 11.0 1001 + **/ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 6431b63899f2f..28313f150c81f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -18,13 +18,13 @@ package org.apache.hudi.common.model; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; - import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -223,4 +223,87 @@ public void testUseLatestRecordMetaValue() throws IOException { assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), record2.get("_hoodie_commit_time").toString()); assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), record2.get("_hoodie_commit_seqno").toString()); } + + /** + * This test is to highlight the gotcha, where there are differences in result of the two queries on the same input data below: + *
+ * Query A (No precombine): + * + * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']); + * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['A']); + * INSERT INTO t1 VALUES (1, 'partition1', 2, false, NULL, ['A']); + * + * Final output of Query A: + * (1, 'partition1', 2, false, NY0, ['A']) + * + * Query B (preCombine invoked) + * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NULL, ['A']); + * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['A']), (1, 'partition1', 2, false, NULL, ['A']); + * + * Final output of Query B: + * (1, 'partition1', 2, false, NY1, ['A']) + *+ * + * + * @throws IOException + */ + @Test + public void testPartialUpdateGotchas() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 1L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 0L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", "NY1"); + record2.put("child", Arrays.asList("B")); + + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "1"); + record3.put("partition", "partition1"); + record3.put("ts", 2L); + record3.put("_hoodie_is_deleted", false); + record3.put("city", null); + record3.put("child", Arrays.asList("A")); + + // define expected outputs + GenericRecord pureCombineOutput = new GenericData.Record(schema); + pureCombineOutput.put("id", "1"); + pureCombineOutput.put("partition", "partition1"); + pureCombineOutput.put("ts", 2L); + pureCombineOutput.put("_hoodie_is_deleted", false); + pureCombineOutput.put("city", "NY0"); + pureCombineOutput.put("child", Arrays.asList("A")); + + GenericRecord outputWithPreCombineUsed = new GenericData.Record(schema); + outputWithPreCombineUsed.put("id", "1"); + outputWithPreCombineUsed.put("partition", "partition1"); + outputWithPreCombineUsed.put("ts", 2L); + outputWithPreCombineUsed.put("_hoodie_is_deleted", false); + outputWithPreCombineUsed.put("city", "NY1"); + outputWithPreCombineUsed.put("child", Arrays.asList("A")); + + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 0L); + PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3, 2L); + + // query A (no preCombine) + IndexedRecord firstCombineOutput = payload2.combineAndGetUpdateValue(record1, schema, properties).get(); + IndexedRecord secondCombineOutput = payload3.combineAndGetUpdateValue(firstCombineOutput, schema, properties).get(); + assertEquals(pureCombineOutput, secondCombineOutput); + + // query B (preCombine invoked) + PartialUpdateAvroPayload payloadAfterPreCombine = payload3.preCombine(payload2, schema, properties); + IndexedRecord finalOutputWithPreCombine = payloadAfterPreCombine.combineAndGetUpdateValue(record1, schema, properties).get(); + assertEquals(outputWithPreCombineUsed, finalOutputWithPreCombine); + } }