2 changes: 2 additions & 0 deletions hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -256,6 +256,8 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
DataSourceWriteOptions.DEFAULT_HIVE_SUPPORT_TIMESTAMP()));
return hiveSyncConfig;
}
}
@@ -291,6 +291,7 @@ object DataSourceWriteOptions {
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"

// DEFAULT FOR HIVE SPECIFIC CONFIGS
val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
@@ -307,6 +308,7 @@ object DataSourceWriteOptions {
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"

// Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable"
@@ -342,6 +342,7 @@ private[hudi] object HoodieSparkSqlWriter {
ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
hiveSyncConfig
}

@@ -68,6 +68,9 @@ public class DLASyncConfig implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
public Boolean supportTimestamp = false;

Contributor comment:
I think we need to add this option to DataSourceOptions, DataSourceUtils, and HoodieSparkSqlWriter as well. Something like "hoodie.datasource.hive_sync.support_timestamp"?

Member Author reply:
@bschell Thank you for pointing this out. We only use the standalone mode, so I missed the DataSourceOptions path. Fixed now. PTAL.
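
For reference, enabling the new key on a datasource write might look like the sketch below (only the support_timestamp key comes from this change; the other option keys and values are the usual Hudi write options and are purely illustrative):

// A minimal sketch, assuming a DataFrame `df` with an int64 (TIMESTAMP_MICROS) column.
df.write.format("org.apache.hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.database", "default")
  .option("hoodie.datasource.hive_sync.table", "my_table")
  .option("hoodie.datasource.hive_sync.support_timestamp", "true") // new option from this PR
  .mode("append")
  .save("/tmp/hudi/my_table")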


public static DLASyncConfig copy(DLASyncConfig cfg) {
DLASyncConfig newConfig = new DLASyncConfig();
newConfig.databaseName = cfg.databaseName;
@@ -81,6 +84,7 @@ public static DLASyncConfig copy(DLASyncConfig cfg) {
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.skipROSuffix = cfg.skipROSuffix;
newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
newConfig.supportTimestamp = cfg.supportTimestamp;
return newConfig;
}

@@ -159,7 +159,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
@@ -50,5 +50,6 @@ public void testCopy() {
assertEquals(copied.basePath, dlaSyncConfig.basePath);
assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp);
}
}
@@ -77,6 +77,10 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+ "Disabled by default for backward compatibility.")
public Boolean supportTimestamp = false;

public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
@@ -89,6 +93,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.tableName = cfg.tableName;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
newConfig.supportTimestamp = cfg.supportTimestamp;
return newConfig;
}

@@ -97,7 +102,7 @@ public String toString() {
return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\''
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
}
}
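
For the standalone sync path, the same behavior can be enabled on HiveSyncConfig directly. A minimal sketch, assuming the usual HiveSyncTool wiring (the field names match this config class; the HiveSyncTool constructor arguments and the illustrative values are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

val cfg = new HiveSyncConfig()
cfg.databaseName = "default"                 // illustrative values
cfg.tableName = "my_table"
cfg.basePath = "hdfs:///tmp/hudi/my_table"
cfg.jdbcUrl = "jdbc:hive2://localhost:10000"
cfg.supportTimestamp = true                  // the new flag added in this PR
val fs = FileSystem.get(new Configuration())
new HiveSyncTool(cfg, new HiveConf(), fs).syncHoodieTable()  // constructor/wiring assumed

Equivalently, the new --support-timestamp flag can be passed when the sync tool is run from the command line.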
@@ -176,7 +176,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
@@ -238,7 +238,7 @@ public List<Partition> scanTablePartitions(String tableName) throws TException {

void updateTableDefinition(String tableName, MessageType newSchema) {
try {
String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields);
String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields, syncConfig.supportTimestamp);
// Cascade clause should not be present for non-partitioned tables
String cascadeClause = syncConfig.partitionFields.size() > 0 ? " cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
@@ -33,6 +33,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -51,10 +52,15 @@ public class HiveSchemaUtil {
* Get the schema difference between the storage schema and hive table schema.
*/
public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
List<String> partitionKeys) {
List<String> partitionKeys) {
return getSchemaDifference(storageSchema, tableSchema, partitionKeys, false);
}

public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
List<String> partitionKeys, boolean supportTimestamp) {
Map<String, String> newTableSchema;
try {
newTableSchema = convertParquetSchemaToHiveSchema(storageSchema);
newTableSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
}
@@ -132,16 +138,16 @@ private static boolean isFieldExistsInSchema(Map<String, String> newTableSchema,
* @param messageType : Parquet Schema
* @return : Hive Table schema read from parquet file MAP[String,String]
*/
public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType) throws IOException {
private static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException {
Map<String, String> schema = new LinkedHashMap<>();
List<Type> parquetFields = messageType.getFields();
for (Type parquetType : parquetFields) {
StringBuilder result = new StringBuilder();
String key = parquetType.getName();
if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
result.append(createHiveArray(parquetType, ""));
result.append(createHiveArray(parquetType, "", supportTimestamp));
} else {
result.append(convertField(parquetType));
result.append(convertField(parquetType, supportTimestamp));
}

schema.put(hiveCompatibleFieldName(key, false), result.toString());
@@ -155,7 +161,7 @@ public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType m
* @param parquetType : Single parquet field
* @return : Equivalent Hive schema
*/
private static String convertField(final Type parquetType) {
private static String convertField(final Type parquetType, boolean supportTimestamp) {
StringBuilder field = new StringBuilder();
if (parquetType.isPrimitive()) {
final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
@@ -167,7 +173,10 @@ private static String convertField(final Type parquetType) {
.append(decimalMetadata.getScale()).append(")").toString();
} else if (originalType == OriginalType.DATE) {
return field.append("DATE").toString();
} else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) {

Contributor comment:
Could you please explain what happens now if the originalType == OriginalType.TIMESTAMP_MICROS?

Member Author reply:
@n3nash Without this change we get 'bigint' as the Hive type (the field goes into parquetPrimitiveTypeName.convert, which ends up in the convertINT64 method).

return field.append("TIMESTAMP").toString();
}

// TODO - fix the method naming here
return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
@Override
@@ -227,7 +236,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
return createHiveArray(elementType, parquetGroupType.getName());
return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp);
case MAP:
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
@@ -245,7 +254,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType);
}
Type valueType = mapKeyValType.getType(1);
return createHiveMap(convertField(keyType), convertField(valueType));
return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp));
case ENUM:
case UTF8:
return "string";
@@ -260,7 +269,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
}
} else {
// if no original type then it's a record
return createHiveStruct(parquetGroupType.getFields());
return createHiveStruct(parquetGroupType.getFields(), supportTimestamp);
}
}
}
@@ -271,14 +280,14 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
* @param parquetFields : list of parquet fields
* @return : Equivalent 'struct' Hive schema
*/
private static String createHiveStruct(List<Type> parquetFields) {
private static String createHiveStruct(List<Type> parquetFields, boolean supportTimestamp) {
StringBuilder struct = new StringBuilder();
struct.append("STRUCT< ");
for (Type field : parquetFields) {
// TODO: struct field name is only translated to support special char($)
// We will need to extend it to other collection type
struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : ");
struct.append(convertField(field)).append(", ");
struct.append(convertField(field, supportTimestamp)).append(", ");
}
struct.delete(struct.length() - 2, struct.length()); // Remove the last
// ", "
@@ -327,19 +336,19 @@ private static String createHiveMap(String keyType, String valueType) {
/**
* Create an Array Hive schema from equivalent parquet list type.
*/
private static String createHiveArray(Type elementType, String elementName) {
private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp) {
StringBuilder array = new StringBuilder();
array.append("ARRAY< ");
if (elementType.isPrimitive()) {
array.append(convertField(elementType));
array.append(convertField(elementType, supportTimestamp));
} else {
final GroupType groupType = elementType.asGroupType();
final List<Type> groupFields = groupType.getFields();
if (groupFields.size() > 1 || (groupFields.size() == 1
&& (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) {
array.append(convertField(elementType));
array.append(convertField(elementType, supportTimestamp));
} else {
array.append(convertField(groupType.getFields().get(0)));
array.append(convertField(groupType.getFields().get(0), supportTimestamp));
}
}
array.append(">");
@@ -364,11 +373,15 @@ public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType)
}

public static String generateSchemaString(MessageType storageSchema) throws IOException {
return generateSchemaString(storageSchema, new ArrayList<>());
return generateSchemaString(storageSchema, Collections.EMPTY_LIST);
}

public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
return generateSchemaString(storageSchema, colsToSkip, false);
}

public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
StringBuilder columns = new StringBuilder();
for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
@@ -382,9 +395,9 @@ public static String generateSchemaString(MessageType storageSchema, List<String
}

public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
String columns = generateSchemaString(storageSchema, config.partitionFields);
String outputFormatClass, String serdeClass) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);

List<String> partitionFields = new ArrayList<>();
for (String partitionKey : config.partitionFields) {
@@ -41,6 +41,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

@@ -151,6 +152,39 @@ public void testSchemaConvertArray() throws IOException {
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
}

@Test
public void testSchemaConvertTimestampMicros() throws IOException {
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
// verify backward compatibility - int64 converted to bigint type
assertEquals("`my_element` bigint", schemaString);
// verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
assertEquals("`my_element` TIMESTAMP", schemaString);
}

@Test
public void testSchemaDiffForTimestampMicros() {
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
// verify backward compatibility - int64 converted to bigint type
SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), false);
assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
assertTrue(schemaDifference.isEmpty());

// verify schema difference is calculated correctly when supportTimestamp is enabled
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), true);
assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
assertTrue(schemaDifference.isEmpty());
}

@ParameterizedTest
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {