@@ -49,6 +49,7 @@
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
@@ -889,6 +890,10 @@ public String getKeyGeneratorClass() {
     return getString(KEYGENERATOR_CLASS_NAME);
   }
 
+  public boolean isConsistentLogicalTimestampEnabled() {
+    return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
+  }
+
   public Boolean shouldAutoCommit() {
     return getBoolean(AUTO_COMMIT_ENABLE);
   }
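For context, the getter above is the only read path for the new flag, and getBooleanOrDefault makes it default to false. Below is a minimal, hypothetical sketch of opting in through a write config; the table path and builder plumbing are illustrative, and only KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED and the new getter come from this change.

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class ConsistentTimestampFlagSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Opt in to rendering Avro logical timestamps consistently across writers.
    props.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi-sketch-table") // placeholder path, not from this PR
        .withProps(props)
        .build();

    // Getter added in this PR; falls back to false when the property is unset.
    System.out.println(writeConfig.isConsistentLogicalTimestampEnabled()); // true
  }
}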
@@ -40,11 +40,11 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
+    return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled());
   }
 }
@@ -40,7 +40,7 @@ public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
@@ -65,7 +65,7 @@ public static String getPartitionPathFromGenericRecord(GenericRecord genericReco
 
   /**
    * Extracts the record key fields in strings out of the given record key,
-   * this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}.
+   * this is the reverse operation of {@link #getRecordKey(GenericRecord, String, boolean)}.
    *
    * @see SimpleAvroKeyGenerator
    * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
@@ -89,11 +89,11 @@ public static String[] extractRecordKeys(String recordKey) {
     }
   }
 
-  public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
+  public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();
     for (String recordKeyField : recordKeyFields) {
-      String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
+      String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
       if (recordKeyValue == null) {
         recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
       } else if (recordKeyValue.isEmpty()) {
@@ -112,14 +112,14 @@ public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
   }
 
   public static String getRecordPartitionPath(GenericRecord record, List<String> partitionPathFields,
-                                              boolean hiveStylePartitioning, boolean encodePartitionPath) {
+                                              boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
     if (partitionPathFields.isEmpty()) {
       return "";
     }
 
     StringBuilder partitionPath = new StringBuilder();
     for (String partitionPathField : partitionPathFields) {
-      String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
+      String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
       if (fieldVal == null || fieldVal.isEmpty()) {
         partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + HUDI_DEFAULT_PARTITION_PATH
             : HUDI_DEFAULT_PARTITION_PATH);
@@ -135,17 +135,17 @@ public static String getRecordPartitionPath(GenericRecord record, List<String> partitionPathFields,
     return partitionPath.toString();
   }
 
-  public static String getRecordKey(GenericRecord record, String recordKeyField) {
-    String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
+  public static String getRecordKey(GenericRecord record, String recordKeyField, boolean consistentLogicalTimestampEnabled) {
+    String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
     if (recordKey == null || recordKey.isEmpty()) {
       throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
     }
     return recordKey;
   }
 
   public static String getPartitionPath(GenericRecord record, String partitionPathField,
-                                        boolean hiveStylePartitioning, boolean encodePartitionPath) {
-    String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
+                                        boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
+    String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
     if (partitionPath == null || partitionPath.isEmpty()) {
       partitionPath = HUDI_DEFAULT_PARTITION_PATH;
     }
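The threaded boolean ultimately changes how HoodieAvroUtils.getNestedFieldValAsString renders fields whose Avro schema carries a timestamp logical type. The sketch below shows the intended effect against the post-change signatures; the schema and field name are made up, and the exact rendered form with the flag on (a java.sql.Timestamp-style string whose text depends on the JVM timezone) is my reading of the change, not something this diff shows directly.

import java.util.Collections;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.KeyGenUtils;

public class LogicalTimestampKeySketch {
  public static void main(String[] args) {
    // A long field tagged with the timestamp-millis logical type.
    Schema tsType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    Schema schema = SchemaBuilder.record("rec").fields()
        .name("eventTime").type(tsType).noDefault()
        .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    record.put("eventTime", 1638513806000L);

    // Flag off: the key embeds the raw epoch-millis long, e.g. "eventTime:1638513806000".
    System.out.println(KeyGenUtils.getRecordKey(record, Collections.singletonList("eventTime"), false));
    // Flag on: the logical timestamp should be rendered in a consistent,
    // human-readable Timestamp form instead of the raw long.
    System.out.println(KeyGenUtils.getRecordKey(record, Collections.singletonList("eventTime"), true));
  }
}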
@@ -57,9 +57,9 @@ public String getRecordKey(GenericRecord record) {
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
     if (getRecordKeyFieldNames().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
     }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
   }
 
   public String getEmptyPartition() {
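Per the comment above, the two return paths produce different key formats: a single key field yields just the value, while several fields yield field:value pairs. A small self-contained sketch against the updated KeyGenUtils overloads; the record shape is invented for illustration.

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.KeyGenUtils;

public class KeyFormatSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("id")
        .requiredString("region")
        .endRecord();
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "k1");
    record.put("region", "eu");

    // Single key field: "<value>"
    System.out.println(KeyGenUtils.getRecordKey(record, "id", false)); // k1
    // Multiple key fields: "<field1>:<value1>,<field2>:<value2>"
    System.out.println(KeyGenUtils.getRecordKey(record, Arrays.asList("id", "region"), false)); // id:k1,region:eu
  }
}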
@@ -47,11 +47,11 @@ public SimpleAvroKeyGenerator(TypedProperties props) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
+    return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled());
   }
 }
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
@@ -125,7 +126,7 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
+    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true, isConsistentLogicalTimestampEnabled());
     if (partitionVal == null) {
       partitionVal = getDefaultPartitionVal();
     }
@@ -191,6 +192,8 @@ public String getPartitionPath(Object partitionVal) {
       timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
     } else if (partitionVal instanceof Long) {
       timeMs = convertLongTimeToMillis((Long) partitionVal);
+    } else if (partitionVal instanceof Timestamp && isConsistentLogicalTimestampEnabled()) {
+      timeMs = ((Timestamp) partitionVal).getTime();
    } else if (partitionVal instanceof Integer) {
       timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
     } else if (partitionVal instanceof BigDecimal) {
@@ -225,5 +228,4 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
-
 }
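The new instanceof Timestamp branch is needed because, with the flag on, getNestedFieldVal can now return a java.sql.Timestamp instead of a raw long, and a Timestamp already carries epoch millis, so the TimeUnit-based scalar conversion must be skipped. A standalone, pure-JDK illustration of just that branching, with made-up values:

import java.sql.Timestamp;

public class TimestampBranchSketch {
  public static void main(String[] args) {
    Object partitionVal = new Timestamp(1638513806000L); // 2021-12-03 in GMT
    long timeMs;
    if (partitionVal instanceof Timestamp) {
      // Already epoch millis; no TimeUnit conversion needed.
      timeMs = ((Timestamp) partitionVal).getTime();
    } else {
      // Stand-in for the Long/Integer/Float paths that go through
      // convertLongTimeToMillis(...) in the generator above.
      timeMs = ((Number) partitionVal).longValue();
    }
    System.out.println(timeMs); // 1638513806000
  }
}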
@@ -42,7 +42,7 @@ public HoodieTableMetadataKeyGenerator(TypedProperties config) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
+    return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY, isConsistentLogicalTimestampEnabled());
   }
 
   @Override
@@ -124,7 +124,8 @@ protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> st
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return Option.of(new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema)));
+          HoodieAvroUtils.addMetadataFields(schema),
+          getWriteConfig().isConsistentLogicalTimestampEnabled()));
     } else {
       return Option.empty();
     }
@@ -39,18 +39,20 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
 
   private final String[] sortColumnNames;
   private final Schema schema;
+  private final boolean consistentLogicalTimestampEnabled;
 
-  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
+  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
     this.sortColumnNames = columnNames;
     this.schema = schema;
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
   }
 
   @Override
   public List<HoodieRecord<T>> repartitionRecords(
       List<HoodieRecord<T>> records, int outputSparkPartitions) {
     return records.stream().sorted((o1, o2) -> {
-      Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema);
-      Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema);
+      Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
+      Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);
       return values1.toString().compareTo(values2.toString());
     }).collect(Collectors.toList());
   }
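This comparator (like the Spark-side RDDCustomColumnsSortPartitioner further down) orders records by the toString() of the extracted column values, so the flag is threaded through to keep the rendered form in line with what the key generators produce. As a side note on why the rendering matters for ordering, a pure-JDK sketch with invented values; the Timestamp rendering stands in for the flag-on form:

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortFormSketch {
  public static void main(String[] args) {
    // Two instants as epoch microseconds; the earlier one has fewer digits.
    long earlierMicros = 999999999999999L;  // ~2001-09-09
    long laterMicros = 1638513806000000L;   // ~2021-12-03

    // Raw-long rendering: lexicographic order disagrees with time order.
    List<String> raw = new ArrayList<>(Arrays.asList(
        String.valueOf(laterMicros), String.valueOf(earlierMicros)));
    Collections.sort(raw);
    System.out.println(raw); // [1638513806000000, 999999999999999] -- later instant first

    // Timestamp-style rendering (fixed-width): lexicographic == chronological.
    List<String> rendered = new ArrayList<>(Arrays.asList(
        new Timestamp(laterMicros / 1000).toString(),
        new Timestamp(earlierMicros / 1000).toString()));
    Collections.sort(rendered);
    System.out.println(rendered); // earlier timestamp first (text is JVM-timezone dependent)
  }
}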
@@ -64,13 +64,13 @@ public void testCustomColumnSortPartitioner(String sortColumnString) throws Exception {
 
     List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
     testBulkInsertInternalPartitioner(
-        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
+        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
         records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
   }
 
   private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema, String[] sortColumns) {
     return Comparator.comparing(
-        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
+        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, false).toString());
   }
 
   private void verifyRecordAscendingOrder(List<HoodieRecord> records,
@@ -108,7 +108,6 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(final HoodieC
     return writeMetadata;
   }
 
-
   /**
    * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
@@ -141,7 +140,7 @@ protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> st
         getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
     } else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema)));
+          HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()));
     } else {
       return Option.empty();
     }
@@ -39,25 +39,29 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
 
   private final String[] sortColumnNames;
   private final SerializableSchema serializableSchema;
+  private final boolean consistentLogicalTimestampEnabled;
 
   public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
     this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
     this.sortColumnNames = getSortColumnName(config);
+    this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
-  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
+  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
     this.sortColumnNames = columnNames;
     this.serializableSchema = new SerializableSchema(schema);
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
     final String[] sortColumns = this.sortColumnNames;
     final SerializableSchema schema = this.serializableSchema;
+    final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
     return records.sortBy(
         record -> {
-          Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema);
+          Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, consistentLogicalTimestampEnabled);
           // null values are replaced with empty string for null_first order
           if (recordValue == null) {
             return StringUtils.EMPTY_STRING;
@@ -66,7 +70,6 @@ record -> {
           }
         },
         true, outputSparkPartitions);
-
   }
 
   @Override
@@ -147,9 +147,9 @@ public void testCustomColumnSortPartitioner() throws Exception {
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
         records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
         records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
 
     HoodieWriteConfig config = HoodieWriteConfig
@@ -45,7 +45,7 @@ public void testBucketIdWithSimpleRecordKey() {
     String indexKeyField = "_row_key";
     GenericRecord record = KeyGeneratorTestUtilities.getRecord();
     HoodieRecord hoodieRecord = new HoodieRecord(
-        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null);
+        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
     int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
     assert bucketId == BucketIdentifier.getBucketId(
         Arrays.asList(record.get(indexKeyField).toString()), 8);
@@ -57,7 +57,7 @@ public void testBucketIdWithComplexRecordKey() {
     String indexKeyField = "_row_key";
     GenericRecord record = KeyGeneratorTestUtilities.getRecord();
     HoodieRecord hoodieRecord = new HoodieRecord(
-        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null);
+        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
     int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
     assert bucketId == BucketIdentifier.getBucketId(
         Arrays.asList(record.get(indexKeyField).toString()), 8);
@@ -238,6 +238,40 @@ public void testScalar() throws IOException {
     assertEquals("2021-04-19", keyGen.getPartitionPath(baseRow));
   }
 
+  @Test
+  public void testScalarWithLogicalType() throws IOException {
+    schema = SchemaTestUtil.getTimestampWithLogicalTypeSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRecord.put("createTime", 1638513806000000L);
+
+    properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS");
+    properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");
+    TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals("2021/12/03", hk1.getPartitionPath());
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    assertEquals("2021/12/03", keyGen.getPartitionPath(baseRow));
+    internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
+    assertEquals("2021/12/03", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+
+    // timezone is GMT, createTime is null
+    baseRecord.put("createTime", null);
+    properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS");
+    properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");
+    keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk2 = keyGen.getKey(baseRecord);
+    assertEquals("1970/01/01", hk2.getPartitionPath());
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    assertEquals("1970/01/01", keyGen.getPartitionPath(baseRow));
+    internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
+    assertEquals("1970/01/01", keyGen.getPartitionPath(internalRow, baseRow.schema()));
+  }
+
   @Test
   public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException {
     baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");
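As a sanity check on the expected partition paths in testScalarWithLogicalType above: 1638513806000000 is epoch microseconds, and the config formats yyyy/MM/dd in GMT, while the null createTime case falls back to a default partition value that formats to the epoch date. A pure-JDK verification of both expectations:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

public class PartitionPathCheck {
  public static void main(String[] args) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
    fmt.setTimeZone(TimeZone.getTimeZone("GMT"));

    long micros = 1638513806000000L;
    System.out.println(fmt.format(new Date(TimeUnit.MICROSECONDS.toMillis(micros)))); // 2021/12/03

    // createTime == null: the generator substitutes a default partition value,
    // which under this config renders as the epoch date.
    System.out.println(fmt.format(new Date(0L))); // 1970/01/01
  }
}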