diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index cbc5b030c30df..46876ca181e4b 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -256,6 +256,8 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
+    hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
+        DataSourceWriteOptions.DEFAULT_HIVE_SUPPORT_TIMESTAMP()));
     return hiveSyncConfig;
   }
 }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 34d5dd2e3e602..6596df3974795 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -291,6 +291,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
+  val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"

   // DEFAULT FOR HIVE SPECIFIC CONFIGS
   val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
@@ -307,6 +308,7 @@ object DataSourceWriteOptions {
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
+  val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"

   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable"
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 8cab7c16e6d99..0477b88fb8cb4 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -342,6 +342,7 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig
   }

diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
index 988e9cde4cbd7..b04cc0a3241cd 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
@@ -68,6 +68,9 @@ public class DLASyncConfig implements Serializable {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;

+  @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
+  public Boolean supportTimestamp = false;
+
   public static DLASyncConfig copy(DLASyncConfig cfg) {
     DLASyncConfig newConfig = new DLASyncConfig();
     newConfig.databaseName = cfg.databaseName;
@@ -81,6 +84,7 @@ public static DLASyncConfig copy(DLASyncConfig cfg) {
     newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
     newConfig.skipROSuffix = cfg.skipROSuffix;
     newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+    newConfig.supportTimestamp = cfg.supportTimestamp;
     return newConfig;
   }

diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
index 1ece9548785f7..347bb62135e9f 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
@@ -159,7 +159,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
diff --git a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
index 5f24f8b934ff0..366d5a24efb06 100644
--- a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
@@ -50,5 +50,6 @@ public void testCopy() {
     assertEquals(copied.basePath, dlaSyncConfig.basePath);
     assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
     assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
+    assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp);
   }
 }
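The Spark-side changes above only plumb the new key through to HiveSyncConfig. For context, a minimal sketch of how a writer would opt in, assuming a Hudi dataset and the option key introduced in DataSourceOptions.scala; the table name and base path here are hypothetical, and the other hive_sync keys are the pre-existing ones:

    // Hypothetical Spark (Java API) write enabling the new flag; only
    // hoodie.datasource.hive_sync.support_timestamp is new in this patch.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class SupportTimestampWriteExample {
      public static void write(Dataset<Row> df) {
        df.write().format("hudi")
            .option("hoodie.datasource.hive_sync.enable", "true")
            .option("hoodie.datasource.hive_sync.table", "demo_table")        // hypothetical table
            .option("hoodie.datasource.hive_sync.support_timestamp", "true")  // new key, defaults to "false"
            .mode(SaveMode.Append)
            .save("/tmp/hudi/demo_table");                                    // hypothetical base path
      }
    }

Note that HoodieSparkSqlWriter reads the key with parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean), so the flag stays off unless it is set explicitly.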
+ + "Disabled by default for backward compatibility.") + public Boolean supportTimestamp = false; + public static HiveSyncConfig copy(HiveSyncConfig cfg) { HiveSyncConfig newConfig = new HiveSyncConfig(); newConfig.basePath = cfg.basePath; @@ -89,6 +93,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) { newConfig.jdbcUrl = cfg.jdbcUrl; newConfig.tableName = cfg.tableName; newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat; + newConfig.supportTimestamp = cfg.supportTimestamp; return newConfig; } @@ -97,7 +102,7 @@ public String toString() { return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\'' + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\'' + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='" - + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\'' + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}'; } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index a3b524feac922..3beaeff1ffd43 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -176,7 +176,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi } else { // Check if the table schema has evolved Map tableSchema = hoodieHiveClient.getTableSchema(tableName); - SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields); + SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp); if (!schemaDiff.isEmpty()) { LOG.info("Schema difference found for " + tableName); hoodieHiveClient.updateTableDefinition(tableName, schema); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 5a4b72a1c69a7..4f6cbfe3acee3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -238,7 +238,7 @@ public List scanTablePartitions(String tableName) throws TException { void updateTableDefinition(String tableName, MessageType newSchema) { try { - String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields); + String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields, syncConfig.supportTimestamp); // Cascade clause should not be present for non-partitioned tables String cascadeClause = syncConfig.partitionFields.size() > 0 ? 
" cascade" : ""; StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 7fd64bd8b2fea..6a209bef7c3ec 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -51,10 +52,15 @@ public class HiveSchemaUtil { * Get the schema difference between the storage schema and hive table schema. */ public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map tableSchema, - List partitionKeys) { + List partitionKeys) { + return getSchemaDifference(storageSchema, tableSchema, partitionKeys, false); + } + + public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map tableSchema, + List partitionKeys, boolean supportTimestamp) { Map newTableSchema; try { - newTableSchema = convertParquetSchemaToHiveSchema(storageSchema); + newTableSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp); } catch (IOException e) { throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e); } @@ -132,16 +138,16 @@ private static boolean isFieldExistsInSchema(Map newTableSchema, * @param messageType : Parquet Schema * @return : Hive Table schema read from parquet file MAP[String,String] */ - public static Map convertParquetSchemaToHiveSchema(MessageType messageType) throws IOException { + private static Map convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException { Map schema = new LinkedHashMap<>(); List parquetFields = messageType.getFields(); for (Type parquetType : parquetFields) { StringBuilder result = new StringBuilder(); String key = parquetType.getName(); if (parquetType.isRepetition(Type.Repetition.REPEATED)) { - result.append(createHiveArray(parquetType, "")); + result.append(createHiveArray(parquetType, "", supportTimestamp)); } else { - result.append(convertField(parquetType)); + result.append(convertField(parquetType, supportTimestamp)); } schema.put(hiveCompatibleFieldName(key, false), result.toString()); @@ -155,7 +161,7 @@ public static Map convertParquetSchemaToHiveSchema(MessageType m * @param parquetType : Single paruet field * @return : Equivalent sHive schema */ - private static String convertField(final Type parquetType) { + private static String convertField(final Type parquetType, boolean supportTimestamp) { StringBuilder field = new StringBuilder(); if (parquetType.isPrimitive()) { final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName = @@ -167,7 +173,10 @@ private static String convertField(final Type parquetType) { .append(decimalMetadata.getScale()).append(")").toString(); } else if (originalType == OriginalType.DATE) { return field.append("DATE").toString(); + } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) { + return field.append("TIMESTAMP").toString(); } + // TODO - fix the method naming here return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter() { @Override @@ -227,7 +236,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) { if (!elementType.isRepetition(Type.Repetition.REPEATED)) { throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); } - return createHiveArray(elementType, parquetGroupType.getName()); + return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp); case MAP: if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); @@ -245,7 +254,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType); } Type valueType = mapKeyValType.getType(1); - return createHiveMap(convertField(keyType), convertField(valueType)); + return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp)); case ENUM: case UTF8: return "string"; @@ -260,7 +269,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { } } else { // if no original type then it's a record - return createHiveStruct(parquetGroupType.getFields()); + return createHiveStruct(parquetGroupType.getFields(), supportTimestamp); } } } @@ -271,14 +280,14 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { * @param parquetFields : list of parquet fields * @return : Equivalent 'struct' Hive schema */ - private static String createHiveStruct(List parquetFields) { + private static String createHiveStruct(List parquetFields, boolean supportTimestamp) { StringBuilder struct = new StringBuilder(); struct.append("STRUCT< "); for (Type field : parquetFields) { // TODO: struct field name is only translated to support special char($) // We will need to extend it to other collection type struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : "); - struct.append(convertField(field)).append(", "); + struct.append(convertField(field, supportTimestamp)).append(", "); } struct.delete(struct.length() - 2, struct.length()); // Remove the last // ", " @@ -327,19 +336,19 @@ private static String createHiveMap(String keyType, String valueType) { /** * Create an Array Hive schema from equivalent parquet list type. 
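On the tool side, supportTimestamp is an ordinary JCommander flag, so standalone syncs can enable it with --support-timestamp. A rough sketch of the parsing path, assuming the JCommander wiring HiveSyncTool already uses; the class name and argument values are illustrative:

    // Illustrative JCommander parsing: a bare --support-timestamp switch flips
    // the Boolean field declared via @Parameter in HiveSyncConfig.
    import com.beust.jcommander.JCommander;
    import org.apache.hudi.hive.HiveSyncConfig;

    public class ParseFlagExample {
      public static void main(String[] args) {
        // e.g. args = {"--base-path", "/tmp/hudi/demo_table", "--support-timestamp"}
        HiveSyncConfig cfg = new HiveSyncConfig();
        JCommander cmd = new JCommander(cfg);
        cmd.parse(args);
        System.out.println("supportTimestamp = " + cfg.supportTimestamp);
      }
    }

From there, HiveSyncTool hands cfg.supportTimestamp to HiveSchemaUtil.getSchemaDifference(...), and HoodieHiveClient threads it into generateSchemaString(...), so the schema diff and the ALTER TABLE DDL use the same type mapping.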
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 7fd64bd8b2fea..6a209bef7c3ec 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -33,6 +33,7 @@

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -51,10 +52,15 @@ public class HiveSchemaUtil {
    * Get the schema difference between the storage schema and hive table schema.
    */
   public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
-      List<String> partitionKeys) {
+      List<String> partitionKeys) {
+    return getSchemaDifference(storageSchema, tableSchema, partitionKeys, false);
+  }
+
+  public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
+      List<String> partitionKeys, boolean supportTimestamp) {
     Map<String, String> newTableSchema;
     try {
-      newTableSchema = convertParquetSchemaToHiveSchema(storageSchema);
+      newTableSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
     }
@@ -132,16 +138,16 @@ private static boolean isFieldExistsInSchema(Map<String, String> newTableSchema,
    * @param messageType : Parquet Schema
    * @return : Hive Table schema read from parquet file MAP[String,String]
    */
-  public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType) throws IOException {
+  private static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException {
     Map<String, String> schema = new LinkedHashMap<>();
     List<Type> parquetFields = messageType.getFields();
     for (Type parquetType : parquetFields) {
       StringBuilder result = new StringBuilder();
       String key = parquetType.getName();
       if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
-        result.append(createHiveArray(parquetType, ""));
+        result.append(createHiveArray(parquetType, "", supportTimestamp));
       } else {
-        result.append(convertField(parquetType));
+        result.append(convertField(parquetType, supportTimestamp));
       }

       schema.put(hiveCompatibleFieldName(key, false), result.toString());
@@ -155,7 +161,7 @@ public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType m
    * @param parquetType : Single parquet field
    * @return : Equivalent Hive schema
    */
-  private static String convertField(final Type parquetType) {
+  private static String convertField(final Type parquetType, boolean supportTimestamp) {
     StringBuilder field = new StringBuilder();
     if (parquetType.isPrimitive()) {
       final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
@@ -167,7 +173,10 @@ private static String convertField(final Type parquetType) {
           .append(decimalMetadata.getScale()).append(")").toString();
     } else if (originalType == OriginalType.DATE) {
       return field.append("DATE").toString();
+    } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) {
+      return field.append("TIMESTAMP").toString();
     }
+
     // TODO - fix the method naming here
     return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {

       @Override
@@ -227,7 +236,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
         if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
           throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
         }
-        return createHiveArray(elementType, parquetGroupType.getName());
+        return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp);
       case MAP:
         if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
           throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
@@ -245,7 +254,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
           throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType);
         }
         Type valueType = mapKeyValType.getType(1);
-        return createHiveMap(convertField(keyType), convertField(valueType));
+        return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp));
       case ENUM:
       case UTF8:
         return "string";
@@ -260,7 +269,7 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
       }
     } else {
       // if no original type then it's a record
-      return createHiveStruct(parquetGroupType.getFields());
+      return createHiveStruct(parquetGroupType.getFields(), supportTimestamp);
     }
   }
 }
@@ -271,14 +280,14 @@ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
    * @param parquetFields : list of parquet fields
    * @return : Equivalent 'struct' Hive schema
   */
-  private static String createHiveStruct(List<Type> parquetFields) {
+  private static String createHiveStruct(List<Type> parquetFields, boolean supportTimestamp) {
     StringBuilder struct = new StringBuilder();
     struct.append("STRUCT< ");
     for (Type field : parquetFields) {
       // TODO: struct field name is only translated to support special char($)
       // We will need to extend it to other collection type
       struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : ");
-      struct.append(convertField(field)).append(", ");
+      struct.append(convertField(field, supportTimestamp)).append(", ");
     }
     struct.delete(struct.length() - 2, struct.length()); // Remove the last
     // ", "
@@ -327,19 +336,19 @@ private static String createHiveMap(String keyType, String valueType) {
   /**
    * Create an Array Hive schema from equivalent parquet list type.
    */
-  private static String createHiveArray(Type elementType, String elementName) {
+  private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp) {
     StringBuilder array = new StringBuilder();
     array.append("ARRAY< ");
     if (elementType.isPrimitive()) {
-      array.append(convertField(elementType));
+      array.append(convertField(elementType, supportTimestamp));
     } else {
       final GroupType groupType = elementType.asGroupType();
       final List<Type> groupFields = groupType.getFields();
       if (groupFields.size() > 1 || (groupFields.size() == 1
           && (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) {
-        array.append(convertField(elementType));
+        array.append(convertField(elementType, supportTimestamp));
       } else {
-        array.append(convertField(groupType.getFields().get(0)));
+        array.append(convertField(groupType.getFields().get(0), supportTimestamp));
       }
     }
     array.append(">");
@@ -364,11 +373,15 @@ public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType)
   }

   public static String generateSchemaString(MessageType storageSchema) throws IOException {
-    return generateSchemaString(storageSchema, new ArrayList<>());
+    return generateSchemaString(storageSchema, Collections.EMPTY_LIST);
   }

   public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
+    return generateSchemaString(storageSchema, colsToSkip, false);
+  }
+
+  public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException {
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
     StringBuilder columns = new StringBuilder();
     for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
       if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
@@ -382,9 +395,9 @@ public static String generateSchemaString(MessageType storageSchema, List<String>

   public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
-      String outputFormatClass, String serdeClass) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
-    String columns = generateSchemaString(storageSchema, config.partitionFields);
+      String outputFormatClass, String serdeClass) throws IOException {
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
+    String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);
     List<String> partitionFields = new ArrayList<>();
     for (String partitionKey : config.partitionFields) {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 17bc2155c31c6..1d8cbd85347fd 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -41,6 +41,7 @@

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;

@@ -151,6 +152,39 @@ public void testSchemaConvertArray() throws IOException {
     assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
   }

+  @Test
+  public void testSchemaConvertTimestampMicros() throws IOException {
+    MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
+        .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
+    String schemaString = HiveSchemaUtil.generateSchemaString(schema);
+    // verify backward compatibility - int64 is converted to bigint type
+    assertEquals("`my_element` bigint", schemaString);
+    // verify new functionality - int64 is converted to timestamp type when 'supportTimestamp' is enabled
+    schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
+    assertEquals("`my_element` TIMESTAMP", schemaString);
+  }
+
+  @Test
+  public void testSchemaDiffForTimestampMicros() {
+    MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
+        .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
+    // verify backward compatibility - int64 is converted to bigint type
+    SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        Collections.emptyMap(), Collections.emptyList(), false);
+    assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
+    assertTrue(schemaDifference.isEmpty());
+
+    // verify schema difference is calculated correctly when supportTimestamp is enabled
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        Collections.emptyMap(), Collections.emptyList(), true);
+    assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
+    assertTrue(schemaDifference.isEmpty());
+  }
+
   @ParameterizedTest
   @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
   public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
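Taken together, the tests pin down the two conversion paths for an INT64 column annotated with TIMESTAMP_MICROS. A compact recap using the same Parquet builder the tests use; the field and schema names here are made up:

    // Recap of the behavior asserted above: bigint by default, TIMESTAMP when opted in.
    import java.util.Collections;
    import org.apache.hudi.hive.util.HiveSchemaUtil;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.OriginalType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class TimestampMappingDemo {
      public static void main(String[] args) throws Exception {
        MessageType schema = Types.buildMessage()
            .optional(PrimitiveType.PrimitiveTypeName.INT64)
            .as(OriginalType.TIMESTAMP_MICROS)
            .named("created_at")    // hypothetical field name
            .named("demo_schema");  // hypothetical schema name
        // Default path: prints `created_at` bigint
        System.out.println(HiveSchemaUtil.generateSchemaString(schema));
        // Opt-in path: prints `created_at` TIMESTAMP
        System.out.println(HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true));
      }
    }

Because the default is false everywhere (the CLI flags, the DataSourceOptions default, and the new overloads), existing pipelines keep reading these columns as bigint until they explicitly opt in.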