diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index 69ccc49528fd6..92b07640a36a1 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -54,6 +54,7 @@
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 
 public class HoodieAdbJdbcClient extends HoodieSyncClient {
 
@@ -109,11 +110,11 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
                           Map<String, String> serdeProperties, Map<String, String> tableProperties) {
     try {
       LOG.info("Creating table:{}", tableName);
-      String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
+      String createSQLQuery = HiveSchemaUtil.generateCreateTableDDL(databaseName, tableName, storageSchema,
           config, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
       executeAdbSql(createSQLQuery);
     } catch (IOException e) {
-      throw new HoodieException("Fail to create table:" + tableName, e);
+      throw new HoodieException("Failed to create table: " + tableId(databaseName, tableName), e);
     }
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index dcbec94dbddeb..7ba3d178ca514 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -32,6 +32,7 @@
 
 import javax.annotation.concurrent.Immutable;
 
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -204,4 +205,24 @@ public TypedProperties toProps() {
   public void validateParameters() {
     ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0,
         "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
   }
+
+  public boolean getIsHiveSupportTimestampType() {
+    return getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE);
+  }
+
+  public boolean getHiveCreateManagedTable() {
+    return getBoolean(HIVE_CREATE_MANAGED_TABLE);
+  }
+
+  public boolean getHiveCreateExternalTable() {
+    return !getHiveCreateManagedTable();
+  }
+
+  public List<String> getMetaSyncPartitionFields() {
+    return getSplitStrings(META_SYNC_PARTITION_FIELDS);
+  }
+
+  public String getBuckets() {
+    return getString(HIVE_SYNC_BUCKET_SYNC_SPEC);
+  }
 }
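Note: both files above now build error messages and SQL identifiers with TableUtils.tableId. A minimal sketch of the assumed behavior, for readers following the diff (the real org.apache.hudi.sync.common.util.TableUtils may differ in details):

  public final class TableUtils {
    // Assumed: joins the database and table names with a dot, e.g. "db.tbl".
    public static String tableId(String databaseName, String tableName) {
      return String.join(".", databaseName, tableName);
    }
  }
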
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index 1c4dcec592e73..cb414ad0b304b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -46,6 +46,7 @@
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DECODE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 
 /**
  * This class adds functionality for all query based DDLExecutors. The classes extending it only have to provide runSQL(sql) functions.
@@ -76,25 +77,37 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) {
    */
   public abstract void runSQL(String sql);
 
+  /**
+   * Create a database with the given name.
+   */
   @Override
   public void createDatabase(String databaseName) {
-    runSQL("create database if not exists " + databaseName);
+    String createSQLQuery = HiveSchemaUtil.generateCreateDataBaseDDL(databaseName);
+    LOG.info("Creating database with {}.", createSQLQuery);
+    runSQL(createSQLQuery);
   }
 
+  /**
+   * Create a table with the given params.
+   */
   @Override
-  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
-                          Map<String, String> tableProperties) {
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
+                          String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
+                          Map<String, String> tableProperties) {
     try {
       String createSQLQuery =
-          HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, config, inputFormatClass,
+          HiveSchemaUtil.generateCreateTableDDL(databaseName, tableName, storageSchema, config, inputFormatClass,
               outputFormatClass, serdeClass, serdeProperties, tableProperties);
-      LOG.info("Creating table with " + createSQLQuery);
+      LOG.info("Creating table with {}.", createSQLQuery);
       runSQL(createSQLQuery);
     } catch (IOException e) {
-      throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
+      throw new HoodieHiveSyncException("Failed to create table " + tableId(databaseName, tableName), e);
     }
   }
 
+  /**
+   * Update the table definition to match the given schema.
+   */
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
@@ -113,6 +126,9 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
     }
   }
 
+  /**
+   * Add the given partitions to the table.
+   */
   @Override
   public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
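Note: createDatabase now logs and runs a generated statement instead of inlining the SQL. A self-contained sketch of what it ends up executing, assuming tickSurround backtick-quotes the identifier (consistent with the new test at the end of this patch):

  public class CreateDatabaseDdlDemo {
    static String tickSurround(String id) {
      return "`" + id + "`";
    }

    public static void main(String[] args) {
      // Mirrors HiveSchemaUtil.generateCreateDataBaseDDL("hudi_db")
      String ddl = "CREATE DATABASE IF NOT EXISTS " + tickSurround("hudi_db");
      System.out.println(ddl); // CREATE DATABASE IF NOT EXISTS `hudi_db`
    }
  }
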
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 17d3a5fa4952c..326fe43fd7052 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive.util;
 
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.SchemaDifference;
@@ -39,14 +40,15 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
-import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 
 /**
  * Schema Utilities.
@@ -446,58 +448,76 @@ public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip,
-  public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException {
+  /**
+   * Generates the column DDL string for creating a Hive table from the given schema.
+   */
+  public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip,
+                                            boolean supportTimestamp) throws IOException {
     Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
-    StringBuilder columns = new StringBuilder();
+    List<String> columnDescs = new ArrayList<>();
     for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
       if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
-        columns.append(hiveSchemaEntry.getKey()).append(" ");
-        columns.append(hiveSchemaEntry.getValue()).append(", ");
+        String columnName = hiveSchemaEntry.getKey();
+        String columnType = hiveSchemaEntry.getValue();
+        String columnDesc = columnName + " " + columnType;
+        columnDescs.add(columnDesc);
       }
     }
-    // Remove the last ", "
-    columns.delete(columns.length() - 2, columns.length());
-    return columns.toString();
+    return StringUtils.join(columnDescs, ", \n");
   }
 
-  public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
-                                         String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
-                                         Map<String, String> tableProperties) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
-    String columns = generateSchemaString(storageSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
+  /**
+   * Generate the CREATE DATABASE DDL.
+   *
+   * @param databaseName The database name.
+   * @return The CREATE DATABASE statement.
+   */
+  public static String generateCreateDataBaseDDL(String databaseName) {
+    return "CREATE DATABASE IF NOT EXISTS " + tickSurround(databaseName);
+  }
 
-    List<String> partitionFields = new ArrayList<>();
-    for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
-      String partitionKeyWithTicks = tickSurround(partitionKey);
-      partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
-          .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
-    }
+  /**
+   * Generate the CREATE TABLE DDL.
+   */
+  public static String generateCreateTableDDL(String pDataBaseName, String pTableName, MessageType storageSchema,
+                                              HiveSyncConfig config, String inputFormatClass,
+                                              String outputFormatClass, String serdeClass,
+                                              Map<String, String> serdeProperties,
+                                              Map<String, String> tableProperties) throws IOException {
+
-    String partitionsStr = String.join(",", partitionFields);
     StringBuilder sb = new StringBuilder();
-    if (config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
-      sb.append("CREATE TABLE IF NOT EXISTS ");
-    } else {
-      sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
-    }
-    sb.append(HIVE_ESCAPE_CHARACTER).append(config.getStringOrDefault(META_SYNC_DATABASE_NAME)).append(HIVE_ESCAPE_CHARACTER)
-        .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
-    sb.append("( ").append(columns).append(")");
+    String dataBaseName = tickSurround(pDataBaseName);
+    String tableName = tickSurround(pTableName);
+
+    // Append the DB name and table name
+    sb.append("CREATE ");
+    sb.append(getExternal(config));
+    sb.append("TABLE IF NOT EXISTS ");
+    sb.append(tableId(dataBaseName, tableName));
+
+    // Append the columns
+    sb.append("(").append(getColumns(storageSchema, config)).append(")");
+
+    // Append the partitions
     if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
-      sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
+      sb.append(getPartitions(storageSchema, config));
     }
+
+    // Append the buckets
     if (config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) != null) {
-      sb.append(' ' + config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) + ' ');
-    }
-    sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
-    if (serdeProperties != null && !serdeProperties.isEmpty()) {
-      sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
+      sb.append("\n ").append(config.getBuckets());
     }
-    sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
-    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.getAbsoluteBasePath()).append("'");
-    if (tableProperties != null && !tableProperties.isEmpty()) {
-      sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
+
+    // Append the serde class, input format class, output format class
+    sb.append(getRowFormat(serdeClass, serdeProperties, inputFormatClass, outputFormatClass));
+
+    // Append the location
+    sb.append(getLocationBlock(config.getAbsoluteBasePath()));
+
+    // Append TBLPROPERTIES
+    if (tableProperties != null && !tableProperties.isEmpty()) {
+      sb.append("\n TBLPROPERTIES (");
+      sb.append(propertyToString(tableProperties));
+      sb.append(")");
     }
     return sb.toString();
   }
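Note: the generateSchemaString rewrite above replaces manual trailing-separator trimming with a join, which also puts each column on its own line of the DDL. A standalone sketch of the new behavior (column names are illustrative; java.lang.String.join stands in for Hudi's StringUtils.join):

  import java.util.ArrayList;
  import java.util.List;

  public class ColumnJoinDemo {
    public static void main(String[] args) {
      List<String> columnDescs = new ArrayList<>();
      columnDescs.add("`id` bigint");
      columnDescs.add("`name` string");
      // The join supplies separators between entries, so there is no
      // trailing ", " left over to delete afterwards.
      System.out.println(String.join(", \n", columnDescs));
      // Output:
      // `id` bigint, 
      // `name` string
    }
  }
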
@@ -527,4 +547,76 @@ public static String getPartitionKeyType(Map<String, String> hiveSchema, String
     // Dont do that
     return STRING_TYPE_NAME;
   }
+
+  /**
+   * If the table is an EXTERNAL table, return the EXTERNAL keyword (with a trailing space);
+   * otherwise return an empty string.
+   */
+  private static String getExternal(HiveSyncConfig config) {
+    return config.getHiveCreateExternalTable() ? "EXTERNAL " : "";
+  }
+
+  /**
+   * Get the ROW FORMAT block of the table.
+   */
+  private static String getRowFormat(String serdeClass, Map<String, String> serdeParams,
+                                     String inputFormatClass, String outputFormatClass) {
+    StringBuilder rowFormat = new StringBuilder();
+    rowFormat.append("\n ROW FORMAT SERDE \n")
+        .append(" '" + serdeClass + "' \n");
+    if (serdeParams != null && !serdeParams.isEmpty()) {
+      getSerdeParams(rowFormat, serdeParams);
+      rowFormat.append(" \n");
+    }
+    rowFormat.append("STORED AS INPUTFORMAT \n '" + inputFormatClass + "' \n")
+        .append("OUTPUTFORMAT \n '" + outputFormatClass + "'");
+    return rowFormat.toString();
+  }
+
+  /**
+   * Get the parameters associated with the SerDe, sorted by key for a deterministic DDL.
+   */
+  public static void getSerdeParams(StringBuilder builder, Map<String, String> serdeParams) {
+    SortedMap<String, String> sortedSerdeParams = new TreeMap<>(serdeParams);
+    List<String> serdeCols = new ArrayList<>();
+    for (Map.Entry<String, String> entry : sortedSerdeParams.entrySet()) {
+      serdeCols.add(" '" + entry.getKey() + "'='" + entry.getValue() + "'");
+    }
+    builder.append("WITH SERDEPROPERTIES ( \n")
+        .append(StringUtils.join(serdeCols, ", \n"))
+        .append(')');
+  }
+
+  /**
+   * Get the PARTITIONED BY block for creating a table.
+   */
+  private static String getPartitions(MessageType storageSchema, HiveSyncConfig config)
+      throws IOException {
+    Map<String, String> hiveSchema =
+        convertParquetSchemaToHiveSchema(storageSchema, config.getIsHiveSupportTimestampType());
+    List<String> partitionFields = new ArrayList<>();
+    for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
+      String partitionKeyWithTicks = tickSurround(partitionKey);
+      partitionFields.add(partitionKeyWithTicks + " "
+          + getPartitionKeyType(hiveSchema, partitionKeyWithTicks));
+    }
+    String partitionsStr = String.join(",", partitionFields);
+    return "\n PARTITIONED BY ( \n" + partitionsStr + ")";
+  }
+
+  /**
+   * Get the column block for creating a table.
+   */
+  private static String getColumns(MessageType storageSchema, HiveSyncConfig config)
+      throws IOException {
+    return generateSchemaString(storageSchema,
+        config.getMetaSyncPartitionFields(), config.getIsHiveSupportTimestampType());
+  }
+
+  /**
+   * Get the LOCATION block for creating a table.
+   */
+  private static String getLocationBlock(String location) {
+    return "\n LOCATION\n '" + location + "'";
+  }
 }
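Note: taken together, these helpers emit a multi-line, SHOW CREATE TABLE-style layout. A standalone sketch that mirrors getRowFormat/getSerdeParams so the generated clause shape can be inspected without a Hive setup (the serde and format class names are illustrative, not taken from the patch; the helper is renamed here):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.SortedMap;
  import java.util.TreeMap;

  public class RowFormatDemo {
    // Mirrors HiveSchemaUtil.getSerdeParams: sorted, quoted key='value' pairs.
    static void appendSerdeParams(StringBuilder builder, Map<String, String> serdeParams) {
      SortedMap<String, String> sorted = new TreeMap<>(serdeParams);
      List<String> cols = new ArrayList<>();
      for (Map.Entry<String, String> e : sorted.entrySet()) {
        cols.add(" '" + e.getKey() + "'='" + e.getValue() + "'");
      }
      builder.append("WITH SERDEPROPERTIES ( \n").append(String.join(", \n", cols)).append(')');
    }

    public static void main(String[] args) {
      StringBuilder rowFormat = new StringBuilder();
      rowFormat.append("\n ROW FORMAT SERDE \n")
          .append(" 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' \n");
      Map<String, String> serdeParams = new TreeMap<>();
      serdeParams.put("path", "/data/hudi_tbl");
      appendSerdeParams(rowFormat, serdeParams);
      rowFormat.append(" \n");
      rowFormat.append("STORED AS INPUTFORMAT \n 'org.apache.hudi.hadoop.HoodieParquetInputFormat' \n")
          .append("OUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'");
      // Prints the multi-line ROW FORMAT block as getRowFormat would assemble it.
      System.out.println(rowFormat);
    }
  }
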
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
index 9c8ffc106db81..ee40f1910c248 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
@@ -145,4 +145,11 @@ public void testSchemaDiffForTimestampMicros() {
         schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
     assertTrue(schemaDifference.isEmpty());
   }
+
+  @Test
+  public void testGenerateCreateDataBaseDDL() {
+    String expectedCreateDatabaseSQL = "CREATE DATABASE IF NOT EXISTS `test_database`";
+    String generatedSQL = HiveSchemaUtil.generateCreateDataBaseDDL("test_database");
+    assertEquals(expectedCreateDatabaseSQL, generatedSQL);
+  }
 }
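Note: a natural companion test (a sketch only, not part of the patch) would pin down the quoting behavior for names that need escaping, assuming tickSurround simply wraps the identifier in backticks:

  @Test
  public void testGenerateCreateDataBaseDDLEscapesName() {
    // Hypothetical check; assumes tickSurround adds backticks verbatim.
    assertEquals("CREATE DATABASE IF NOT EXISTS `my-database`",
        HiveSchemaUtil.generateCreateDataBaseDDL("my-database"));
  }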