Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,33 @@
* id ts name price
* 1 2 name_1 price_1
* </pre>
*
* <p>Gotchas:
* <p>In cases where a batch of records undergoes preCombine before combineAndGetUpdateValue is invoked against the underlying records to be updated (residing in parquet files), the end state of
* the records might not be what one would expect from a straightforward partial update.
*
* <p>Gotchas-Example:
* <pre>
* -- Insertion order of records:
* INSERT INTO t1 VALUES (1, 'a1', 10, 1000); -- (1)
* INSERT INTO t1 VALUES (1, 'a1', 11, 999), (1, 'a1_0', null, 1001); -- (2)
*
* SELECT id, name, price, _ts FROM t1;
* -- One would expect the results to be:
* -- 1 a1_0 10.0 1001
*
* -- However, the results returned are:
* -- 1 a1_0 11.0 1001
*
* -- This occurs as preCombine is applied on (2) first to return:
* -- 1 a1_0 11.0 1001
*
* -- And this is then merged via combineAndGetUpdateValue with the existing oldValue:
* -- 1 a1_0 10.0 1000
*
* -- To return:
* -- 1 a1_0 11.0 1001
* </pre>
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -223,4 +223,87 @@ public void testUseLatestRecordMetaValue() throws IOException {
assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), record2.get("_hoodie_commit_time").toString());
assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), record2.get("_hoodie_commit_seqno").toString());
}

/**
 * This test is to highlight the gotcha, where there are differences in result of the two queries on the same input data below:
 * <pre>
 * Query A (No precombine):
 *
 * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
 * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']);
 * INSERT INTO t1 VALUES (1, 'partition1', 2, false, NULL, ['A']);
 *
 * Final output of Query A:
 * (1, 'partition1', 2, false, NY0, ['A'])
 *
 * Query B (preCombine invoked)
 * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
 * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']), (1, 'partition1', 2, false, NULL, ['A']);
 *
 * Final output of Query B:
 * (1, 'partition1', 2, false, NY1, ['A'])
 * </pre>
 *
 * <p>Why the results diverge (as pinned by the assertions below): in Query B, preCombine merges
 * the two incoming records first, so the NULL city of the ts=2 record is filled with NY1 from the
 * ts=0 record before the result is applied to the stored ts=1 record. In Query A the ts=0 record
 * is merged directly against the stored ts=1 record and its NY1 value is discarded, so the ts=2
 * record's NULL city is later filled with NY0 instead.
 *
 * @throws IOException if the Avro-level merge inside the payload fails
 */
@Test
public void testPartialUpdateGotchas() throws IOException {
// Merge precedence between records is decided by the "ts" ordering field.
Properties properties = new Properties();
properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");

// Stored (old) record: ts=1, city=NY0, child=[A].
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 1L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
record1.put("child", Arrays.asList("A"));

// Incoming record with the SMALLEST ordering value: ts=0, city=NY1, child=[B].
// Note: child is "B" here (the class Javadoc example writes ['A']; the code is authoritative).
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "1");
record2.put("partition", "partition1");
record2.put("ts", 0L);
record2.put("_hoodie_is_deleted", false);
record2.put("city", "NY1");
record2.put("child", Arrays.asList("B"));

// Incoming record with the LARGEST ordering value: ts=2, but with a NULL city that a
// partial update should fill in from an older record.
GenericRecord record3 = new GenericData.Record(schema);
record3.put("id", "1");
record3.put("partition", "partition1");
record3.put("ts", 2L);
record3.put("_hoodie_is_deleted", false);
record3.put("city", null);
record3.put("child", Arrays.asList("A"));

// define expected outputs
// Query A expectation: record3's NULL city is filled from the stored record (NY0),
// because record2 (ts=0) already lost to the stored record (ts=1) in the first merge.
GenericRecord pureCombineOutput = new GenericData.Record(schema);
pureCombineOutput.put("id", "1");
pureCombineOutput.put("partition", "partition1");
pureCombineOutput.put("ts", 2L);
pureCombineOutput.put("_hoodie_is_deleted", false);
pureCombineOutput.put("city", "NY0");
pureCombineOutput.put("child", Arrays.asList("A"));

// Query B expectation: preCombine fills record3's NULL city from record2 (NY1) before the
// stored record is consulted, so NY1 survives into the final result.
GenericRecord outputWithPreCombineUsed = new GenericData.Record(schema);
outputWithPreCombineUsed.put("id", "1");
outputWithPreCombineUsed.put("partition", "partition1");
outputWithPreCombineUsed.put("ts", 2L);
outputWithPreCombineUsed.put("_hoodie_is_deleted", false);
outputWithPreCombineUsed.put("city", "NY1");
outputWithPreCombineUsed.put("child", Arrays.asList("A"));

PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 0L);
PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3, 2L);

// query A (no preCombine): apply each incoming record against the stored state in turn.
IndexedRecord firstCombineOutput = payload2.combineAndGetUpdateValue(record1, schema, properties).get();
IndexedRecord secondCombineOutput = payload3.combineAndGetUpdateValue(firstCombineOutput, schema, properties).get();
assertEquals(pureCombineOutput, secondCombineOutput);

// query B (preCombine invoked): reduce the incoming batch first, then merge with stored state.
PartialUpdateAvroPayload payloadAfterPreCombine = payload3.preCombine(payload2, schema, properties);
IndexedRecord finalOutputWithPreCombine = payloadAfterPreCombine.combineAndGetUpdateValue(record1, schema, properties).get();
assertEquals(outputWithPreCombineUsed, finalOutputWithPreCombine);
}
}