137 commits
d73d7f5
[HUDI-2815] add partial overwrite payload to support partial overwrit…
stayrascal Jan 30, 2022
6b6a60f
[HUDI-2815] add compareTo test case
stayrascal Feb 6, 2022
2fa2d57
Merge branch 'master' into HUDI-2815
stayrascal Feb 6, 2022
21df6fe
[HUDI-2815] fix conflict by changing HoodieRecord to HoodieAvroRecord
stayrascal Feb 7, 2022
940f6de
Merge remote-tracking branch 'origin/master' into HUDI-2815
stayrascal Feb 21, 2022
14edef0
[HUDI-2815] 1. passing the payload schema instead of embedding it in …
stayrascal Feb 21, 2022
ce561bc
[HUDI-2815] add test case for nest type for testing partial update
stayrascal Feb 21, 2022
c6f524e
[HUDI-2815] remove unused configuration and refactor partial update l…
stayrascal Feb 25, 2022
d3b3e05
[HUDI-2815] pass schema during precombine two record in compaction pr…
stayrascal Feb 26, 2022
10e080b
[MINOR] fix get builtin function issue from Hudi catalog
stayrascal Feb 27, 2022
a86b7ff
Merge branch 'master' into HUDI-2815
stayrascal Mar 8, 2022
d11c670
Merge branch 'master' into HUDI-2815
stayrascal Mar 16, 2022
715d4b0
Merge branch 'master' into HUDI-2815
stayrascal Mar 22, 2022
e89fd60
[HUDI-2815] fix the conflict and small refactor
stayrascal Mar 22, 2022
3f771d3
Merge branch 'master' into HUDI-2815
stayrascal Mar 25, 2022
b823e94
[HUDI-3521] Fixing kakfa key and value serializer value type from cla…
nsivabalan Feb 27, 2022
9c15335
[HUDI-3018] Adding validation to dataframe scheme to ensure reserved …
nsivabalan Feb 27, 2022
eef40bc
[MINOR] Change MINI_BATCH_SIZE to 2048 (#4862)
cuibo01 Feb 28, 2022
3a373c2
[HUDI-2917] rollback insert data appended to log file when using Hbas…
nsivabalan Feb 28, 2022
6fbf453
[HUDI-3528] Fix String convert issue and overwrite putAll method in T…
stayrascal Feb 28, 2022
1e236ba
[HUDI-3341] Fix log file reader for S3 with hadoop-aws 2.7.x (#4897)
yihua Feb 28, 2022
ac3e72a
[HUDI-3450] Avoid passing empty string spark master to hudi cli (#4844)
zhedoubushishi Feb 28, 2022
f1a8d0c
[HUDI-3418] Save timeout option for remote RemoteFileSystemView (#4809)
yuzhaojing Feb 28, 2022
3697d8c
[HUDI-3465] Add validation of column stats and bloom filters in Hoodi…
yihua Mar 1, 2022
975c463
[HUDI-3497] Adding Datatable validator tool (#4902)
nsivabalan Mar 1, 2022
46ea95d
[HUDI-3441] Add support for "marker delete" in hudi-cli (#4922)
XuQianJin-Stars Mar 1, 2022
4aaee39
[HUDI-3516] Implement record iterator for HoodieDataBlock (#4909)
cuibo01 Mar 2, 2022
6a13069
[HUDI-2631] In CompactFunction, set up the write schema each time wit…
yuzhaojing Mar 2, 2022
466a633
[HUDI-3469] Refactor `HoodieTestDataGenerator` to provide for reprodu…
Mar 2, 2022
fe4aefd
[HUDI-3315] RFC-35 Part-1 Support bucket index in Flink writer (#4679)
garyli1019 Mar 2, 2022
0b9f295
[minor] Cosmetic changes following HUDI-3315 (#4934)
danny0405 Mar 2, 2022
6731992
[MINOR] Adding more test props to integ tests (#4935)
nsivabalan Mar 2, 2022
4b975fd
[MINOR] RFC-38 markdown content error (#4933)
liujinhui1994 Mar 2, 2022
7a30b08
[HUDI-3264]: made schema registry urls configurable with MTDS (#4779)
pratyakshsharma Mar 2, 2022
d6e38af
[HUDI-2973] RFC-27: Data skipping index to improve query performance …
manojpec Mar 3, 2022
ef9ff1a
[HUDI-3544] Fixing "populate meta fields" update to metadata table (#…
nsivabalan Mar 3, 2022
dd7e772
[HUDI-3552] Strength the NetworkUtils#getHostname by checking network…
danny0405 Mar 3, 2022
8a4cfb7
[HUDI-3548] Fix if user specify key "hoodie.datasource.clustering.asy…
Mar 4, 2022
6af6076
[HUDI-3445] Support Clustering Command Based on Call Procedure Comman…
huberylee Mar 4, 2022
77b0f3f
[HUDI-3161][RFC-47] Add Call Produce Command for Spark SQL (#4607)
XuQianJin-Stars Mar 4, 2022
6bb4181
[MINOR] fix UTC timezone config (#4950)
YuweiXiao Mar 4, 2022
6c4b714
[HUDI-3348] Add UT to verify HoodieRealtimeFileSplit serde (#4951)
xushiyan Mar 4, 2022
b851feb
[HUDI-3460] Add reader merge memory option for flink (#4911)
cuibo01 Mar 4, 2022
4d86424
[HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
nsivabalan Mar 5, 2022
0b21be2
[HUDI-3130] Fixing Hive getSchema for RT tables addressing different …
aditiwari01 Mar 6, 2022
55f5626
[HUDI-3520] Introduce DeleteSupportSchemaPostProcessor to support add…
wangxianghu Mar 6, 2022
d2aed60
[HUDI-3525] Introduce JsonkafkaSourceProcessor to support data prepro…
wangxianghu Mar 6, 2022
4c15551
[HUDI-3069] Improve HoodieMergedLogRecordScanner avoid putting unnece…
scxwhite Mar 7, 2022
b9230e0
[HUDI-3213] Making commit preserve metadata to true for compaction (#…
nsivabalan Mar 7, 2022
1e68d6f
[HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy`…
Mar 7, 2022
f28bad6
[HUDI-3365] Make sure Metadata Table records are updated appropriatel…
Mar 7, 2022
6fa32a0
[HUDI-2747] support set --sparkMaster for MDT cli (#4964)
zhangyue19921010 Mar 7, 2022
da9962b
[HUDI-3576] Configuring timeline refreshes based on latest commit (#4…
nsivabalan Mar 7, 2022
f52553b
[HUDI-3573] flink cleanFuntion execute clean on initialization (#4936)
todd5167 Mar 8, 2022
a5b9f66
[MINOR][HUDI-3460]Fix HoodieDataSourceITCase
cuibo01 Mar 6, 2022
69f058c
[HUDI-2677] Add DFS based message queue for flink writer[part3] (#4961)
danny0405 Mar 8, 2022
2a18375
[HUDI-3574] Improve maven module configs for different spark profiles…
XuQianJin-Stars Mar 8, 2022
8cba0a9
[HUDI-3584] Skip integ test modules by default (#4986)
xushiyan Mar 8, 2022
ced2def
[HUDI-3356][HUDI-3203] HoodieData for metadata index records; BloomFi…
codope Mar 8, 2022
1409c0b
[HUDI-3221] Support querying a table as of a savepoint (#4720)
XuQianJin-Stars Mar 8, 2022
cd47bc9
[HUDI-3587] Making SupportsUpgradeDowngrade serializable (#4991)
nsivabalan Mar 9, 2022
d22d93f
[HUDI-3568] Introduce ChainedSchemaPostProcessor to support setting m…
wangxianghu Mar 9, 2022
d0d6981
[HUDI-3383] Sync column comments while syncing a hive table (#4960)
MrSleeping123 Mar 10, 2022
180b690
[MINOR] Add IT CI Test timeout option (#5003)
XuQianJin-Stars Mar 10, 2022
b4770df
[HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads projected…
Mar 10, 2022
f76144b
[HUDI-3581] Reorganize some clazz for hudi flink (#4983)
danny0405 Mar 10, 2022
f7886f8
[HUDI-3602][DOCS] Update docker README to build multi-arch images usi…
codope Mar 10, 2022
fc6c7a7
[HUDI-3586] Add Trino Queries in integration tests (#4988)
yihua Mar 11, 2022
7d89404
[HUDI-3595] Fixing NULL schema provider for empty batch (#5002)
nsivabalan Mar 11, 2022
801c69d
[HUDI-3522] Introduce DropColumnSchemaPostProcessor to support drop c…
wangxianghu Mar 11, 2022
cf03735
[HUDI-2999] [RFC-42] RFC for consistent hashing index (#4326)
YuweiXiao Mar 11, 2022
d963079
[HUDI-3566] Add thread factory in BoundedInMemoryExecutor (#4926)
scxwhite Mar 11, 2022
5f59bcb
[HUDI-3575] Use HoodieTestDataGenerator#TRIP_SCHEMA as example schema…
wangxianghu Mar 11, 2022
04baf70
[HUDI-3567] Refactor HoodieCommonUtils to make code more reasonable (…
huberylee Mar 11, 2022
5da95d5
[HUDI-3513] Make sure Column Stats does not fail in case it fails to …
Mar 11, 2022
a51bdb5
[HUDI-3592] Fix NPE of DefaultHoodieRecordPayload if Property is empt…
Mar 11, 2022
9e1cad8
[HUDI-3569] Introduce ChainedJsonKafkaSourePostProcessor to support s…
wangxianghu Mar 11, 2022
5403db3
[HUDI-3556] Re-use rollback instant for rolling back of clustering an…
nsivabalan Mar 11, 2022
151ce1e
[HUDI-3593] Restore TypedProperties and flush checksum in table confi…
codope Mar 13, 2022
ff16cdc
[HUDI-3583] Fix MarkerBasedRollbackStrategy NoSuchElementException (#…
liujinhui1994 Mar 13, 2022
54808ec
[HUDI-3501] Support savepoints command based on Call Produce Command …
XuQianJin-Stars Mar 13, 2022
6530d83
[HUDI-3613] Adding/fixing yamls for metadata (#5029)
nsivabalan Mar 14, 2022
6570198
[HUDI-3600] Tweak the default cleaning strategy to be more streaming …
danny0405 Mar 14, 2022
967b336
fix NPE when run schdule using spark-sql if the commits time < hoodie…
peanut-chenzhong Mar 14, 2022
399eb8d
[MINODR] Remove repeated kafka-clients dependencies (#5034)
wangxianghu Mar 14, 2022
07d6929
[HUDI-3621] Fixing NullPointerException in DeltaStreamer (#5039)
nsivabalan Mar 14, 2022
f9ae271
[HUDI-3623] Removing hive sync node from non hive yamls (#5040)
nsivabalan Mar 14, 2022
31b54c7
[HUDI-3620] Adding spark3.2.0 profile (#5038)
nsivabalan Mar 14, 2022
95ef13c
[HUDI-3547] Introduce MaxwellSourcePostProcessor to extract data from…
wangxianghu Mar 15, 2022
1a7157a
[HUDI-3606] Add `org.objenesis:objenesis` to hudi-timeline-server-bun…
cdmikechen Mar 15, 2022
145440b
[HUDI-3619] Fix HoodieOperation fromValue using wrong constant value …
Mar 15, 2022
c0eecb5
[HUDI-3514] Rebase Data Skipping flow to rely on MT Column Stats inde…
Mar 15, 2022
035c3ca
[HUDI-3633] Allow non-string values to be set in TypedProperties (#5045)
codope Mar 15, 2022
ece2ae6
[HUDI-3589] flink sync hive metadata supports table properties and se…
todd5167 Mar 15, 2022
a55ce33
[HUDI-3588] Remove hudi-common and hudi-hadoop-mr jars in Presto Dock…
yihua Mar 16, 2022
895becc
[HUDI-3607] Support backend switch in HoodieFlinkStreamer (#5032)
liufangqi Mar 16, 2022
00b2e45
[Hudi-3376] Add an option to skip under deletion files for HoodieMeta…
zhangyue19921010 Mar 17, 2022
4512e96
[HUDI-3404] Automatically adjust write configs based on metadata tabl…
yihua Mar 17, 2022
c163ac2
[HUDI-3494] Consider triggering condition of MOR compaction during ar…
yihua Mar 17, 2022
402f60e
[HUDI-3645] Fix NPE caused by multiple threads accessing non-thread-s…
fengjian428 Mar 17, 2022
b825b8a
[HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commi…
xushiyan Mar 17, 2022
029622b
[MINOR] HoodieFileScanRDD could print null path (#5056)
Mar 17, 2022
931747d
[HUDI-3598] Row Data to Hoodie Record Operator parallelism needs to a…
JerryYue-M Mar 18, 2022
75abad6
[HUDI-3656] Adding medium sized dataset for clustering and minor fixe…
nsivabalan Mar 18, 2022
9037045
[HUDI-3659] Reducing the validation frequency with integ tests (#5067)
nsivabalan Mar 18, 2022
9c40d0c
[HUDI-3457] Refactored Spark DataSource Relations to avoid code dupli…
Mar 19, 2022
d9ca8e1
[HUDI-3663] Fixing Column Stats index to properly handle first Data T…
Mar 20, 2022
dfc05b7
[MINOR] Remove flaky assert in TestInLineFileSystem (#5069)
yihua Mar 20, 2022
618fe26
[HUDI-3665] Support flink multiple versions (#5072)
danny0405 Mar 21, 2022
b28f5d2
[MINOR] Fixing sparkUpdateNode for record generation (#5079)
nsivabalan Mar 21, 2022
542cec6
[HUDI-3559] Flink bucket index with COW table throws NoSuchElementExc…
wxplovecc Mar 11, 2022
75056ea
[HUDI-1436]: Provide an option to trigger clean every nth commit (#4385)
pratyakshsharma Mar 22, 2022
d1e31f8
[HUDI-3640] Set SimpleKeyGenerator as default in 2to3 table upgrade f…
yihua Mar 22, 2022
e19b5d1
[HUDI-2883] Refactor hive sync tool / config to use reflection and st…
rmahindra123 Mar 22, 2022
b709f75
[HUDI-3642] Handle NPE due to empty requested replacecommit metadata …
codope Mar 23, 2022
1ce9a5e
Fixing non partitioned all files record in MDT (#5108)
nsivabalan Mar 24, 2022
dcbb074
[minor] Checks the data block type for archived timeline (#5106)
danny0405 Mar 24, 2022
0640f20
[HUDI-3689] Fix glob path and hive sync in deltastreamer tests (#5117)
codope Mar 24, 2022
d482527
[HUDI-3684] Fixing NPE in `ParquetUtils` (#5102)
Mar 24, 2022
7f5ee51
[HUDI-3689] Remove Azure CI cache (#5121)
xushiyan Mar 24, 2022
5558b79
[HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
xushiyan Mar 24, 2022
a9b4110
[HUDI-3706] Downgrade maven surefire and failsafe version (#5123)
yihua Mar 24, 2022
ffac31e
[HUDI-3689] Fix delta streamer tests (#5124)
xushiyan Mar 24, 2022
5854243
[HUDI-3689] Disable flaky tests in TestHoodieDeltaStreamer (#5127)
yihua Mar 24, 2022
f8092a3
[HUDI-3624] Check all instants before starting a commit in metadata t…
yihua Mar 25, 2022
32b9700
[HUDI-3638] Make ZookeeperBasedLockProvider serializable (#5112)
yihua Mar 25, 2022
27adaa2
[HUDI-3701] Flink bulk_insert support bucket hash index (#5118)
danny0405 Mar 25, 2022
9c49e43
[HUDI-1180] Upgrade HBase to 2.4.9 (#5004)
yihua Mar 25, 2022
1959d8b
[HUDI-3703] Reset taskID in restoreWriteMetadata (#5122)
yuzhaojing Mar 25, 2022
4568fae
[HUDI-3580] Claim RFC number 48 for LogCompaction action RFC (#5128)
suryaprasanna Mar 25, 2022
c43747e
[HUDI-3678] Fix record rewrite of create handle when 'preserveMetadat…
danny0405 Mar 25, 2022
5b66abf
[HUDI-3594] Supporting Composite Expressions over Data Table Columns …
Mar 25, 2022
06ac8cb
[HUDI-3711] Fix typo in MaxwellJsonKafkaSourcePostProcessor.Config#PR…
wangxianghu Mar 25, 2022
24cc379
[HUDI-3563] Make quickstart examples covered by CI tests (#5082)
XuQianJin-Stars Mar 25, 2022
5a0a1e9
Merge branch 'master' into HUDI-2815
stayrascal Apr 12, 2022
d9da263
Merge branch 'master' into HUDI-2815
stayrascal Apr 12, 2022
20b1ee4
fix conflict
stayrascal Apr 12, 2022
@@ -105,7 +105,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
boolean choosePrev = data1.equals(reducedData);
boolean choosePrev = data2.compareTo(data1) < 0;
HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
Contributor

Why do we need a compareTo here?

Contributor Author
@stayrascal Feb 8, 2022

The previous logic of data2.preCombine(data1) was to return one of data1 or data2, ordered by their orderingVal. But if we merge/combine data1 and data2 into a new payload (reducedData), then data1.equals(reducedData) is always false. In order to get the HoodieKey and HoodieOperation for the new HoodieRecord holding reducedData, we need to pick the latest HoodieKey and HoodieOperation from data1 and data2. compareTo is used to replace #preCombine for comparing their orderingVal.

 @Override
  public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
    return orderingVal.compareTo(oldValue.orderingVal);
  }
@Test
  public void testCompareFunction() {
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "1");
    record.put("partition", "partition1");
    record.put("ts", 0L);
    record.put("_hoodie_is_deleted", false);
    record.put("city", "NY0");
    record.put("child", Arrays.asList("A"));

    PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
    PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);

    assertEquals(payload1.compareTo(payload2), -1);
    assertEquals(payload2.compareTo(payload1), 1);
    assertEquals(payload1.compareTo(payload1), 0);
  }

Contributor Author

Actually, rec1 and rec2 should have the same HoodieKey here, right? But the HoodieOperation might differ.

HoodieRecord<T> hoodieRecord = new HoodieAvroRecord<>(reducedKey, reducedData, operation);
@@ -78,4 +78,8 @@ public String getFileId() {
public void setFileId(String fileId) {
this.fileId = fileId;
}

public HoodieRecordLocation toLocal(String instantTime) {
return new HoodieRecordLocation(instantTime, fileId);
}
}
@@ -58,6 +58,18 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}

/**
* When more than one HoodieRecord in the incoming batch has the same HoodieKey, and preCombine returns a merged result instead of choosing one of the two records,
* this method can be called to determine the ordering between the combined record and the previous records.
* @param oldValue instance of the old {@link HoodieRecordPayload} to compare against.
* @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object.
*
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
default int compareTo(T oldValue) {
return 0;
}

/**
* This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
*/
@@ -40,8 +40,18 @@
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

/**
* The schema of the generic record.
*/
public final String schema;
Contributor

this might be confusing w/ the schema arg of combineAndGetUpdateValue. can you fix either of the names?

Contributor

but in general, storing the schema along w/ the payload might have an impact on performance, and that's why the initial payload was designed that way. So, do add a line here noting that payload implementations setting this schema field might have to watch out for performance.

Contributor Author

removed this field.


public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
this(record, orderingVal, null);
}

public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, String schema) {
super(record, orderingVal);
this.schema = schema;
}

public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;

/**
* The only difference with {@link OverwriteNonDefaultsWithLatestAvroPayload} is that it supports
* merging the latest non-null partial fields with the old record instead of replacing the whole record.
* It also merges the non-null fields when pre-combining multiple records with the same record key, instead of choosing the latest record based on the ordering field.
*
* <p> Regarding #combineAndGetUpdateValue, assuming a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int).
* The first record value is: (1, 2, 3), the second record value is: (4, 5, null) with the field f2 value as null.
* Calling the #combineAndGetUpdateValue method of the two records returns record: (4, 5, 3).
* Note that field f2 value is ignored because it is null. </p>
*
* <p> Regarding #preCombine, assuming a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int, o1 int),
* and two {@link PartialOverwriteWithLatestAvroPayload} payloads are initialized with different ordering values.
* The first record value is (1, null, 1, 1) with the field f1 value as null, the second value is: (2, 2, null, 2) with the f2 value as null.
* Calling the #preCombine method of the two records returns record: (2, 2, 1, 2).
* Note:
* <ol>
* <li>the field f0 value is 2 because the ordering value of the second record is greater.</li>
* <li>the field f1 value is 2 because the f1 value of the first record is null.</li>
* <li>the field f2 value is 1 because the f2 value of the second record is null.</li>
* <li>the field o1 value is 2 because the ordering value of the second record is greater.</li>
* </ol>
*
* </p>
*/
public class PartialOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {

public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
this(record, orderingVal, null);
}

public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, String schema) {
super(record, orderingVal, schema);
}

public PartialOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
super(record); // natural order
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}

GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
if (isDeleteRecord(incomingRecord)) {
return Option.empty();
}

GenericRecord currentRecord = (GenericRecord) currentValue;
List<Schema.Field> fields = schema.getFields();
Contributor

guess, this has to be "this.schema.getFields". as I commented earlier, it's confusing :). can we fix the naming of either of them?

Contributor Author

fixed the schema name.

fields.forEach(field -> {
Object value = incomingRecord.get(field.name());
Contributor

do we need to deal w/ nested fields here?

Contributor Author

The current logic will overwrite the whole nested field if the incoming field is not null.

And I think we don't need to support partial update inside nested fields, for example Map, List, etc. We should not merge map(1 -> 'a', 2 -> 'b') & map(1 -> '', 3 -> 'c') into map(1 -> '', 2 -> 'b', 3 -> 'c'), in case the upstream wants to delete the key '2'; if we merged them together, they could never delete some elements. The same applies to List.

if (Objects.nonNull(value)) {
currentRecord.put(field.name(), value);
}
});

return Option.of(currentRecord);
}

@Override
public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
return orderingVal.compareTo(oldValue.orderingVal);
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
Contributor

instead of storing the schema with the payload, did you think about adding a new preCombine method as follows?

OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema);

this would make it a lot simpler, right? Since preCombine is used only to dedup records within a single batch, both records should have the same schema.
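For illustration, a minimal sketch of what this suggested overload could look like as a member of PartialOverwriteWithLatestAvroPayload (an editor's sketch under the reviewer's assumption that both records share one schema; it reuses the class's mergeValue helper and omits delete-record handling):

public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema writerSchema) {
  try {
    // Decode both payloads with the schema supplied by the caller,
    // instead of a schema stored inside the payload.
    Option<IndexedRecord> incomingOpt = getInsertValue(writerSchema);
    Option<IndexedRecord> oldOpt = oldValue.getInsertValue(writerSchema);
    if (!incomingOpt.isPresent() || !oldOpt.isPresent()) {
      return oldOpt.isPresent() ? oldValue : this;
    }
    GenericRecord incoming = (GenericRecord) incomingOpt.get();
    GenericRecord old = (GenericRecord) oldOpt.get();
    boolean chooseCurrent = this.orderingVal.compareTo(oldValue.orderingVal) > 0;
    // Merge field by field: a non-null value wins, ties are broken by ordering value.
    for (Schema.Field field : writerSchema.getFields()) {
      incoming.put(field.name(), mergeValue(incoming.get(field.name()), old.get(field.name()), chooseCurrent));
    }
    return new PartialOverwriteWithLatestAvroPayload(incoming, chooseCurrent ? this.orderingVal : oldValue.orderingVal);
  } catch (IOException e) {
    // Fall back to the plain ordering-based preCombine on decode failure.
    return super.preCombine(oldValue);
  }
}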

Contributor Author

Hi @nsivabalan, thanks a lot for reviewing this.

Regarding adding a new preCombine method with a Schema: I considered this, but it means the caller needs to get the schema info first, and currently it seems we can only get the schema info from the Configuration (the hoodie.avro.schema field). Sometimes the caller might find it hard to get the schema info, especially for FlinkWriteHelper.deduplicateRecords(List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism).

But comparing the performance, passing the schema into the method seems to be the better approach.
BTW, since we already have the method preCombine(T oldValue, Properties properties), how about putting the schema string in the properties and parsing it into a Schema later, so that we don't need to create a new method? Otherwise, I cannot imagine when we would ever use Properties.

if (null == this.schema || null == oldValue.schema) {
return super.preCombine(oldValue);
}

try {
Schema schema = new Schema.Parser().parse(this.schema);
Contributor

argh. this again clashes w/ the instance variable "schema". Can we fix the naming?

Contributor Author

solved

Option<IndexedRecord> incomingOption = getInsertValue(new Schema.Parser().parse(this.schema));
Option<IndexedRecord> insertRecordOption = oldValue.getInsertValue(new Schema.Parser().parse(oldValue.schema));
Contributor

insertRecordOption -> oldRecordOption

Contributor Author

solved.


if (incomingOption.isPresent() && insertRecordOption.isPresent()) {
GenericRecord currentRecord = (GenericRecord) incomingOption.get();
GenericRecord insertRecord = (GenericRecord) insertRecordOption.get();
boolean chooseCurrent = this.orderingVal.compareTo(oldValue.orderingVal) > 0;

if (!isDeleteRecord(insertRecord) && !isDeleteRecord(currentRecord)) {
schema.getFields().forEach(field -> {
Object insertValue = insertRecord.get(field.name());
Object currentValue = currentRecord.get(field.name());
currentRecord.put(field.name(), mergeValue(currentValue, insertValue, chooseCurrent));
});
return new PartialOverwriteWithLatestAvroPayload(currentRecord, chooseCurrent ? this.orderingVal : oldValue.orderingVal, this.schema);
} else {
return isDeleteRecord(insertRecord) ? this : oldValue;
}
Contributor

We should be cautious of DELETEs; should we still merge DELETE messages?

Contributor Author

yeah, if one of the records is a DELETE record, we just return one of them directly; there is no need to merge, since the DELETE message deletes the old record during the Hudi write. Only when neither record is a DELETE record do we need to merge them.

} else {
return insertRecordOption.isPresent() ? oldValue : this;
}
} catch (IOException e) {
return super.preCombine(oldValue);
}
}

private Object mergeValue(Object left, Object right, Boolean chooseLeft) {
if (null != left && null != right) {
return chooseLeft ? left : right;
} else {
return null == left ? right : left;
}
}

}
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

class PartialOverwriteWithLatestAvroPayloadTest {
private Schema schema;

@BeforeEach
public void setUp() throws Exception {
schema = Schema.createRecord("record", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("partition", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", ""),
new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false),
new Schema.Field("city", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", null),
new Schema.Field("child", Schema.createArray(Schema.create(Schema.Type.STRING)), "", Collections.emptyList())
));
}

@Test
public void testActiveRecordsWithoutSchema() throws IOException {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
record1.put("child", Arrays.asList("A"));

GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
record2.put("partition", "");
record2.put("ts", 1L);
record2.put("_hoodie_is_deleted", false);
record2.put("city", "NY");
record2.put("child", Collections.emptyList());

GenericRecord record3 = new GenericData.Record(schema);
record3.put("id", "2");
record3.put("partition", "");
record3.put("ts", 1L);
record3.put("_hoodie_is_deleted", false);
record3.put("city", "NY");
record3.put("child", Arrays.asList("A"));


PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1);
PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record2, 2);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);

assertEquals(record1, payload1.getInsertValue(schema).get());
assertEquals(record2, payload2.getInsertValue(schema).get());

assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(), record1);
assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record3);
}

@Test
public void testCompareFunction() {
GenericRecord record = new GenericData.Record(schema);
record.put("id", "1");
record.put("partition", "partition1");
record.put("ts", 0L);
record.put("_hoodie_is_deleted", false);
record.put("city", "NY0");
record.put("child", Arrays.asList("A"));

PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record, 1);
PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record, 2);

assertEquals(payload1.compareTo(payload2), -1);
assertEquals(payload2.compareTo(payload1), 1);
assertEquals(payload1.compareTo(payload1), 0);
}

@Test
public void testActiveRecordsWithSchema() throws IOException {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", null);
record1.put("child", Arrays.asList("A"));

GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
record2.put("partition", null);
record2.put("ts", 1L);
record2.put("_hoodie_is_deleted", false);
record2.put("city", "NY");
record2.put("child", Collections.emptyList());

GenericRecord expectedRecord = new GenericData.Record(schema);
expectedRecord.put("id", "2");
expectedRecord.put("partition", "partition1");
expectedRecord.put("ts", 1L);
expectedRecord.put("_hoodie_is_deleted", false);
expectedRecord.put("city", "NY");
expectedRecord.put("child", Collections.emptyList());


PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1, schema.toString());
PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(record2, 2, schema.toString());
PartialOverwriteWithLatestAvroPayload expectedPayload = new PartialOverwriteWithLatestAvroPayload(expectedRecord, 2, schema.toString());
assertArrayEquals(payload1.preCombine(payload2).recordBytes, expectedPayload.recordBytes);
assertArrayEquals(payload2.preCombine(payload1).recordBytes, expectedPayload.recordBytes);
assertEquals(payload1.preCombine(payload2).orderingVal, expectedPayload.orderingVal);
assertEquals(payload2.preCombine(payload1).orderingVal, expectedPayload.orderingVal);
}

@Test
public void testDeletedRecord() throws IOException {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition0");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
record1.put("child", Collections.emptyList());

GenericRecord delRecord1 = new GenericData.Record(schema);
delRecord1.put("id", "2");
delRecord1.put("partition", "partition1");
delRecord1.put("ts", 1L);
delRecord1.put("_hoodie_is_deleted", true);
delRecord1.put("city", "NY0");
delRecord1.put("child", Collections.emptyList());

GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "1");
record2.put("partition", "partition0");
record2.put("ts", 0L);
record2.put("_hoodie_is_deleted", true);
record2.put("city", "NY0");
record2.put("child", Collections.emptyList());

PartialOverwriteWithLatestAvroPayload payload1 = new PartialOverwriteWithLatestAvroPayload(record1, 1, schema.toString());
PartialOverwriteWithLatestAvroPayload payload2 = new PartialOverwriteWithLatestAvroPayload(delRecord1, 2, schema.toString());

assertEquals(payload1.preCombine(payload2), payload1);
assertEquals(payload2.preCombine(payload1), payload1);
}

}
@@ -268,6 +268,12 @@ private FlinkOptions() {
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");

public static final ConfigOption<Boolean> PARTIAL_OVERWRITE_ENABLED = ConfigOptions
Contributor

What's the idea for this additional configuration (besides the record payload class)?

Contributor Author

This is a feature toggle controlling another change to BucketAssignFunction, to support the case where the record partition path changes. But I have removed that change, so this feature toggle can be removed as well.

.key("partial.overwrite.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Partial overwrite payload, the write.payload.class should be org.apache.hudi.common.model.PartialOverwriteWithLatestAvroPayload when it is true");

/**
* Flag to indicate whether to drop duplicates before insert/upsert.
* By default false to gain extra performance.