Merged
58 commits
b03797d  XXX  (Apr 30, 2022)
125cff3  Inlined `KeyGenerator.getRecordKeyFieldNames` usages, rebased onto `g…  (May 6, 2022)
47bca25  Cleaned up `SparkKeyGeneratorInterface`: moved in API defs from `Buil…  (May 6, 2022)
3983e40  Rebased `BuiltinKeyGenerator` to return `UTF8String` for API ingestin…  (May 6, 2022)
3a1fa7c  Abstracted away nested-field path access, and Spark's `Row` conversio…  (May 7, 2022)
262217d  Rebased `SimpleKeyGenerator`, `NonpartitionedKeyGenerator` onto new f…  (May 7, 2022)
0e98599  Properly handle null partition-path parts  (May 7, 2022)
d33037e  Rebased remaining builtin key generators  (May 7, 2022)
ca3eb23  Modified `SparkKeyGeneratorInterface` to return `UTF8String` instead …  (May 7, 2022)
ef77760  Added `NestedFieldPath` to improve readability when used in Java  (May 7, 2022)
d69a515  Cleaned up `RowKeyGeneratorHelper`  (May 7, 2022)
4249ee2  Fixed `HoodieDatasetBulkInsertHelper`  (May 7, 2022)
0a8937b  Fixing compilation (after rebase)  (Jul 15, 2022)
5396498  `lint`  (Jul 15, 2022)
44af96f  Fixing partition-path extraction in `BulkInsertDataInternalWriterHelper`  (Jul 15, 2022)
70aba21  Fixing compilation  (Jul 15, 2022)
80bbfd8  Fixing tests  (Jul 15, 2022)
c693292  Cloned `UTF8StringBuilder` to accommodate for Spark 2  (Jul 15, 2022)
5a6f50a  Fixing invalid cast  (Jul 16, 2022)
96be961  Added missing partition path encoding;  (Jul 16, 2022)
b51306d  Fixing compilation  (Jul 18, 2022)
dc1c556  Fixing raw values deserialization when extracted from `InternalRow`s;  (Jul 18, 2022)
b13b455  Tidying up  (Jul 18, 2022)
f206c49  Extracting composite-key proper composition sequence into `BuiltinKey…  (Jul 18, 2022)
f9027b8  Fixed handling of null/empty key-parts for composite-keys to be in-sy…  (Jul 18, 2022)
457abfe  Handle failing to resolve nested field-paths;  (Jul 18, 2022)
19496ec  Fixed handling all-null/empty composite keys  (Jul 18, 2022)
105cf4e  Fixing encoding for `Row` seq;  (Jul 18, 2022)
1ea962d  Tidying up  (Jul 18, 2022)
7f87cb7  Fixed empty/null partition-path part handling  (Jul 19, 2022)
7c88042  Fixing typo;  (Jul 19, 2022)
3e5bb5c  Fixing more tests  (Jul 19, 2022)
fcfb212  Fixed expected exception wrapping  (Jul 19, 2022)
4e160de  Fixed NPE  (Jul 19, 2022)
ac3c8af  Unified key-gen sequences to be generic across String/UTF8String  (Jul 19, 2022)
8965fac  Fixed null/empty predicate testing  (Jul 19, 2022)
ab04d50  `lint`  (Jul 19, 2022)
f2764c7  Rewrote key-gen tests to avoid duplication, add more cases;  (Jul 19, 2022)
c86f7e8  Fixed tests  (Jul 19, 2022)
65856a4  Fixing NPE  (Jul 19, 2022)
8eca13b  Fixing tests  (Jul 19, 2022)
e07428e  Fixed invalid predicate  (Jul 19, 2022)
2773d05  Tidying up  (Jul 19, 2022)
85a30b1  Reverting `KeyGenerator`s API breaking name change  (Jul 19, 2022)
5c973ed  Fixed more tests  (Jul 19, 2022)
21a7ad5  Tidying up, elaborating docs  (Jul 19, 2022)
33defc6  Fixing tests  (Jul 20, 2022)
8466484  Fixed `HoodieRowCreateHandle` to properly instantiate `HoodieParquetW…  (Jul 20, 2022)
865baea  Fixed `HoodieRowCreateHandle` to properly write w/ and w/o meta-fields  (Jul 20, 2022)
1922a94  Fixing `CustomKeyGenerator`  (Jul 20, 2022)
ed3c096  Fixed `HoodieTimer` API  (Jul 20, 2022)
d8b9d3d  Fixing `CustomKeyGenerator` some more  (Jul 20, 2022)
8521cfa  Fixing tests  (Jul 20, 2022)
8825b2f  Fixed `UTF8String` handling in `TimestampBasedKeyGenerator`;  (Jul 20, 2022)
bd92244  Fixing mutable-buffer-aliasing bug `HoodieInsertDataInternalWriterHel…  (Jul 20, 2022)
1af1cc9  Fallback to default for hive-style partitioning, url-encoding configs  (Jul 22, 2022)
bf52e67  Always dump stdout (since sometimes spark-shell wouldn't exit, and do…  (Jul 22, 2022)
dbaafef  Reverting `KafkaConnectUtils` change  (Jul 22, 2022)
@@ -44,7 +44,7 @@ public ComplexAvroKeyGenerator(TypedProperties props) {

   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }

   @Override
@@ -103,13 +103,13 @@ public String getPartitionPath(GenericRecord record) {
   @Override
   public String getRecordKey(GenericRecord record) {
     validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
+    return getRecordKeyFieldNames().size() == 1
         ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
         : new ComplexAvroKeyGenerator(config).getRecordKey(record);
   }

   private void validateRecordKeyFields() {
-    if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
+    if (getRecordKeyFieldNames() == null || getRecordKeyFieldNames().isEmpty()) {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
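For context on the validation above: the record-key fields it checks come from writer configuration. A minimal, hedged sketch (not part of this PR; the config key follows the Hudi docs, the field values are made up) of how a composite record key is configured:

import org.apache.hudi.common.config.TypedProperties;

public class RecordKeyConfigExample {
  public static void main(String[] args) {
    // Illustrative only: record-key fields are supplied as a comma-separated
    // list; a single field takes the SimpleAvroKeyGenerator path above,
    // multiple fields the ComplexAvroKeyGenerator path.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.datasource.write.recordkey.field", "id,region");
    System.out.println(props.getProperty("hoodie.datasource.write.recordkey.field"));
  }
}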
@@ -40,7 +40,7 @@ public GlobalAvroDeleteKeyGenerator(TypedProperties config) {

   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }

   @Override
@@ -42,6 +42,7 @@ public class KeyGenUtils {

   protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
   public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+  public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";

   /**
    * Fetches record key from the GenericRecord.
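For orientation, a minimal sketch (not part of this PR) of where the two separator constants above come into play when composing composite record keys and multi-level partition paths; the field names and values are made up, and plain String.join stands in for Hudi's internal composition logic:

import java.util.Arrays;
import java.util.List;

public class SeparatorExample {
  public static void main(String[] args) {
    String recordKeyPartsSeparator = ",";  // value of DEFAULT_RECORD_KEY_PARTS_SEPARATOR
    String partitionPathSeparator = "/";   // value of DEFAULT_PARTITION_PATH_SEPARATOR

    // Composite record key: key parts joined with ","
    List<String> keyParts = Arrays.asList("id:123", "region:emea");
    String recordKey = String.join(recordKeyPartsSeparator, keyParts);  // "id:123,region:emea"

    // Multi-level partition path: partition parts joined with "/"
    List<String> partitionParts = Arrays.asList("2022", "07", "19");
    String partitionPath = String.join(partitionPathSeparator, partitionParts);  // "2022/07/19"

    System.out.println(recordKey + " | " + partitionPath);
  }
}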
@@ -56,10 +56,10 @@ public String getRecordKey(GenericRecord record) {
     // for backward compatibility, we need to use the right format according to the number of record key fields
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFields().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
+    if (getRecordKeyFieldNames().size() == 1) {
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
     }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }

   public String getEmptyPartition() {
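Concretely, the backward-compatibility rule in the comment above yields formats like these (field names and values are made up):

// single record key field, e.g. ["id"]:
//   record key -> "123"
// multiple record key fields, e.g. ["id", "region"]:
//   record key -> "id:123,region:emea"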
@@ -47,7 +47,7 @@ public SimpleAvroKeyGenerator(TypedProperties props) {

   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
   }

   @Override
@@ -111,7 +111,7 @@ public String getPartitionPath(GenericRecord record) {
     try {
       return getPartitionPath(partitionVal);
     } catch (Exception e) {
-      throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e);
+      throw new HoodieKeyGeneratorException("Unable to parse input partition field: " + partitionVal, e);
     }
   }

@@ -181,7 +181,7 @@ public String getPartitionPath(Object partitionVal) {
       timeMs = convertLongTimeToMillis(((LocalDate) partitionVal).toEpochDay());
     } else if (partitionVal instanceof CharSequence) {
       if (!inputFormatter.isPresent()) {
-        throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+        throw new HoodieException("Missing input formatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
      }
       DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
       if (this.outputDateTimeZone == null) {
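As context for the second hunk, a minimal sketch (not from this PR) of the kind of configuration the error message asks for. The property key strings are an assumption recalled from Hudi docs of this era; the authoritative keys live in KeyGeneratorOptions, and the format values are illustrative:

import org.apache.hudi.common.config.TypedProperties;

public class TimestampKeyGenConfigExample {
  public static void main(String[] args) {
    // Illustrative only: when timestampType is DATE_STRING (or MIXED), an
    // input date format must be configured so the CharSequence branch above
    // can parse the raw partition value into a DateTime.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING");
    props.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd");
    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");
    System.out.println(props);
  }
}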
@@ -20,6 +20,7 @@

 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
@@ -34,7 +35,7 @@
  * Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
  * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
  *
- * Capable of overlaying meta-fields in both cases: whether original {@link #row} contains
+ * Capable of overlaying meta-fields in both cases: whether original {@link #sourceRow} contains
  * meta columns or not. This allows to handle following use-cases allowing to avoid any
  * manipulation (reshuffling) of the source row, by simply creating new instance
  * of {@link HoodieInternalRow} with all the meta-values provided
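For readers unfamiliar with the class, a minimal sketch (not part of the diff) of the overlay use-case this javadoc describes. The constructor signature matches the one in the hunks below; all meta-field values are made up, and the sketch assumes HoodieInternalRow is importable from its Hudi package:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class HoodieInternalRowExample {
  public static void main(String[] args) {
    // A source row with a single data column and no meta columns
    InternalRow source = new GenericInternalRow(new Object[] {UTF8String.fromString("payload")});

    // Overlay the five Hudi meta-field values on top of the source row,
    // without reshuffling the source row itself (values are made up)
    InternalRow withMeta = new HoodieInternalRow(
        UTF8String.fromString("20220719000000"),      // commit time
        UTF8String.fromString("20220719000000_0_1"),  // commit seq number
        UTF8String.fromString("id:123"),              // record key
        UTF8String.fromString("2022/07/19"),          // partition path
        UTF8String.fromString("file-0001.parquet"),   // file name
        source,
        false);                                       // source has no meta columns

    // Ordinals 0..4 address the overlaid meta-fields; 5+ are rebased onto the source row
    System.out.println(withMeta.getUTF8String(2));    // -> id:123
  }
}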
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {

   /**
    * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   *
+   * NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields even in case
+   * when source row also contains them, to make sure these fields are mutable and
+   * can be updated (for ex, {@link UnsafeRow} doesn't support mutations due to
+   * its memory layout, as it persists field offsets)
    */
   private final UTF8String[] metaFields;
-  private final InternalRow row;
+  private final InternalRow sourceRow;

   /**
-   * Specifies whether source {@link #row} contains meta-fields
+   * Specifies whether source {@link #sourceRow} contains meta-fields
    */
-  private final boolean containsMetaFields;
+  private final boolean sourceContainsMetaFields;
Comment on lines +61 to +66

[Contributor] Are these renamings necessary in this PR? Next time, I would prefer to have them in a separate PR to avoid reviewing overhead.

[Contributor, Author] Agreed. They were renamed to resolve confusion that arose in our previous discussion regarding the scope of these fields.

[Member] +1, this is already part of the guidelines: https://hudi.apache.org/contribute/developer-setup#coding-guidelines. Next time, let's not land any PRs violating this until resolved.
   public HoodieInternalRow(UTF8String commitTime,
                            UTF8String commitSeqNumber,
                            UTF8String recordKey,
                            UTF8String partitionPath,
                            UTF8String fileName,
-                           InternalRow row,
-                           boolean containsMetaFields) {
+                           InternalRow sourceRow,
+                           boolean sourceContainsMetaFields) {
     this.metaFields = new UTF8String[] {
         commitTime,
         commitSeqNumber,
@@ -74,29 +80,29 @@ public HoodieInternalRow(UTF8String commitTime,
         fileName
     };

-    this.row = row;
-    this.containsMetaFields = containsMetaFields;
+    this.sourceRow = sourceRow;
+    this.sourceContainsMetaFields = sourceContainsMetaFields;
   }

   private HoodieInternalRow(UTF8String[] metaFields,
-                            InternalRow row,
-                            boolean containsMetaFields) {
+                            InternalRow sourceRow,
+                            boolean sourceContainsMetaFields) {
     this.metaFields = metaFields;
-    this.row = row;
-    this.containsMetaFields = containsMetaFields;
+    this.sourceRow = sourceRow;
+    this.sourceContainsMetaFields = sourceContainsMetaFields;
   }

   @Override
   public int numFields() {
-    return row.numFields();
+    return sourceRow.numFields();
   }

   @Override
   public void setNullAt(int ordinal) {
     if (ordinal < metaFields.length) {
       metaFields[ordinal] = null;
     } else {
-      row.setNullAt(rebaseOrdinal(ordinal));
+      sourceRow.setNullAt(rebaseOrdinal(ordinal));
     }
   }
@@ -112,7 +118,7 @@ public void update(int ordinal, Object value) {
             String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
       }
     } else {
-      row.update(rebaseOrdinal(ordinal), value);
+      sourceRow.update(rebaseOrdinal(ordinal), value);
     }
   }
@@ -121,113 +127,113 @@ public boolean isNullAt(int ordinal) {
     if (ordinal < metaFields.length) {
       return metaFields[ordinal] == null;
     }
-    return row.isNullAt(rebaseOrdinal(ordinal));
+    return sourceRow.isNullAt(rebaseOrdinal(ordinal));
   }

   @Override
   public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+    if (ordinal < metaFields.length) {
       return metaFields[ordinal];
     }
-    return row.getUTF8String(rebaseOrdinal(ordinal));
+    return sourceRow.getUTF8String(rebaseOrdinal(ordinal));
   }

   @Override
   public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+    if (ordinal < metaFields.length) {
       validateMetaFieldDataType(dataType);
       return metaFields[ordinal];
     }
-    return row.get(rebaseOrdinal(ordinal), dataType);
+    return sourceRow.get(rebaseOrdinal(ordinal), dataType);
   }

   @Override
   public boolean getBoolean(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Boolean.class);
-    return row.getBoolean(rebaseOrdinal(ordinal));
+    return sourceRow.getBoolean(rebaseOrdinal(ordinal));
   }

   @Override
   public byte getByte(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Byte.class);
-    return row.getByte(rebaseOrdinal(ordinal));
+    return sourceRow.getByte(rebaseOrdinal(ordinal));
   }

   @Override
   public short getShort(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Short.class);
-    return row.getShort(rebaseOrdinal(ordinal));
+    return sourceRow.getShort(rebaseOrdinal(ordinal));
   }

   @Override
   public int getInt(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Integer.class);
-    return row.getInt(rebaseOrdinal(ordinal));
+    return sourceRow.getInt(rebaseOrdinal(ordinal));
   }

   @Override
   public long getLong(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Long.class);
-    return row.getLong(rebaseOrdinal(ordinal));
+    return sourceRow.getLong(rebaseOrdinal(ordinal));
   }

   @Override
   public float getFloat(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Float.class);
-    return row.getFloat(rebaseOrdinal(ordinal));
+    return sourceRow.getFloat(rebaseOrdinal(ordinal));
   }

   @Override
   public double getDouble(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Double.class);
-    return row.getDouble(rebaseOrdinal(ordinal));
+    return sourceRow.getDouble(rebaseOrdinal(ordinal));
   }

   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
     ruleOutMetaFieldsAccess(ordinal, Decimal.class);
-    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
+    return sourceRow.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }

   @Override
   public byte[] getBinary(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, Byte[].class);
-    return row.getBinary(rebaseOrdinal(ordinal));
+    return sourceRow.getBinary(rebaseOrdinal(ordinal));
   }

   @Override
   public CalendarInterval getInterval(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
-    return row.getInterval(rebaseOrdinal(ordinal));
+    return sourceRow.getInterval(rebaseOrdinal(ordinal));
   }

   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
     ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
-    return row.getStruct(rebaseOrdinal(ordinal), numFields);
+    return sourceRow.getStruct(rebaseOrdinal(ordinal), numFields);
   }

   @Override
   public ArrayData getArray(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
-    return row.getArray(rebaseOrdinal(ordinal));
+    return sourceRow.getArray(rebaseOrdinal(ordinal));
   }

   @Override
   public MapData getMap(int ordinal) {
     ruleOutMetaFieldsAccess(ordinal, MapData.class);
-    return row.getMap(rebaseOrdinal(ordinal));
+    return sourceRow.getMap(rebaseOrdinal(ordinal));
   }

   @Override
   public InternalRow copy() {
-    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), sourceRow.copy(), sourceContainsMetaFields);
   }

   private int rebaseOrdinal(int ordinal) {
     // NOTE: In cases when source row does not contain meta fields, we will have to
     //       rebase ordinal onto its indexes
-    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+    return sourceContainsMetaFields ? ordinal : ordinal - metaFields.length;
   }

   private void validateMetaFieldDataType(DataType dataType) {
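To illustrate the ordinal rebasing at the end of the hunk above, a small worked example (assumption: the standard five meta-fields, per HoodieRecord.HOODIE_META_COLUMNS):

// Worked example of rebaseOrdinal (illustrative only):
int metaFieldsLength = 5;
int ordinal = 7;  // addressed against the combined (meta + data) schema
// sourceContainsMetaFields == true:  source layout matches, so read sourceRow[7]
int rebasedWithMeta = ordinal;                        // -> 7
// sourceContainsMetaFields == false: source holds only data columns
int rebasedWithoutMeta = ordinal - metaFieldsLength;  // -> 2, i.e. sourceRow[2]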