@@ -54,6 +54,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

public class HoodieAdbJdbcClient extends HoodieSyncClient {

@@ -109,11 +110,11 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
Map<String, String> serdeProperties, Map<String, String> tableProperties) {
try {
LOG.info("Creating table:{}", tableName);
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
String createSQLQuery = HiveSchemaUtil.generateCreateTableDDL(databaseName, tableName, storageSchema,
config, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
executeAdbSql(createSQLQuery);
} catch (IOException e) {
throw new HoodieException("Fail to create table:" + tableName, e);
throw new HoodieException("Fail to create table:" + tableId(databaseName, tableName), e);
}
}

@@ -32,6 +32,7 @@

import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.Properties;

/**
@@ -204,4 +205,24 @@ public TypedProperties toProps() {
public void validateParameters() {
ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}

public boolean getIsHiveSupportTimestampType() {
return getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE);
}

public boolean getHiveCreateManagedTable() {
return getBoolean(HIVE_CREATE_MANAGED_TABLE);
}

public boolean getHiveCreateExternalTable() {
return !getHiveCreateManagedTable();
}

public List<String> getMetaSyncPartitionFields() {
return getSplitStrings(META_SYNC_PARTITION_FIELDS);
}

public String getBuckets() {
return getString(HIVE_SYNC_BUCKET_SYNC_SPEC);
}
}
@@ -46,6 +46,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DECODE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
* This class adds functionality for all query based DDLExecutors. The classes extending it only have to provide runSQL(sql) functions.
@@ -76,25 +77,37 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) {
*/
public abstract void runSQL(String sql);

/**
* Create a database with the given name.
*/
@Override
public void createDatabase(String databaseName) {
runSQL("create database if not exists " + databaseName);
String createSQLQuery = HiveSchemaUtil.generateCreateDataBaseDDL(databaseName);
LOG.info("Creating database with {}.", createSQLQuery);
runSQL(createSQLQuery);
}

/**
* Create a table with the given params.
*/
Contributor:

Maybe just remove the param docs if there is not much to address:

  /**
   * Create a table with the given params.
   */

Contributor Author:

Thank you for your suggestion! I will simplify the comment.

@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) {
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) {
try {
String createSQLQuery =
HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, config, inputFormatClass,
HiveSchemaUtil.generateCreateTableDDL(databaseName, tableName, storageSchema, config, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
LOG.info("Creating table with " + createSQLQuery);
LOG.info("Creating table with {}.", createSQLQuery);
runSQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
throw new HoodieHiveSyncException("Failed to create table " + tableId(databaseName, tableName), e);
}
}

/**
* Create a table with the given params.
*/
Contributor:

Is the doc correct?

@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
try {
@@ -113,6 +126,9 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
}
}

/**
* Create a table with the given params.
*/
Contributor:

ditto

@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
@@ -18,6 +18,7 @@

package org.apache.hudi.hive.util;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
@@ -39,14 +40,15 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
* Schema Utilities.
@@ -446,58 +448,76 @@ public static String generateSchemaString(MessageType storageSchema, List<String
return generateSchemaString(storageSchema, colsToSkip, false);
}

public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException {
/**
 * Generates the column DDL string for creating a Hive table from the given schema.
 */
Contributor:

I liked that the original doc included a SQL statement demo for this method; can we keep that? (An illustrative sketch follows this method.)
public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip,
boolean supportTimestamp) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
StringBuilder columns = new StringBuilder();
List<String> columnDescs = new ArrayList<>();
for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
columns.append(hiveSchemaEntry.getKey()).append(" ");
columns.append(hiveSchemaEntry.getValue()).append(", ");
String columnName = hiveSchemaEntry.getKey();
String columnType = hiveSchemaEntry.getValue();
String columnDesc = columnName + " " + columnType;
columnDescs.add(columnDesc);
}
}
// Remove the last ", "
columns.delete(columns.length() - 2, columns.length());
return columns.toString();
return StringUtils.join(columnDescs, ", \n");
}
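
For reference, the kind of javadoc the reviewer is asking for could restore a small sample of the generated output. A rough sketch is below; the schema and the rendered column list are illustrative assumptions, not taken from this PR:

  /**
   * Generates the column DDL string for creating a Hive table from the given schema.
   *
   * <p>Illustrative example (hypothetical schema): for a Parquet schema with fields
   * {@code name: binary (UTF8)} and {@code ts: int64}, and no columns skipped, the result
   * is roughly:
   *
   * <pre>
   * `name` string,
   * `ts` bigint
   * </pre>
   */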

public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
String columns = generateSchemaString(storageSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
/**
* Generates the CREATE DATABASE SQL.
*
* @param databaseName the database name.
* @return the CREATE DATABASE statement.
*/
public static String generateCreateDataBaseDDL(String databaseName) {
return "CREATE DATABASE IF NOT EXISTS " + tickSurround(databaseName);
}

List<String> partitionFields = new ArrayList<>();
for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
String partitionKeyWithTicks = tickSurround(partitionKey);
partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
.append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
}
/**
* Generates the CREATE TABLE SQL.
*/
public static String generateCreateTableDDL(String pDataBaseName, String pTableName, MessageType storageSchema,
HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) throws IOException {

String partitionsStr = String.join(",", partitionFields);
StringBuilder sb = new StringBuilder();
if (config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
sb.append("CREATE TABLE IF NOT EXISTS ");
} else {
sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
}
sb.append(HIVE_ESCAPE_CHARACTER).append(config.getStringOrDefault(META_SYNC_DATABASE_NAME)).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb.append("( ").append(columns).append(")");
String dataBaseName = tickSurround(pDataBaseName);
String tableName = tickSurround(pTableName);

// Append DBName and TableName
sb.append(" CREATE ");
sb.append(getExternal(config));
sb.append(" TABLE IF NOT EXISTS ");
sb.append(tableId(dataBaseName, tableName));

// Append Columns
sb.append("(").append(getColumns(storageSchema, config)).append(")");

// Append Partitions
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
sb.append(getPartitions(storageSchema, config));
}

// Append Buckets
if (config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) != null) {
sb.append(' ' + config.getString(HIVE_SYNC_BUCKET_SYNC_SPEC) + ' ');
}
sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
if (serdeProperties != null && !serdeProperties.isEmpty()) {
sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
sb.append("\n ").append(config.getBuckets());
}
sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.getAbsoluteBasePath()).append("'");

if (tableProperties != null && !tableProperties.isEmpty()) {
sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
// Append serdeClass, inputFormatClass, outputFormatClass
sb.append(getRowFormat(serdeClass, serdeProperties, inputFormatClass, outputFormatClass));

// Append location
sb.append(getLocationBlock(config.getAbsoluteBasePath()));

// Append TBLPROPERTIES
if (!tableProperties.isEmpty()) {
sb.append("\n TBLPROPERTIES (");
sb.append(propertyToString(tableProperties));
sb.append(")");
}
return sb.toString();
}
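
For orientation, with a hypothetical two-column schema, one partition field `dt`, no bucket spec, and empty serde/table properties, the statement assembled by this method would look roughly like the following. All database, table, and path names are made up, the format/serde classes are just the ones typically used for Parquet tables, and the exact whitespace comes from the helper methods below:

  CREATE EXTERNAL TABLE IF NOT EXISTS `hudi_db`.`hudi_tbl`(
    `name` string,
    `ts` bigint)
  PARTITIONED BY (`dt` string)
  ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  LOCATION
    '/path/to/hudi_tbl'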
@@ -527,4 +547,76 @@ public static String getPartitionKeyType(Map<String, String> hiveSchema, String
// Dont do that
return STRING_TYPE_NAME;
}

/**
* If the table is an EXTERNAL table, the EXTERNAL keyword will be added.
* Otherwise, an empty string is returned.
*/
private static String getExternal(HiveSyncConfig config) {
return config.getHiveCreateExternalTable() ? "EXTERNAL " : "";
}

/**
* Get the RowFormat information of the table.
*/
private static String getRowFormat(String serdeClass,
Map<String, String> serdeParams, String inputFormatClass, String outputFormatClass) {
StringBuilder rowFormat = new StringBuilder();
rowFormat.append("\n ROW FORMAT SERDE \n")
.append(" '" + serdeClass + "' \n");
if (!serdeParams.isEmpty()) {
getSerdeParams(rowFormat, serdeParams);
rowFormat.append(" \n");
}
rowFormat.append("STORED AS INPUTFORMAT \n '" + inputFormatClass + "' \n")
.append("OUTPUTFORMAT \n '" + outputFormatClass + "'");
return rowFormat.toString();
}

/**
* Append the parameters associated with the SerDe to the given builder.
*/
public static void getSerdeParams(StringBuilder builder, Map<String, String> serdeParams) {
SortedMap<String, String> sortedSerdeParams = new TreeMap<>(serdeParams);
List<String> serdeCols = new ArrayList<>();
for (Map.Entry<String, String> entry : sortedSerdeParams.entrySet()) {
serdeCols.add(" '" + entry.getKey() + "'='" + entry.getValue() + "'");
}
builder.append("WITH SERDEPROPERTIES ( \n")
.append(StringUtils.join(serdeCols, ", \n"))
.append(')');
}

/**
* Get the partition information for creating a table.
*/
private static String getPartitions(MessageType storageSchema, HiveSyncConfig config)
throws IOException {
Map<String, String> hiveSchema =
convertParquetSchemaToHiveSchema(storageSchema, config.getIsHiveSupportTimestampType());
List<String> partitionFields = new ArrayList<>();
for (String partitionKey : config.getSplitStrings(META_SYNC_PARTITION_FIELDS)) {
String partitionKeyWithTicks = tickSurround(partitionKey);
partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
.append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
}
String partitionsStr = String.join(",", partitionFields);
return "\n PARTITIONED BY ( \n" + partitionsStr + ")";
}

/**
* Get the column information for creating a table.
*/
private static String getColumns(MessageType storageSchema, HiveSyncConfig config)
throws IOException {
return generateSchemaString(storageSchema,
config.getMetaSyncPartitionFields(), config.getIsHiveSupportTimestampType());
}

/**
* Get the location information for creating a table.
*/
private static String getLocationBlock(String location) {
return "\n LOCATION\n " + "'" + location + "'";
}
}
@@ -145,4 +145,11 @@ public void testSchemaDiffForTimestampMicros() {
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
assertTrue(schemaDifference.isEmpty());
}

@Test
public void testGenerateCreateDataBaseDDL() {
String expectedCreateDataBaseSQL = "CREATE DATABASE IF NOT EXISTS `test_database`";
String testDataBase = HiveSchemaUtil.generateCreateDataBaseDDL("test_database");
assertEquals(expectedCreateDataBaseSQL, testDataBase);
}
Contributor:

Can we also have a test case for create table?

Contributor Author:

Thanks for your suggestion; I will submit the test code as soon as possible.

}
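
A rough, untested sketch of the create-table test requested above might look like the following. The HiveSyncConfig construction from Properties, the property keys, the format/serde class names, and the asserted fragments are all assumptions rather than part of this PR, and the usual imports (MessageTypeParser, Properties, Collections, META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME) would need to be added to the test class:

  @Test
  public void testGenerateCreateTableDDL() throws IOException {
    // Hypothetical sketch: schema, config wiring, and expected fragments are assumptions.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message test_schema { optional binary name (UTF8); optional int64 ts; }");
    Properties props = new Properties();
    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_database");
    props.setProperty(META_SYNC_BASE_PATH.key(), "/tmp/test_table");
    HiveSyncConfig config = new HiveSyncConfig(props);
    String ddl = HiveSchemaUtil.generateCreateTableDDL("test_database", "test_table", schema, config,
        "org.apache.hudi.hadoop.HoodieParquetInputFormat",
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
        Collections.emptyMap(), Collections.emptyMap());
    // Assert on stable fragments rather than the exact string, since whitespace may evolve.
    assertTrue(ddl.contains("CREATE EXTERNAL TABLE IF NOT EXISTS"));
    assertTrue(ddl.contains("`test_database`.`test_table`"));
    assertTrue(ddl.contains("LOCATION"));
  }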