ADB_SYNC_DROP_TABLE_BEFORE_CREATION = ConfigProperty
+ .key("hoodie.datasource.adb.sync.drop_table_before_creation")
+ .defaultValue(false)
+ .withDocumentation("Whether to drop the table before creation");
+
+ public AdbSyncConfig() {
+ this(new TypedProperties());
+ }
+
+ public AdbSyncConfig(TypedProperties props) {
+ super(props);
+
+ adbUser = getString(ADB_SYNC_USER);
+ adbPass = getString(ADB_SYNC_PASS);
+ jdbcUrl = getString(ADB_SYNC_JDBC_URL);
+ skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
+ skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
+ useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
+ supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
+ syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
+ tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
+ serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
+ sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+ dbLocation = getString(ADB_SYNC_DB_LOCATION);
+ autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
+ skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
+ dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
+ }
+
+ public static TypedProperties toProps(AdbSyncConfig cfg) {
+ TypedProperties properties = new TypedProperties();
+ properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName);
+ properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName);
+ properties.put(ADB_SYNC_USER.key(), cfg.adbUser);
+ properties.put(ADB_SYNC_PASS.key(), cfg.adbPass);
+ properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl);
+ properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
+ properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields));
+ properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass);
+ properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning));
+ properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix));
+ properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync));
+ properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning));
+ properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata));
+ properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp));
+ properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties);
+ properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties);
+ properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable));
+ properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold));
+ properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion);
+ properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation);
+ properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase));
+ properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync));
+ properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation));
+
+ return properties;
+ }
+
+ @Override
+ public String toString() {
+ return "AdbSyncConfig{"
+ + "adbUser='" + adbUser + '\''
+ + ", adbPass='" + adbPass + '\''
+ + ", jdbcUrl='" + jdbcUrl + '\''
+ + ", skipROSuffix=" + skipROSuffix
+ + ", skipRTSync=" + skipRTSync
+ + ", useHiveStylePartitioning=" + useHiveStylePartitioning
+ + ", supportTimestamp=" + supportTimestamp
+ + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
+ + ", tableProperties='" + tableProperties + '\''
+ + ", serdeProperties='" + serdeProperties + '\''
+ + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ + ", dbLocation='" + dbLocation + '\''
+ + ", autoCreateDatabase=" + autoCreateDatabase
+ + ", skipLastCommitTimeSync=" + skipLastCommitTimeSync
+ + ", dropTableBeforeCreation=" + dropTableBeforeCreation
+ + ", help=" + help
+ + ", databaseName='" + databaseName + '\''
+ + ", tableName='" + tableName + '\''
+ + ", basePath='" + basePath + '\''
+ + ", baseFileFormat='" + baseFileFormat + '\''
+ + ", partitionFields=" + partitionFields
+ + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ + ", assumeDatePartitioning=" + assumeDatePartitioning
+ + ", decodePartition=" + decodePartition
+ + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ + ", isConditionalSync=" + isConditionalSync
+ + ", sparkVersion='" + sparkVersion + '\''
+ + '}';
+ }
+}
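For reference, here is a minimal sketch of how the new config could be assembled from `TypedProperties`; it assumes the `ADB_SYNC_*` `ConfigProperty` constants declared above are publicly accessible on `AdbSyncConfig`, and all values are illustrative:

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.adb.AdbSyncConfig;

public class AdbSyncConfigExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Connection settings for the target ADB instance (illustrative values)
    props.put(AdbSyncConfig.ADB_SYNC_JDBC_URL.key(), "jdbc:mysql://localhost:3306");
    props.put(AdbSyncConfig.ADB_SYNC_USER.key(), "adb");
    props.put(AdbSyncConfig.ADB_SYNC_PASS.key(), "adb");
    // Newly introduced switch: drop the target table before (re)creating it
    props.put("hoodie.datasource.adb.sync.drop_table_before_creation", "true");
    AdbSyncConfig config = new AdbSyncConfig(props);
    System.out.println(config);
  }
}
```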
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
new file mode 100644
index 0000000000000..8c2f9e20451ca
--- /dev/null
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.adb;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.util.ConfigUtils;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Adb sync tool is mainly used to sync hoodie tables to Alibaba Cloud AnalyticDB (ADB).
+ * It can be used either through the API `AdbSyncTool.syncHoodieTable(AdbSyncConfig)`
+ * or from the command line: `java -cp hoodie-hive.jar AdbSyncTool [args]`.
+ *
+ * This utility gets the schema from the latest commit and syncs it to the ADB table schema;
+ * incremental partitions (those written since the last successful sync) are synced as well.
+ */
+@SuppressWarnings("WeakerAccess")
+public class AdbSyncTool extends AbstractSyncTool {
+ private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
+
+ public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+ public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+ private final AdbSyncConfig adbSyncConfig;
+ private final AbstractAdbSyncHoodieClient hoodieAdbClient;
+ private final String snapshotTableName;
+ private final Option<String> roTableTableName;
+
+ public AdbSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
+ super(props, conf, fs);
+ this.adbSyncConfig = new AdbSyncConfig(props);
+ this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs);
+ switch (hoodieAdbClient.getTableType()) {
+ case COPY_ON_WRITE:
+ this.snapshotTableName = adbSyncConfig.tableName;
+ this.roTableTableName = Option.empty();
+ break;
+ case MERGE_ON_READ:
+ this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
+ this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName)
+ : Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+ break;
+ default:
+ throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
+ + ", basePath:" + hoodieAdbClient.getBasePath());
+ }
+ }
+
+ private AbstractAdbSyncHoodieClient getHoodieAdbClient(AdbSyncConfig adbSyncConfig, FileSystem fs) {
+ return new HoodieAdbJdbcClient(adbSyncConfig, fs);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ try {
+ switch (hoodieAdbClient.getTableType()) {
+ case COPY_ON_WRITE:
+ syncHoodieTable(snapshotTableName, false, false);
+ break;
+ case MERGE_ON_READ:
+ // Sync a ro table for MOR table
+ syncHoodieTable(roTableTableName.get(), false, true);
+ // Sync a rt table for MOR table
+ if (!adbSyncConfig.skipRTSync) {
+ syncHoodieTable(snapshotTableName, true, false);
+ }
+ break;
+ default:
+ throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
+ + ", basePath:" + hoodieAdbClient.getBasePath());
+ }
+ } catch (Exception re) {
+ throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re);
+ } finally {
+ hoodieAdbClient.close();
+ }
+ }
+
+ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
+ boolean readAsOptimized) throws Exception {
+ LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
+ tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType());
+
+ if (adbSyncConfig.autoCreateDatabase) {
+ try {
+ synchronized (AdbSyncTool.class) {
+ if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
+ hoodieAdbClient.createDatabase(adbSyncConfig.databaseName);
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName
+ + ", useRealtimeInputFormat = " + useRealtimeInputFormat, e);
+ }
+ } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
+ throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName);
+ }
+
+ // Currently HoodieBootstrapRelation does not support reading the bootstrap MOR rt table,
+ // so we disable syncAsSparkDataSourceTable here to avoid reading such a table
+ // through the data source path (which would use HoodieBootstrapRelation).
+ // TODO: once bootstrap MOR rt tables are supported in HoodieBootstrapRelation [HUDI-2071],
+ // this logic can be removed.
+ if (hoodieAdbClient.isBootstrap()
+ && hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ
+ && !readAsOptimized) {
+ adbSyncConfig.syncAsSparkDataSourceTable = false;
+ LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName);
+ }
+
+ if (adbSyncConfig.dropTableBeforeCreation) {
+ LOG.info("Drop table before creation, tableName:{}", tableName);
+ hoodieAdbClient.dropTable(tableName);
+ }
+
+ boolean tableExists = hoodieAdbClient.tableExists(tableName);
+
+ // Get the parquet schema for this table looking at the latest commit
+ MessageType schema = hoodieAdbClient.getDataSchema();
+
+ // Sync schema if needed
+ syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+ LOG.info("Sync schema complete, start syncing partitions for table:{}", tableName);
+
+ // Get the last time we successfully synced partitions
+ Option<String> lastCommitTimeSynced = Option.empty();
+ if (tableExists) {
+ lastCommitTimeSynced = hoodieAdbClient.getLastCommitTimeSynced(tableName);
+ }
+ LOG.info("Last commit time synced was found:{}", lastCommitTimeSynced.orElse("null"));
+
+ // Scan synced partitions
+ List<String> writtenPartitionsSince;
+ if (adbSyncConfig.partitionFields.isEmpty()) {
+ writtenPartitionsSince = new ArrayList<>();
+ } else {
+ writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+ }
+ LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());
+
+ // Sync the partitions if needed
+ syncPartitions(tableName, writtenPartitionsSince);
+
+ // Update the last sync commit time stored in table properties,
+ // unless configured to skip it, since that update can be time consuming.
+ if (!adbSyncConfig.skipLastCommitTimeSync) {
+ hoodieAdbClient.updateLastCommitTimeSynced(tableName);
+ }
+ LOG.info("Sync complete for table:{}", tableName);
+ }
+
+ /**
+ * Get the latest schema from the last commit and check whether it is in sync with the ADB
+ * table schema. If not, evolve the table schema.
+ *
+ * @param tableName The table to be synced
+ * @param tableExists Whether target table exists
+ * @param useRealTimeInputFormat Whether using realtime input format
+ * @param readAsOptimized Whether read as optimized table
+ * @param schema The extracted schema
+ */
+ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
+ boolean readAsOptimized, MessageType schema) throws Exception {
+ // Append spark table properties & serde properties
+ Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties);
+ Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties);
+ if (adbSyncConfig.syncAsSparkDataSourceTable) {
+ Map<String, String> sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields,
+ adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema);
+ Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath);
+ tableProperties.putAll(sparkTableProperties);
+ serdeProperties.putAll(sparkSerdeProperties);
+ LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, serdeProperties:{}",
+ tableName, tableExists, tableProperties, serdeProperties);
+ }
+
+ // Check and sync schema
+ if (!tableExists) {
+ LOG.info("ADB table [{}] is not found, creating it", tableName);
+ String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat);
+
+ // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+ // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+ // /ql/exec/DDLTask.java#L3488
+ hoodieAdbClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
+ ParquetHiveSerDe.class.getName(), serdeProperties, tableProperties);
+ } else {
+ // Check if the table schema has evolved
+ Map<String, String> tableSchema = hoodieAdbClient.getTableSchema(tableName);
+ SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields,
+ adbSyncConfig.supportTimestamp);
+ if (!schemaDiff.isEmpty()) {
+ LOG.info("Schema difference found for table:{}", tableName);
+ hoodieAdbClient.updateTableDefinition(tableName, schemaDiff);
+ } else {
+ LOG.info("No Schema difference for table:{}", tableName);
+ }
+ }
+ }
+
+ /**
+ * Syncs the list of storage partitions passed in: adds a partition to the ADB table if it is
+ * missing, or updates its location if the stored partition path does not match.
+ */
+ private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+ try {
+ if (adbSyncConfig.partitionFields.isEmpty()) {
+ LOG.info("Not a partitioned table.");
+ return;
+ }
+
+ Map<List<String>, String> partitions = hoodieAdbClient.scanTablePartitions(tableName);
+ List<PartitionEvent> partitionEvents = hoodieAdbClient.getPartitionEvents(partitions, writtenPartitionsSince);
+ List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
+ LOG.info("New Partitions:{}", newPartitions);
+ hoodieAdbClient.addPartitionsToTable(tableName, newPartitions);
+ List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
+ LOG.info("Changed Partitions:{}", updatePartitions);
+ hoodieAdbClient.updatePartitionsToTable(tableName, updatePartitions);
+ } catch (Exception e) {
+ throw new HoodieAdbSyncException("Failed to sync partitions for table:" + tableName, e);
+ }
+ }
+
+ private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
+ return events.stream().filter(s -> s.eventType == eventType)
+ .map(s -> s.storagePartition).collect(Collectors.toList());
+ }
+
+ public static void main(String[] args) {
+ // parse the params
+ final AdbSyncConfig cfg = new AdbSyncConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ Configuration hadoopConf = new Configuration();
+ FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
+ new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
+ }
+}
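To complement the `main` entry point above, a minimal programmatic invocation could look like the sketch below. It mirrors what `main` does after argument parsing; the database, table, credential, and path values are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.sync.adb.AdbSyncConfig;
import org.apache.hudi.sync.adb.AdbSyncTool;

public class AdbSyncExample {
  public static void main(String[] args) {
    AdbSyncConfig cfg = new AdbSyncConfig();
    cfg.databaseName = "test_db";                 // illustrative values
    cfg.tableName = "test_tbl";
    cfg.jdbcUrl = "jdbc:mysql://localhost:3306";
    cfg.adbUser = "adb";
    cfg.adbPass = "adb";
    cfg.basePath = "oss://bucket/hudi/test_tbl";

    Configuration hadoopConf = new Configuration();
    FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
    // Convert the config to TypedProperties and run one sync round, as in AdbSyncTool#main
    new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
  }
}
```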
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
new file mode 100644
index 0000000000000..a347ba701110d
--- /dev/null
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.adb;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieAdbJdbcClient.class);
+
+ public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
+ // Make sure we have the jdbc driver in classpath
+ private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+ public static final String ADB_ESCAPE_CHARACTER = "";
+ private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
+
+ static {
+ try {
+ Class.forName(DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
+ }
+ }
+
+ private Connection connection;
+
+ public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) {
+ super(syncConfig, fs);
+ createAdbConnection();
+ LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl);
+ }
+
+ private void createAdbConnection() {
+ if (connection == null) {
+ try {
+ Class.forName(DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to load jdbc driver class", e);
+ return;
+ }
+ try {
+ this.connection = DriverManager.getConnection(
+ adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass);
+ } catch (SQLException e) {
+ throw new HoodieException("Cannot create adb connection ", e);
+ }
+ }
+ }
+
+ @Override
+ public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
+ String outputFormatClass, String serdeClass,
+ Map<String, String> serdeProperties, Map<String, String> tableProperties) {
+ try {
+ LOG.info("Creating table:{}", tableName);
+ String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
+ getHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ executeAdbSql(createSQLQuery);
+ } catch (IOException e) {
+ throw new HoodieException("Fail to create table:" + tableName, e);
+ }
+ }
+
+ @Override
+ public void dropTable(String tableName) {
+ LOG.info("Dropping table:{}", tableName);
+ String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`";
+ executeAdbSql(dropTable);
+ }
+
+ public Map<String, String> getTableSchema(String tableName) {
+ Map<String, String> schema = new HashMap<>();
+ ResultSet result = null;
+ try {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ result = databaseMetaData.getColumns(adbSyncConfig.databaseName,
+ adbSyncConfig.databaseName, tableName, null);
+ while (result.next()) {
+ String columnName = result.getString(4);
+ String columnType = result.getString(6);
+ if ("DECIMAL".equals(columnType)) {
+ int columnSize = result.getInt("COLUMN_SIZE");
+ int decimalDigits = result.getInt("DECIMAL_DIGITS");
+ columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+ }
+ schema.put(columnName, columnType);
+ }
+ return schema;
+ } catch (SQLException e) {
+ throw new HoodieException("Fail to get table schema:" + tableName, e);
+ } finally {
+ closeQuietly(result, null);
+ }
+ }
+
+ @Override
+ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
+ if (partitionsToAdd.isEmpty()) {
+ LOG.info("No partitions to add for table:{}", tableName);
+ return;
+ }
+
+ LOG.info("Adding partitions to table:{}, partitionNum:{}", tableName, partitionsToAdd.size());
+ String sql = constructAddPartitionsSql(tableName, partitionsToAdd);
+ executeAdbSql(sql);
+ }
+
+ private void executeAdbSql(String sql) {
+ Statement stmt = null;
+ try {
+ stmt = connection.createStatement();
+ LOG.info("Executing sql:{}", sql);
+ stmt.execute(sql);
+ } catch (SQLException e) {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ } finally {
+ closeQuietly(null, stmt);
+ }
+ }
+
+ private <T> T executeQuerySQL(String sql, Function<ResultSet, T> function) {
+ Statement stmt = null;
+ try {
+ stmt = connection.createStatement();
+ LOG.info("Executing sql:{}", sql);
+ return function.apply(stmt.executeQuery(sql));
+ } catch (SQLException e) {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ } finally {
+ closeQuietly(null, stmt);
+ }
+ }
+
+ public void createDatabase(String databaseName) {
+ String rootPath = getDatabasePath();
+ LOG.info("Creating database:{}, databaseLocation:{}", databaseName, rootPath);
+ String sql = constructCreateDatabaseSql(rootPath);
+ executeAdbSql(sql);
+ }
+
+ public boolean databaseExists(String databaseName) {
+ String sql = constructShowCreateDatabaseSql(databaseName);
+ Function<ResultSet, Boolean> transform = resultSet -> {
+ try {
+ return resultSet.next();
+ } catch (Exception e) {
+ if (e.getMessage().contains("Unknown database `" + databaseName + "`")) {
+ return false;
+ } else {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ }
+ }
+ };
+ return executeQuerySQL(sql, transform);
+ }
+
+ @Override
+ public boolean doesTableExist(String tableName) {
+ String sql = constructShowLikeTableSql(tableName);
+ Function<ResultSet, Boolean> transform = resultSet -> {
+ try {
+ return resultSet.next();
+ } catch (Exception e) {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ }
+ };
+ return executeQuerySQL(sql, transform);
+ }
+
+ @Override
+ public boolean tableExists(String tableName) {
+ return doesTableExist(tableName);
+ }
+
+ @Override
+ public Option<String> getLastCommitTimeSynced(String tableName) {
+ String sql = constructShowCreateTableSql(tableName);
+
+ Function<ResultSet, Option<String>> transform = resultSet -> {
+ try {
+ if (resultSet.next()) {
+ String table = resultSet.getString(2);
+ Map<String, String> attr = new HashMap<>();
+ int index = table.indexOf(TBL_PROPERTIES_STR);
+ if (index != -1) {
+ String sub = table.substring(index + TBL_PROPERTIES_STR.length());
+ sub = sub
+ .replaceAll("\\(", "")
+ .replaceAll("\\)", "")
+ .replaceAll("'", "");
+ String[] str = sub.split(",");
+
+ for (String s : str) {
+ String key = s.split("=")[0].trim();
+ String value = s.split("=")[1].trim();
+ attr.put(key, value);
+ }
+ }
+ return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
+ }
+ return Option.empty();
+ } catch (Exception e) {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ }
+ };
+ return executeQuerySQL(sql, transform);
+ }
+
+ @Override
+ public void updateLastCommitTimeSynced(String tableName) {
+ // Set the last commit time from the TBLProperties
+ String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp();
+ try {
+ String sql = constructUpdateTblPropertiesSql(tableName, lastCommitSynced);
+ executeAdbSql(sql);
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to update last commit time synced:" + lastCommitSynced, e);
+ }
+ }
+
+ @Override
+ public Option<String> getLastReplicatedTime(String tableName) {
+ throw new UnsupportedOperationException("getLastReplicatedTime is not supported yet");
+ }
+
+ @Override
+ public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
+ throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet");
+ }
+
+ @Override
+ public void deleteLastReplicatedTimeStamp(String tableName) {
+ throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet");
+ }
+
+ @Override
+ public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
+ if (changedPartitions.isEmpty()) {
+ LOG.info("No partitions to change for table:{}", tableName);
+ return;
+ }
+
+ LOG.info("Changing partitions on table:{}, changedPartitionNum:{}", tableName, changedPartitions.size());
+ List<String> sqlList = constructChangePartitionsSql(tableName, changedPartitions);
+ for (String sql : sqlList) {
+ executeAdbSql(sql);
+ }
+ }
+
+ @Override
+ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
+ throw new UnsupportedOperationException("dropPartitions is not supported yet.");
+ }
+
+ public Map<List<String>, String> scanTablePartitions(String tableName) {
+ String sql = constructShowPartitionSql(tableName);
+ Function<ResultSet, Map<List<String>, String>> transform = resultSet -> {
+ Map<List<String>, String> partitions = new HashMap<>();
+ try {
+ while (resultSet.next()) {
+ if (resultSet.getMetaData().getColumnCount() > 0) {
+ String str = resultSet.getString(1);
+ if (!StringUtils.isNullOrEmpty(str)) {
+ List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
+ Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values));
+ String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ partitions.put(values, fullStoragePartitionPath);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Fail to execute sql:" + sql, e);
+ }
+ return partitions;
+ };
+ return executeQuerySQL(sql, transform);
+ }
+
+ public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
+ LOG.info("Adding columns for table:{}", tableName);
+ schemaDiff.getAddColumnTypes().forEach((columnName, columnType) ->
+ executeAdbSql(constructAddColumnSql(tableName, columnName, columnType))
+ );
+
+ LOG.info("Updating columns' definition for table:{}", tableName);
+ schemaDiff.getUpdateColumnTypes().forEach((columnName, columnType) ->
+ executeAdbSql(constructChangeColumnSql(tableName, columnName, columnType))
+ );
+ }
+
+ private String constructAddPartitionsSql(String tableName, List<String> partitions) {
+ StringBuilder sqlBuilder = new StringBuilder("alter table `");
+ sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`")
+ .append(tableName).append("`").append(" add if not exists ");
+ for (String partition : partitions) {
+ String partitionClause = getPartitionClause(partition);
+ Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+ String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+ sqlBuilder.append(" partition (").append(partitionClause).append(") location '")
+ .append(fullPartitionPathStr).append("' ");
+ }
+
+ return sqlBuilder.toString();
+ }
+
+ private List<String> constructChangePartitionsSql(String tableName, List<String> partitions) {
+ List<String> changePartitions = new ArrayList<>();
+ String useDatabase = "use `" + adbSyncConfig.databaseName + "`";
+ changePartitions.add(useDatabase);
+
+ String alterTable = "alter table `" + tableName + "`";
+ for (String partition : partitions) {
+ String partitionClause = getPartitionClause(partition);
+ Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+ String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+ String changePartition = alterTable + " add if not exists partition (" + partitionClause
+ + ") location '" + fullPartitionPathStr + "'";
+ changePartitions.add(changePartition);
+ }
+
+ return changePartitions;
+ }
+
+ /**
+ * Generate Hive Partition from partition values.
+ *
+ * @param partition Partition path
+ * @return partition clause
+ */
+ private String getPartitionClause(String partition) {
+ List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+ ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(),
+ "Partition key parts " + adbSyncConfig.partitionFields
+ + " do not match with partition values " + partitionValues + ". Check partition strategy. ");
+ List<String> partBuilder = new ArrayList<>();
+ for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) {
+ partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+ }
+
+ return String.join(",", partBuilder);
+ }
+
+ private String constructShowPartitionSql(String tableName) {
+ return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+ }
+
+ private String constructShowCreateTableSql(String tableName) {
+ return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+ }
+
+ private String constructShowLikeTableSql(String tableName) {
+ return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName);
+ }
+
+ private String constructCreateDatabaseSql(String rootPath) {
+ return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')",
+ adbSyncConfig.databaseName, rootPath);
+ }
+
+ private String constructShowCreateDatabaseSql(String databaseName) {
+ return String.format("show create database `%s`", databaseName);
+ }
+
+ private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
+ return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
+ adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
+ }
+
+ private String constructAddColumnSql(String tableName, String columnName, String columnType) {
+ return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
+ adbSyncConfig.databaseName, tableName, columnName, columnType);
+ }
+
+ private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
+ return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
+ adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
+ }
+
+ private HiveSyncConfig getHiveSyncConfig() {
+ HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
+ hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
+ hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
+ Path basePath = new Path(adbSyncConfig.basePath);
+ hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
+ return hiveSyncConfig;
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ LOG.error("Fail to close connection", e);
+ }
+ }
+}
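For clarity, a small sketch of the shape of the SQL this JDBC client emits, matching `constructAddPartitionsSql` and `constructUpdateTblPropertiesSql` above; the database, table, partition, and timestamp values are illustrative:

```java
public class AdbSyncSqlShapes {
  // Shape of the statement produced by constructAddPartitionsSql for a single partition
  static final String ADD_PARTITION_SQL =
      "alter table `test_db`.`test_tbl` add if not exists "
          + "partition (dt='2022-01-01') location 'oss://bucket/hudi/test_tbl/2022-01-01/'";

  // Shape of the statement produced by constructUpdateTblPropertiesSql,
  // recording the last synced commit instant under the 'hoodie_last_sync' table property
  static final String UPDATE_LAST_SYNC_SQL =
      "alter table `test_db`.`test_tbl` set tblproperties('hoodie_last_sync' = '20220101000000')";
}
```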
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java
new file mode 100644
index 0000000000000..0deb9b94cd525
--- /dev/null
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.adb;
+
+public class HoodieAdbSyncException extends RuntimeException {
+ public HoodieAdbSyncException(String message) {
+ super(message);
+ }
+
+ public HoodieAdbSyncException(String message, Throwable t) {
+ super(message, t);
+ }
+}
diff --git a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
new file mode 100644
index 0000000000000..f4eb8fc7fc453
--- /dev/null
+++ b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.adb;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+public class TestAdbSyncConfig {
+ @Test
+ public void testCopy() {
+ AdbSyncConfig adbSyncConfig = new AdbSyncConfig();
+ adbSyncConfig.partitionFields = Arrays.asList("a", "b");
+ adbSyncConfig.basePath = "/tmp";
+ adbSyncConfig.assumeDatePartitioning = true;
+ adbSyncConfig.databaseName = "test";
+ adbSyncConfig.tableName = "test";
+ adbSyncConfig.adbUser = "adb";
+ adbSyncConfig.adbPass = "adb";
+ adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
+ adbSyncConfig.skipROSuffix = false;
+ adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
+ + "spark.sql.sources.schema.numParts = '1'\\n "
+ + "spark.sql.sources.schema.part.0 ='xx'\\n "
+ + "spark.sql.sources.schema.numPartCols = '1'\\n"
+ + "spark.sql.sources.schema.partCol.0 = 'dt'";
+ adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'";
+ adbSyncConfig.dbLocation = "file://tmp/test_db";
+
+ TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig);
+ AdbSyncConfig copied = new AdbSyncConfig(props);
+
+ assertEquals(copied.partitionFields, adbSyncConfig.partitionFields);
+ assertEquals(copied.basePath, adbSyncConfig.basePath);
+ assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning);
+ assertEquals(copied.databaseName, adbSyncConfig.databaseName);
+ assertEquals(copied.tableName, adbSyncConfig.tableName);
+ assertEquals(copied.adbUser, adbSyncConfig.adbUser);
+ assertEquals(copied.adbPass, adbSyncConfig.adbPass);
+ assertEquals(copied.basePath, adbSyncConfig.basePath);
+ assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl);
+ assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix);
+ assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp);
+ }
+}
diff --git a/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties b/hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire-quiet.properties
similarity index 100%
rename from hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties
rename to hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire-quiet.properties
diff --git a/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties b/hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire.properties
similarity index 100%
rename from hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties
rename to hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire.properties
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
deleted file mode 100644
index d4d580fe276af..0000000000000
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.dla;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
-
-import com.beust.jcommander.Parameter;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Configs needed to sync data into DLA.
- */
-public class DLASyncConfig implements Serializable {
-
- @Parameter(names = {"--database"}, description = "name of the target database in DLA", required = true)
- public String databaseName;
-
- @Parameter(names = {"--table"}, description = "name of the target table in DLA", required = true)
- public String tableName;
-
- @Parameter(names = {"--user"}, description = "DLA username", required = true)
- public String dlaUser;
-
- @Parameter(names = {"--pass"}, description = "DLA password", required = true)
- public String dlaPass;
-
- @Parameter(names = {"--jdbc-url"}, description = "DLA jdbc connect url", required = true)
- public String jdbcUrl;
-
- @Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
- public String basePath;
-
- @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
- public List<String> partitionFields = new ArrayList<>();
-
- @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
- + "to extract the partition values from HDFS path")
- public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
-
- @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
- + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
- public Boolean assumeDatePartitioning = false;
-
- @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
- public Boolean skipROSuffix = false;
-
- @Parameter(names = {"--skip-rt-sync"}, description = "Skip the RT table syncing")
- public Boolean skipRTSync = false;
-
- @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
- public Boolean useDLASyncHiveStylePartitioning = false;
-
- @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
- public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-
- @Parameter(names = {"--help", "-h"}, help = true)
- public Boolean help = false;
-
- @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
- public Boolean supportTimestamp = false;
-
- public static DLASyncConfig copy(DLASyncConfig cfg) {
- DLASyncConfig newConfig = new DLASyncConfig();
- newConfig.databaseName = cfg.databaseName;
- newConfig.tableName = cfg.tableName;
- newConfig.dlaUser = cfg.dlaUser;
- newConfig.dlaPass = cfg.dlaPass;
- newConfig.jdbcUrl = cfg.jdbcUrl;
- newConfig.basePath = cfg.basePath;
- newConfig.partitionFields = cfg.partitionFields;
- newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
- newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
- newConfig.skipROSuffix = cfg.skipROSuffix;
- newConfig.skipRTSync = cfg.skipRTSync;
- newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
- newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
- newConfig.supportTimestamp = cfg.supportTimestamp;
- return newConfig;
- }
-
- @Override
- public String toString() {
- return "DLASyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
- + ", dlaUser='" + dlaUser + '\'' + ", dlaPass='" + dlaPass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
- + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
- + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
- + ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning
- + ", useFileListingFromMetadata=" + useFileListingFromMetadata
- + ", help=" + help + '}';
- }
-}
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
deleted file mode 100644
index 97838d03ed66b..0000000000000
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.dla;
-
-import com.beust.jcommander.JCommander;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.dla.util.Utils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
-import org.apache.hudi.sync.common.AbstractSyncTool;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.parquet.schema.MessageType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Tool to sync a hoodie table with a dla table. Either use it as a api
- * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
- *
- * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
- * partitions incrementally (all the partitions modified since the last commit)
- */
-@SuppressWarnings("WeakerAccess")
-public class DLASyncTool extends AbstractSyncTool {
-
- private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
- public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
- public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
-
- private final DLASyncConfig cfg;
- private final HoodieDLAClient hoodieDLAClient;
- private final String snapshotTableName;
- private final Option<String> roTableTableName;
-
- public DLASyncTool(TypedProperties properties, Configuration conf, FileSystem fs) {
- super(properties, conf, fs);
- this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
- this.cfg = Utils.propertiesToConfig(properties);
- switch (hoodieDLAClient.getTableType()) {
- case COPY_ON_WRITE:
- this.snapshotTableName = cfg.tableName;
- this.roTableTableName = Option.empty();
- break;
- case MERGE_ON_READ:
- this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
- this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
- Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
- break;
- default:
- LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
- throw new InvalidTableException(hoodieDLAClient.getBasePath());
- }
- }
-
- @Override
- public void syncHoodieTable() {
- try {
- switch (hoodieDLAClient.getTableType()) {
- case COPY_ON_WRITE:
- syncHoodieTable(snapshotTableName, false);
- break;
- case MERGE_ON_READ:
- // sync a RO table for MOR
- syncHoodieTable(roTableTableName.get(), false);
- // sync a RT table for MOR
- if (!cfg.skipRTSync) {
- syncHoodieTable(snapshotTableName, true);
- }
- break;
- default:
- LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
- throw new InvalidTableException(hoodieDLAClient.getBasePath());
- }
- } catch (RuntimeException re) {
- throw new HoodieException("Got runtime exception when dla syncing " + cfg.tableName, re);
- } finally {
- hoodieDLAClient.close();
- }
- }
-
- private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
- LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
- + " of type " + hoodieDLAClient.getTableType());
- // Check if the necessary table exists
- boolean tableExists = hoodieDLAClient.tableExists(tableName);
- // Get the parquet schema for this table looking at the latest commit
- MessageType schema = hoodieDLAClient.getDataSchema();
- // Sync schema if needed
- syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
-
- LOG.info("Schema sync complete. Syncing partitions for " + tableName);
- // Get the last time we successfully synced partitions
- // TODO : once DLA supports alter table properties
- Option<String> lastCommitTimeSynced = Option.empty();
- /*if (tableExists) {
- lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
- }*/
- LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
- List<String> writtenPartitionsSince = hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
- LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
- // Sync the partitions if needed
- syncPartitions(tableName, writtenPartitionsSince);
-
- hoodieDLAClient.updateLastCommitTimeSynced(tableName);
- LOG.info("Sync complete for " + tableName);
- }
-
- /**
- * Get the latest schema from the last commit and check if its in sync with the dla table schema. If not, evolves the
- * table schema.
- *
- * @param tableExists - does table exist
- * @param schema - extracted schema
- */
- private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
- // Check and sync schema
- if (!tableExists) {
- LOG.info("DLA table " + tableName + " is not found. Creating it");
-
- String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat);
-
- // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
- // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
- // /ql/exec/DDLTask.java#L3488
- hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
- ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
- } else {
- // Check if the table schema has evolved
- Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
- SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
- if (!schemaDiff.isEmpty()) {
- LOG.info("Schema difference found for " + tableName);
- hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
- } else {
- LOG.info("No Schema difference for " + tableName);
- }
- }
- }
-
- /**
- * Syncs the list of storage partitions passed in (checks if the partition is in dla, if not adds it or if the
- * partition path does not match, it updates the partition path).
- */
- private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
- try {
- if (cfg.partitionFields.isEmpty()) {
- LOG.info("not a partitioned table.");
- return;
- }
- Map<List<String>, String> partitions = hoodieDLAClient.scanTablePartitions(tableName);
- List<AbstractSyncHoodieClient.PartitionEvent> partitionEvents =
- hoodieDLAClient.getPartitionEvents(partitions, writtenPartitionsSince);
- List<String> newPartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD);
- LOG.info("New Partitions " + newPartitions);
- hoodieDLAClient.addPartitionsToTable(tableName, newPartitions);
- List<String> updatePartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE);
- LOG.info("Changed Partitions " + updatePartitions);
- hoodieDLAClient.updatePartitionsToTable(tableName, updatePartitions);
- } catch (Exception e) {
- throw new HoodieException("Failed to sync partitions for table " + tableName, e);
- }
- }
-
- private List<String> filterPartitions(List<AbstractSyncHoodieClient.PartitionEvent> events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) {
- return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
- .collect(Collectors.toList());
- }
-
- public static void main(String[] args) {
- // parse the params
- final DLASyncConfig cfg = new DLASyncConfig();
- JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
- cmd.usage();
- System.exit(1);
- }
- Configuration hadoopConf = new Configuration();
- FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
- new DLASyncTool(Utils.configToProperties(cfg), hadoopConf, fs).syncHoodieTable();
- }
-}
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
deleted file mode 100644
index 10869eaf27b64..0000000000000
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.dla;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.HiveSyncConfig;
-import org.apache.hudi.hive.HoodieHiveSyncException;
-import org.apache.hudi.hive.PartitionValueExtractor;
-import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class HoodieDLAClient extends AbstractSyncHoodieClient {
- private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
- private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
- // Make sure we have the dla JDBC driver in classpath
- private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
- private static final String DLA_ESCAPE_CHARACTER = "";
- private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
-
- static {
- try {
- Class.forName(DRIVER_NAME);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
- }
- }
-
- private Connection connection;
- private DLASyncConfig dlaConfig;
- private PartitionValueExtractor partitionValueExtractor;
-
- public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
- super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
- false, fs);
- this.dlaConfig = syncConfig;
- try {
- this.partitionValueExtractor =
- (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
- } catch (Exception e) {
- throw new HoodieException(
- "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
- }
- createDLAConnection();
- }
-
- private void createDLAConnection() {
- if (connection == null) {
- try {
- Class.forName(DRIVER_NAME);
- } catch (ClassNotFoundException e) {
- LOG.error("Unable to load DLA driver class", e);
- return;
- }
- try {
- this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
- LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl);
- } catch (SQLException e) {
- throw new HoodieException("Cannot create dla connection ", e);
- }
- }
- }
-
- @Override
- public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
- String outputFormatClass, String serdeClass,
- Map<String, String> serdeProperties, Map<String, String> tableProperties) {
- try {
- String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(),
- inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
- LOG.info("Creating table with " + createSQLQuery);
- updateDLASQL(createSQLQuery);
- } catch (IOException e) {
- throw new HoodieException("Failed to create table " + tableName, e);
- }
- }
-
- public Map<String, String> getTableSchema(String tableName) {
- if (!tableExists(tableName)) {
- throw new IllegalArgumentException(
- "Failed to get schema for table " + tableName + " does not exist");
- }
- Map<String, String> schema = new HashMap<>();
- ResultSet result = null;
- try {
- DatabaseMetaData databaseMetaData = connection.getMetaData();
- result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
- while (result.next()) {
- TYPE_CONVERTOR.doConvert(result, schema);
- }
- return schema;
- } catch (SQLException e) {
- throw new HoodieException("Failed to get table schema for " + tableName, e);
- } finally {
- closeQuietly(result, null);
- }
- }
-
- @Override
- public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
- if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for " + tableName);
- return;
- }
- LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
- String sql = constructAddPartitions(tableName, partitionsToAdd);
- updateDLASQL(sql);
- }
-
- public String constructAddPartitions(String tableName, List<String> partitions) {
- return constructDLAAddPartitions(tableName, partitions);
- }
-
- String generateAbsolutePathStr(Path path) {
- String absolutePathStr = path.toString();
- if (path.toUri().getScheme() == null) {
- absolutePathStr = getDefaultFs() + absolutePathStr;
- }
- return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
- }
-
- public List<String> constructChangePartitions(String tableName, List<String> partitions) {
- List<String> changePartitions = new ArrayList<>();
- String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
- changePartitions.add(useDatabase);
- String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
- for (String partition : partitions) {
- String partitionClause = getPartitionClause(partition);
- Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
- String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
- String changePartition =
- alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
- changePartitions.add(changePartition);
- }
- return changePartitions;
- }
-
- /**
- * Generate Hive Partition from partition values.
- *
- * @param partition Partition path
- * @return
- */
- public String getPartitionClause(String partition) {
- List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
- ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
- "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
- + ". Check partition strategy. ");
- List<String> partBuilder = new ArrayList<>();
- for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
- partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
- }
- return partBuilder.stream().collect(Collectors.joining(","));
- }
-
- private String constructDLAAddPartitions(String tableName, List<String> partitions) {
- StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
- alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
- .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
- .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
- for (String partition : partitions) {
- String partitionClause = getPartitionClause(partition);
- Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
- String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
- alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
- .append("' ");
- }
- return alterSQL.toString();
- }
-
- private void updateDLASQL(String sql) {
- Statement stmt = null;
- try {
- stmt = connection.createStatement();
- LOG.info("Executing SQL " + sql);
- stmt.execute(sql);
- } catch (SQLException e) {
- throw new HoodieException("Failed in executing SQL " + sql, e);
- } finally {
- closeQuietly(null, stmt);
- }
- }
-
- @Override
- public boolean doesTableExist(String tableName) {
- return tableExists(tableName);
- }
-
- @Override
- public boolean tableExists(String tableName) {
- String sql = consutructShowCreateTableSQL(tableName);
- Statement stmt = null;
- ResultSet rs = null;
- try {
- stmt = connection.createStatement();
- rs = stmt.executeQuery(sql);
- } catch (SQLException e) {
- return false;
- } finally {
- closeQuietly(rs, stmt);
- }
- return true;
- }
-
- @Override
- public Option<String> getLastCommitTimeSynced(String tableName) {
- String sql = consutructShowCreateTableSQL(tableName);
- Statement stmt = null;
- ResultSet rs = null;
- try {
- stmt = connection.createStatement();
- rs = stmt.executeQuery(sql);
- if (rs.next()) {
- String table = rs.getString(2);
- Map<String, String> attr = new HashMap<>();
- int index = table.indexOf(TBL_PROPERTIES_STR);
- if (index != -1) {
- String sub = table.substring(index + TBL_PROPERTIES_STR.length());
- sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
- String[] str = sub.split(",");
-
- for (int i = 0; i < str.length; i++) {
- String key = str[i].split("=")[0].trim();
- String value = str[i].split("=")[1].trim();
- attr.put(key, value);
- }
- }
- return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
- }
- return Option.empty();
- } catch (Exception e) {
- throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
- } finally {
- closeQuietly(rs, stmt);
- }
- }
-
- @Override
- public void updateLastCommitTimeSynced(String tableName) {
- // TODO : dla do not support update tblproperties, so do nothing.
- }
-
- @Override
- public Option<String> getLastReplicatedTime(String tableName) {
- // no op; unsupported
- return Option.empty();
- }
-
- @Override
- public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
- // no op; unsupported
- }
-
- @Override
- public void deleteLastReplicatedTimeStamp(String tableName) {
- // no op; unsupported
- }
-
- @Override
- public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
- if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to change for " + tableName);
- return;
- }
- LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
- List<String> sqls = constructChangePartitions(tableName, changedPartitions);
- for (String sql : sqls) {
- updateDLASQL(sql);
- }
- }
-
- @Override
- public void dropPartitions(String tableName, List<String> partitionsToDrop) {
- throw new UnsupportedOperationException("Not support dropPartitions yet.");
- }
-
- public Map<List<String>, String> scanTablePartitions(String tableName) {
- String sql = constructShowPartitionSQL(tableName);
- Statement stmt = null;
- ResultSet rs = null;
- Map<List<String>, String> partitions = new HashMap<>();
- try {
- stmt = connection.createStatement();
- LOG.info("Executing SQL " + sql);
- rs = stmt.executeQuery(sql);
- while (rs.next()) {
- if (rs.getMetaData().getColumnCount() > 0) {
- String str = rs.getString(1);
- if (!StringUtils.isNullOrEmpty(str)) {
- List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
- Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values));
- String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
- partitions.put(values, fullStoragePartitionPath);
- }
- }
- }
- return partitions;
- } catch (SQLException e) {
- throw new HoodieException("Failed in executing SQL " + sql, e);
- } finally {
- closeQuietly(rs, stmt);
- }
- }
-
- public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) {
- Map<String, String> paths = new HashMap<>();
-
- for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
- List<String> partitionValues = entry.getKey();
- Collections.sort(partitionValues);
- String fullTablePartitionPath = entry.getValue();
- paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
- }
- List<PartitionEvent> events = new ArrayList<>();
- for (String storagePartition : partitionStoragePartitions) {
- Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition);
- String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
- // Check if the partition values or if hdfs path is the same
- List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (dlaConfig.useDLASyncHiveStylePartitioning) {
- String partition = String.join("/", storagePartitionValues);
- storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
- fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
- }
- Collections.sort(storagePartitionValues);
- if (!storagePartitionValues.isEmpty()) {
- String storageValue = String.join(", ", storagePartitionValues);
- if (!paths.containsKey(storageValue)) {
- events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
- } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
- events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
- }
- }
- }
- return events;
- }
-
- public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
- ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().size() == 0, "not support delete columns");
- ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().size() == 0, "not support alter column type");
- Map<String, String> columns = schemaDiff.getAddColumnTypes();
- for (Map.Entry<String, String> entry : columns.entrySet()) {
- String columnName = entry.getKey();
- String columnType = entry.getValue();
- StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER)
- .append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".")
- .append(DLA_ESCAPE_CHARACTER).append(tableName)
- .append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(")
- .append(columnName).append(" ").append(columnType).append(" )");
- LOG.info("Updating table definition with " + sqlBuilder);
- updateDLASQL(sqlBuilder.toString());
- }
- }
-
- @Override
- public void close() {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.error("Could not close connection ", e);
- }
- }
-
- private String constructShowPartitionSQL(String tableName) {
- String sql = "show partitions " + dlaConfig.databaseName + "." + tableName;
- return sql;
- }
-
- private String consutructShowCreateTableSQL(String tableName) {
- String sql = "show create table " + dlaConfig.databaseName + "." + tableName;
- return sql;
- }
-
- private String getDefaultFs() {
- return fs.getConf().get("fs.defaultFS");
- }
-
- private HiveSyncConfig toHiveSyncConfig() {
- HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
- hiveSyncConfig.partitionFields = dlaConfig.partitionFields;
- hiveSyncConfig.databaseName = dlaConfig.databaseName;
- Path basePath = new Path(dlaConfig.basePath);
- hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
- return hiveSyncConfig;
- }
-}
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java
deleted file mode 100644
index d1b0dd4e9d56f..0000000000000
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.dla.util;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.dla.DLASyncConfig;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class Utils {
- public static String DLA_DATABASE_OPT_KEY = "hoodie.datasource.dla_sync.database";
- public static String DLA_TABLE_OPT_KEY = "hoodie.datasource.dla_sync.table";
- public static String DLA_USER_OPT_KEY = "hoodie.datasource.dla_sync.username";
- public static String DLA_PASS_OPT_KEY = "hoodie.datasource.dla_sync.password";
- public static String DLA_URL_OPT_KEY = "hoodie.datasource.dla_sync.jdbcurl";
- public static String BATH_PATH = "basePath";
- public static String DLA_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.dla_sync.partition_fields";
- public static String DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.dla_sync.partition_extractor_class";
- public static String DLA_ASSUME_DATE_PARTITIONING = "hoodie.datasource.dla_sync.assume_date_partitioning";
- public static String DLA_SKIP_RO_SUFFIX = "hoodie.datasource.dla_sync.skip_ro_suffix";
- public static String DLA_SKIP_RT_SYNC = "hoodie.datasource.dla_sync.skip_rt_sync";
- public static String DLA_SYNC_HIVE_STYLE_PARTITIONING = "hoodie.datasource.dla_sync.hive.style.partitioning";
-
- public static TypedProperties configToProperties(DLASyncConfig cfg) {
- TypedProperties properties = new TypedProperties();
- properties.put(DLA_DATABASE_OPT_KEY, cfg.databaseName);
- properties.put(DLA_TABLE_OPT_KEY, cfg.tableName);
- properties.put(DLA_USER_OPT_KEY, cfg.dlaUser);
- properties.put(DLA_PASS_OPT_KEY, cfg.dlaPass);
- properties.put(DLA_URL_OPT_KEY, cfg.jdbcUrl);
- properties.put(BATH_PATH, cfg.basePath);
- properties.put(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY, cfg.partitionValueExtractorClass);
- properties.put(DLA_ASSUME_DATE_PARTITIONING, String.valueOf(cfg.assumeDatePartitioning));
- properties.put(DLA_SKIP_RO_SUFFIX, String.valueOf(cfg.skipROSuffix));
- properties.put(DLA_SYNC_HIVE_STYLE_PARTITIONING, String.valueOf(cfg.useDLASyncHiveStylePartitioning));
- return properties;
- }
-
- public static DLASyncConfig propertiesToConfig(TypedProperties properties) {
- DLASyncConfig config = new DLASyncConfig();
- config.databaseName = properties.getProperty(DLA_DATABASE_OPT_KEY);
- config.tableName = properties.getProperty(DLA_TABLE_OPT_KEY);
- config.dlaUser = properties.getProperty(DLA_USER_OPT_KEY);
- config.dlaPass = properties.getProperty(DLA_PASS_OPT_KEY);
- config.jdbcUrl = properties.getProperty(DLA_URL_OPT_KEY);
- config.basePath = properties.getProperty(BATH_PATH);
- if (StringUtils.isNullOrEmpty(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY))) {
- config.partitionFields = new ArrayList<>();
- } else {
- config.partitionFields = Arrays.asList(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY).split(","));
- }
- config.partitionValueExtractorClass = properties.getProperty(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY);
- config.assumeDatePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_ASSUME_DATE_PARTITIONING, "false"));
- config.skipROSuffix = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RO_SUFFIX, "false"));
- config.skipRTSync = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RT_SYNC, "false"));
- config.useDLASyncHiveStylePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_SYNC_HIVE_STYLE_PARTITIONING, "false"));
- return config;
- }
-}
diff --git a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
deleted file mode 100644
index 366d5a24efb06..0000000000000
--- a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.dla;
-
-import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestDLASyncConfig {
- @Test
- public void testCopy() {
- DLASyncConfig dlaSyncConfig = new DLASyncConfig();
- List<String> partitions = Arrays.asList("a", "b");
- dlaSyncConfig.partitionFields = partitions;
- dlaSyncConfig.basePath = "/tmp";
- dlaSyncConfig.assumeDatePartitioning = true;
- dlaSyncConfig.databaseName = "test";
- dlaSyncConfig.tableName = "test";
- dlaSyncConfig.dlaUser = "dla";
- dlaSyncConfig.dlaPass = "dla";
- dlaSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
- dlaSyncConfig.skipROSuffix = false;
-
- DLASyncConfig copied = DLASyncConfig.copy(dlaSyncConfig);
-
- assertEquals(copied.partitionFields, dlaSyncConfig.partitionFields);
- assertEquals(copied.basePath, dlaSyncConfig.basePath);
- assertEquals(copied.assumeDatePartitioning, dlaSyncConfig.assumeDatePartitioning);
- assertEquals(copied.databaseName, dlaSyncConfig.databaseName);
- assertEquals(copied.tableName, dlaSyncConfig.tableName);
- assertEquals(copied.dlaUser, dlaSyncConfig.dlaUser);
- assertEquals(copied.dlaPass, dlaSyncConfig.dlaPass);
- assertEquals(copied.basePath, dlaSyncConfig.basePath);
- assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
- assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
- assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp);
- }
-}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 939fc114c0883..5e343b9a62a00 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -27,9 +27,8 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.hive.util.ConfigUtils;
+import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -43,20 +42,13 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.parquet.schema.OriginalType.UTF8;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-
/**
* Tool to sync a hoodie HDFS table with a hive metastore table. Either use it as a api
* HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive-sync.jar HiveSyncTool [args]
@@ -248,8 +240,9 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
if (hiveSyncConfig.syncAsSparkDataSourceTable) {
- Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.sparkSchemaLengthThreshold, schema);
- Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
+ Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields,
+ hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema);
+ Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
}
@@ -309,75 +302,6 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
return schemaChanged;
}
- /**
- * Get Spark Sql related table properties. This is used for spark datasource table.
- * @param schema The schema to write to the table.
- * @return A new parameters added the spark's table properties.
- */
- private Map<String, String> getSparkTableProperties(int schemaLengthThreshold, MessageType schema) {
- // Convert the schema and partition info used by spark sql to hive table properties.
- // The following code refers to the spark code in
- // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
- GroupType originGroupType = schema.asGroupType();
- List<String> partitionNames = hiveSyncConfig.partitionFields;
- List<Type> partitionCols = new ArrayList<>();
- List<Type> dataCols = new ArrayList<>();
- Map<String, Type> column2Field = new HashMap<>();
-
- for (Type field : originGroupType.getFields()) {
- column2Field.put(field.getName(), field);
- }
- // Get partition columns and data columns.
- for (String partitionName : partitionNames) {
- // Default the unknown partition fields to be String.
- // Keep the same logical with HiveSchemaUtil#getPartitionKeyType.
- partitionCols.add(column2Field.getOrDefault(partitionName,
- new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8)));
- }
-
- for (Type field : originGroupType.getFields()) {
- if (!partitionNames.contains(field.getName())) {
- dataCols.add(field);
- }
- }
-
- List<Type> reOrderedFields = new ArrayList<>();
- reOrderedFields.addAll(dataCols);
- reOrderedFields.addAll(partitionCols);
- GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields);
-
- Map<String, String> sparkProperties = new HashMap<>();
- sparkProperties.put("spark.sql.sources.provider", "hudi");
- if (!StringUtils.isNullOrEmpty(hiveSyncConfig.sparkVersion)) {
- sparkProperties.put("spark.sql.create.version", hiveSyncConfig.sparkVersion);
- }
- // Split the schema string to multi-parts according the schemaLengthThreshold size.
- String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
- int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
- sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
- // Add each part of schema string to sparkProperties
- for (int i = 0; i < numSchemaPart; i++) {
- int start = i * schemaLengthThreshold;
- int end = Math.min(start + schemaLengthThreshold, schemaString.length());
- sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
- }
- // Add partition columns
- if (!partitionNames.isEmpty()) {
- sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
- for (int i = 0; i < partitionNames.size(); i++) {
- sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
- }
- }
- return sparkProperties;
- }
-
- private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
- Map<String, String> sparkSerdeProperties = new HashMap<>();
- sparkSerdeProperties.put("path", hiveSyncConfig.basePath);
- sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
- return sparkSerdeProperties;
- }
-
/**
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1c2d53ed96ded..b801f4d7daa11 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -28,7 +28,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.testutils.HiveTestUtil;
-import org.apache.hudi.hive.util.ConfigUtils;
+import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java
index 3ca31b04395a1..b6940629af3d2 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java
@@ -18,7 +18,7 @@
package org.apache.hudi.hive;
-import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
+import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
import org.apache.spark.sql.execution.SparkSqlParser;
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
index 680b4a17ef5d9..972ae1f96c512 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
@@ -18,12 +18,26 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.sync.common.util.ConfigUtils;
+import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
/**
* Base class to sync Hudi meta data with Metastores to make
* Hudi table queryable through external systems.
@@ -46,4 +60,72 @@ public AbstractSyncTool(Properties props, FileSystem fileSystem) {
public abstract void syncHoodieTable();
+ /**
+ * Get Spark SQL related table properties. This is used for Spark datasource tables.
+ * @param schema The schema to write to the table.
+ * @return A map of Spark table properties to add to the table.
+ */
+ protected Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
+ int schemaLengthThreshold, MessageType schema) {
+ // Convert the schema and partition info used by spark sql to hive table properties.
+ // The following code refers to the spark code in
+ // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+ GroupType originGroupType = schema.asGroupType();
+ List<Type> partitionCols = new ArrayList<>();
+ List<Type> dataCols = new ArrayList<>();
+ Map<String, Type> column2Field = new HashMap<>();
+
+ for (Type field : originGroupType.getFields()) {
+ column2Field.put(field.getName(), field);
+ }
+ // Get partition columns and data columns.
+ for (String partitionName : partitionNames) {
+ // Default the unknown partition fields to be String.
+ // Keep the same logic as HiveSchemaUtil#getPartitionKeyType.
+ partitionCols.add(column2Field.getOrDefault(partitionName,
+ new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8)));
+ }
+
+ for (Type field : originGroupType.getFields()) {
+ if (!partitionNames.contains(field.getName())) {
+ dataCols.add(field);
+ }
+ }
+
+ List<Type> reOrderedFields = new ArrayList<>();
+ reOrderedFields.addAll(dataCols);
+ reOrderedFields.addAll(partitionCols);
+ GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields);
+
+ Map<String, String> sparkProperties = new HashMap<>();
+ sparkProperties.put("spark.sql.sources.provider", "hudi");
+ if (!StringUtils.isNullOrEmpty(sparkVersion)) {
+ sparkProperties.put("spark.sql.create.version", sparkVersion);
+ }
+ // Split the schema string into multiple parts according to the schemaLengthThreshold.
+ String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
+ int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
+ sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
+ // Add each part of schema string to sparkProperties
+ for (int i = 0; i < numSchemaPart; i++) {
+ int start = i * schemaLengthThreshold;
+ int end = Math.min(start + schemaLengthThreshold, schemaString.length());
+ sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
+ }
+ // Add partition columns
+ if (!partitionNames.isEmpty()) {
+ sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
+ for (int i = 0; i < partitionNames.size(); i++) {
+ sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
+ }
+ }
+ return sparkProperties;
+ }
+
+ protected Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
+ Map<String, String> sparkSerdeProperties = new HashMap<>();
+ sparkSerdeProperties.put("path", basePath);
+ sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
+ return sparkSerdeProperties;
+ }
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java
similarity index 98%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java
index 94ebdaadd8ff3..ca5224aef4697 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.util;
+package org.apache.hudi.sync.common.util;
import java.util.HashMap;
import java.util.Map;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java
similarity index 99%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java
index debc262b5518c..c5b98c17eb4a1 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.hive.util;
+package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.parquet.schema.GroupType;
diff --git a/hudi-sync/pom.xml b/hudi-sync/pom.xml
index 0ee145418f5ee..ffcbac8a652ef 100644
--- a/hudi-sync/pom.xml
+++ b/hudi-sync/pom.xml
@@ -32,7 +32,7 @@
<module>hudi-datahub-sync</module>
- <module>hudi-dla-sync</module>
+ <module>hudi-adb-sync</module>
<module>hudi-hive-sync</module>
<module>hudi-sync-common</module>