partitionFields = new ArrayList<>();
+
+ @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ + "to extract the partition values from HDFS path")
+ public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
+
+ @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+ public Boolean assumeDatePartitioning = false;
+
+ @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
+ public Boolean skipROSuffix = false;
+
+ @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
+ public Boolean useDLASyncHiveStylePartitioning = false;
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ public static DLASyncConfig copy(DLASyncConfig cfg) {
+ DLASyncConfig newConfig = new DLASyncConfig();
+ newConfig.databaseName = cfg.databaseName;
+ newConfig.tableName = cfg.tableName;
+ newConfig.dlaUser = cfg.dlaUser;
+ newConfig.dlaPass = cfg.dlaPass;
+ newConfig.jdbcUrl = cfg.jdbcUrl;
+ newConfig.basePath = cfg.basePath;
+ newConfig.partitionFields = cfg.partitionFields;
+ newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+ newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+ newConfig.skipROSuffix = cfg.skipROSuffix;
+ newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+ return newConfig;
+ }
+
+ @Override
+ public String toString() {
+ return "DLASyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ + ", dlaUser='" + dlaUser + '\'' + ", dlaPass='" + dlaPass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+ + ", skipROSuffix=" + skipROSuffix
+ + ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning
+ + ", help=" + help + '}';
+ }
+}
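A quick note on how these options are consumed: JCommander binds each `@Parameter` to the annotated field, and `Boolean` fields act as arity-0 flags. A minimal sketch, using only flags visible above (values and assertions are illustrative, not part of the patch):

```java
// Minimal sketch: parse the flags declared in DLASyncConfig.
// Uses the same JCommander constructor as DLASyncTool.main below.
DLASyncConfig cfg = new DLASyncConfig();
new JCommander(cfg, null, "--skip-ro-suffix", "--hive-style-partitioning");
assert cfg.skipROSuffix;                     // set by --skip-ro-suffix
assert cfg.useDLASyncHiveStylePartitioning;  // set by --hive-style-partitioning
assert !cfg.assumeDatePartitioning;          // untouched flags keep their defaults
```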
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
new file mode 100644
index 0000000000000..1ece9548785f7
--- /dev/null
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a DLA table. Either use it as an API,
+ * DLASyncTool.syncHoodieTable(DLASyncConfig), or from the command line: java -cp hoodie-hive.jar DLASyncTool [args].
+ *
+ * This utility will get the schema from the latest commit and sync it to the DLA table schema. It will also sync the
+ * partitions incrementally (all the partitions modified since the last commit).
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+ private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+ public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+ public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+ private final DLASyncConfig cfg;
+ private final HoodieDLAClient hoodieDLAClient;
+ private final String snapshotTableName;
+ private final Option<String> roTableTableName;
+
+ public DLASyncTool(Properties properties, FileSystem fs) {
+ super(properties, fs);
+ this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+ this.cfg = Utils.propertiesToConfig(properties);
+ switch (hoodieDLAClient.getTableType()) {
+ case COPY_ON_WRITE:
+ this.snapshotTableName = cfg.tableName;
+ this.roTableTableName = Option.empty();
+ break;
+ case MERGE_ON_READ:
+ this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+ this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+ Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+ break;
+ default:
+ LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+ throw new InvalidTableException(hoodieDLAClient.getBasePath());
+ }
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ try {
+ switch (hoodieDLAClient.getTableType()) {
+ case COPY_ON_WRITE:
+ syncHoodieTable(snapshotTableName, false);
+ break;
+ case MERGE_ON_READ:
+ // sync a RO table for MOR
+ syncHoodieTable(roTableTableName.get(), false);
+ // sync a RT table for MOR
+ syncHoodieTable(snapshotTableName, true);
+ break;
+ default:
+ LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+ throw new InvalidTableException(hoodieDLAClient.getBasePath());
+ }
+ } catch (RuntimeException re) {
+ LOG.error("Got runtime exception when dla syncing", re);
+ } finally {
+ hoodieDLAClient.close();
+ }
+ }
+
+ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+ LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+ + " of type " + hoodieDLAClient.getTableType());
+ // Check if the necessary table exists
+ boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+ // Get the parquet schema for this table looking at the latest commit
+ MessageType schema = hoodieDLAClient.getDataSchema();
+ // Sync schema if needed
+ syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+ LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+ // Get the last time we successfully synced partitions
+ // TODO : once DLA supports alter table properties
+ Option<String> lastCommitTimeSynced = Option.empty();
+ /*if (tableExists) {
+ lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
+ }*/
+ LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
+ List<String> writtenPartitionsSince = hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+ LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+ // Sync the partitions if needed
+ syncPartitions(tableName, writtenPartitionsSince);
+
+ hoodieDLAClient.updateLastCommitTimeSynced(tableName);
+ LOG.info("Sync complete for " + tableName);
+ }
+
+ /**
+ * Get the latest schema from the last commit and check if it is in sync with the DLA table schema. If not, evolve
+ * the table schema.
+ *
+ * @param tableName - name of the table to sync
+ * @param tableExists - does table exist
+ * @param useRealTimeInputFormat - use realtime input format
+ * @param schema - extracted schema
+ */
+ private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
+ // Check and sync schema
+ if (!tableExists) {
+ LOG.info("DLA table " + tableName + " is not found. Creating it");
+ if (!useRealTimeInputFormat) {
+ String inputFormatClassName = HoodieParquetInputFormat.class.getName();
+ hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
+ ParquetHiveSerDe.class.getName());
+ } else {
+ // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+ // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+ // /ql/exec/DDLTask.java#L3488
+ String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName();
+ hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
+ ParquetHiveSerDe.class.getName());
+ }
+ } else {
+ // Check if the table schema has evolved
+ Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
+ SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
+ if (!schemaDiff.isEmpty()) {
+ LOG.info("Schema difference found for " + tableName);
+ hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
+ } else {
+ LOG.info("No Schema difference for " + tableName);
+ }
+ }
+ }
+
+ /**
+ * Syncs the list of storage partitions passed in (checks whether each partition exists in DLA, adds it if not, and
+ * updates the partition path if the stored path does not match).
+ */
+ private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+ try {
+ if (cfg.partitionFields.isEmpty()) {
+ LOG.info("not a partitioned table.");
+ return;
+ }
+ Map<List<String>, String> partitions = hoodieDLAClient.scanTablePartitions(tableName);
+ List<AbstractSyncHoodieClient.PartitionEvent> partitionEvents =
+ hoodieDLAClient.getPartitionEvents(partitions, writtenPartitionsSince);
+ List<String> newPartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD);
+ LOG.info("New Partitions " + newPartitions);
+ hoodieDLAClient.addPartitionsToTable(tableName, newPartitions);
+ List<String> updatePartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE);
+ LOG.info("Changed Partitions " + updatePartitions);
+ hoodieDLAClient.updatePartitionsToTable(tableName, updatePartitions);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to sync partitions for table " + tableName, e);
+ }
+ }
+
+ private List<String> filterPartitions(List<AbstractSyncHoodieClient.PartitionEvent> events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) {
+ return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
+ .collect(Collectors.toList());
+ }
+
+ public static void main(String[] args) {
+ // parse the params
+ final DLASyncConfig cfg = new DLASyncConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ new DLASyncTool(Utils.configToProperties(cfg), fs).syncHoodieTable();
+ }
+}
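The javadoc's API usage amounts to the same three steps as `main`; a minimal sketch, with the base path and remaining config fields as hypothetical placeholders:

```java
// Programmatic sync, mirroring DLASyncTool.main above.
DLASyncConfig cfg = new DLASyncConfig();
cfg.basePath = "hdfs://ns/tmp/hudi_trips";  // hypothetical table base path
// ...databaseName, tableName, jdbcUrl, dlaUser, dlaPass set as needed...
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
new DLASyncTool(Utils.configToProperties(cfg), fs).syncHoodieTable();
```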
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
new file mode 100644
index 0000000000000..34a96c9ad8fe6
--- /dev/null
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieDLAClient extends AbstractSyncHoodieClient {
+ private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
+ private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
+ // Make sure we have the dla JDBC driver in classpath
+ private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+ private static final String DLA_ESCAPE_CHARACTER = "";
+ private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
+
+ static {
+ try {
+ Class.forName(DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
+ }
+ }
+
+ private Connection connection;
+ private DLASyncConfig dlaConfig;
+ private PartitionValueExtractor partitionValueExtractor;
+
+ public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
+ super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+ this.dlaConfig = syncConfig;
+ try {
+ this.partitionValueExtractor =
+ (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
+ } catch (Exception e) {
+ throw new HoodieException(
+ "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
+ }
+ createDLAConnection();
+ }
+
+ private void createDLAConnection() {
+ if (connection == null) {
+ try {
+ Class.forName(DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to load DLA driver class", e);
+ return;
+ }
+ try {
+ this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
+ LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl);
+ } catch (SQLException e) {
+ throw new HoodieException("Cannot create dla connection ", e);
+ }
+ }
+ }
+
+ @Override
+ public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
+ try {
+ String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
+ LOG.info("Creating table with " + createSQLQuery);
+ updateDLASQL(createSQLQuery);
+ } catch (IOException e) {
+ throw new HoodieException("Failed to create table " + tableName, e);
+ }
+ }
+
+ public Map<String, String> getTableSchema(String tableName) {
+ if (!doesTableExist(tableName)) {
+ throw new IllegalArgumentException(
+ "Failed to get schema for table " + tableName + ": table does not exist");
+ }
+ Map<String, String> schema = new HashMap<>();
+ ResultSet result = null;
+ try {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
+ while (result.next()) {
+ String columnName = result.getString(4);
+ String columnType = result.getString(6);
+ if ("DECIMAL".equals(columnType)) {
+ int columnSize = result.getInt("COLUMN_SIZE");
+ int decimalDigits = result.getInt("DECIMAL_DIGITS");
+ columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+ }
+ schema.put(columnName, columnType);
+ }
+ return schema;
+ } catch (SQLException e) {
+ throw new HoodieException("Failed to get table schema for " + tableName, e);
+ } finally {
+ closeQuietly(result, null);
+ }
+ }
+
+ @Override
+ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
+ if (partitionsToAdd.isEmpty()) {
+ LOG.info("No partitions to add for " + tableName);
+ return;
+ }
+ LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
+ String sql = constructAddPartitions(tableName, partitionsToAdd);
+ updateDLASQL(sql);
+ }
+
+ public String constructAddPartitions(String tableName, List<String> partitions) {
+ return constructDLAAddPartitions(tableName, partitions);
+ }
+
+ String generateAbsolutePathStr(Path path) {
+ String absolutePathStr = path.toString();
+ if (path.toUri().getScheme() == null) {
+ absolutePathStr = getDefaultFs() + absolutePathStr;
+ }
+ return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
+ }
+
+ public List<String> constructChangePartitions(String tableName, List<String> partitions) {
+ List<String> changePartitions = new ArrayList<>();
+ String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
+ changePartitions.add(useDatabase);
+ String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
+ for (String partition : partitions) {
+ String partitionClause = getPartitionClause(partition);
+ Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+ String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+ String changePartition =
+ alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
+ changePartitions.add(changePartition);
+ }
+ return changePartitions;
+ }
+
+ /**
+ * Generate the Hive partition clause from a partition path.
+ *
+ * @param partition Partition path
+ * @return partition clause of the form field1='value1',field2='value2'
+ */
+ public String getPartitionClause(String partition) {
+ List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+ ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
+ "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
+ + ". Check partition strategy. ");
+ List<String> partBuilder = new ArrayList<>();
+ for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
+ partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+ }
+ return partBuilder.stream().collect(Collectors.joining(","));
+ }
+
+ private String constructDLAAddPartitions(String tableName, List<String> partitions) {
+ StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+ alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
+ .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
+ .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
+ for (String partition : partitions) {
+ String partitionClause = getPartitionClause(partition);
+ Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+ String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+ alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
+ .append("' ");
+ }
+ return alterSQL.toString();
+ }
+
+ private void updateDLASQL(String sql) {
+ Statement stmt = null;
+ try {
+ stmt = connection.createStatement();
+ LOG.info("Executing SQL " + sql);
+ stmt.execute(sql);
+ } catch (SQLException e) {
+ throw new HoodieException("Failed in executing SQL " + sql, e);
+ } finally {
+ closeQuietly(null, stmt);
+ }
+ }
+
+ @Override
+ public boolean doesTableExist(String tableName) {
+ String sql = constructShowCreateTableSQL(tableName);
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ stmt = connection.createStatement();
+ rs = stmt.executeQuery(sql);
+ } catch (SQLException e) {
+ return false;
+ } finally {
+ closeQuietly(rs, stmt);
+ }
+ return true;
+ }
+
+ @Override
+ public Option<String> getLastCommitTimeSynced(String tableName) {
+ String sql = constructShowCreateTableSQL(tableName);
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ stmt = connection.createStatement();
+ rs = stmt.executeQuery(sql);
+ if (rs.next()) {
+ String table = rs.getString(2);
+ Map<String, String> attr = new HashMap<>();
+ int index = table.indexOf(TBL_PROPERTIES_STR);
+ if (index != -1) {
+ String sub = table.substring(index + TBL_PROPERTIES_STR.length());
+ sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
+ String[] str = sub.split(",");
+
+ for (int i = 0; i < str.length; i++) {
+ String key = str[i].split("=")[0].trim();
+ String value = str[i].split("=")[1].trim();
+ attr.put(key, value);
+ }
+ }
+ return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
+ }
+ return Option.empty();
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
+ } finally {
+ closeQuietly(rs, stmt);
+ }
+ }
+
+ @Override
+ public void updateLastCommitTimeSynced(String tableName) {
+ // TODO: DLA does not support updating TBLPROPERTIES, so do nothing.
+ }
+
+ @Override
+ public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
+ if (changedPartitions.isEmpty()) {
+ LOG.info("No partitions to change for " + tableName);
+ return;
+ }
+ LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
+ List<String> sqls = constructChangePartitions(tableName, changedPartitions);
+ for (String sql : sqls) {
+ updateDLASQL(sql);
+ }
+ }
+
+ public Map<List<String>, String> scanTablePartitions(String tableName) {
+ String sql = constructShowPartitionSQL(tableName);
+ Statement stmt = null;
+ ResultSet rs = null;
+ Map<List<String>, String> partitions = new HashMap<>();
+ try {
+ stmt = connection.createStatement();
+ LOG.info("Executing SQL " + sql);
+ rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ if (rs.getMetaData().getColumnCount() > 0) {
+ String str = rs.getString(1);
+ if (!StringUtils.isNullOrEmpty(str)) {
+ List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
+ Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values));
+ String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ partitions.put(values, fullStoragePartitionPath);
+ }
+ }
+ }
+ return partitions;
+ } catch (SQLException e) {
+ throw new HoodieException("Failed in executing SQL " + sql, e);
+ } finally {
+ closeQuietly(rs, stmt);
+ }
+ }
+
+ public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) {
+ Map<String, String> paths = new HashMap<>();
+
+ for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
+ List<String> partitionValues = entry.getKey();
+ Collections.sort(partitionValues);
+ String fullTablePartitionPath = entry.getValue();
+ paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
+ }
+ List<PartitionEvent> events = new ArrayList<>();
+ for (String storagePartition : partitionStoragePartitions) {
+ Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition);
+ String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ // Check if the partition values or if hdfs path is the same
+ List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+ if (dlaConfig.useDLASyncHiveStylePartitioning) {
+ String partition = String.join("/", storagePartitionValues);
+ storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+ fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ }
+ Collections.sort(storagePartitionValues);
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ if (!paths.containsKey(storageValue)) {
+ events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
+ }
+ }
+ return events;
+ }
+
+ public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
+ ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().isEmpty(), "Deleting columns is not supported");
+ ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().isEmpty(), "Altering column types is not supported");
+ Map<String, String> columns = schemaDiff.getAddColumnTypes();
+ for (Map.Entry<String, String> entry : columns.entrySet()) {
+ String columnName = entry.getKey();
+ String columnType = entry.getValue();
+ StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER)
+ .append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".")
+ .append(DLA_ESCAPE_CHARACTER).append(tableName)
+ .append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(")
+ .append(columnName).append(" ").append(columnType).append(" )");
+ LOG.info("Updating table definition with " + sqlBuilder);
+ updateDLASQL(sqlBuilder.toString());
+ }
+ }
+
+ public void close() {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ LOG.error("Could not close connection ", e);
+ }
+ }
+
+ private String constructShowPartitionSQL(String tableName) {
+ String sql = "show partitions " + dlaConfig.databaseName + "." + tableName;
+ return sql;
+ }
+
+ private String constructShowCreateTableSQL(String tableName) {
+ String sql = "show create table " + dlaConfig.databaseName + "." + tableName;
+ return sql;
+ }
+
+ private String getDefaultFs() {
+ return fs.getConf().get("fs.defaultFS");
+ }
+
+ private HiveSyncConfig toHiveSyncConfig() {
+ HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
+ hiveSyncConfig.partitionFields = dlaConfig.partitionFields;
+ hiveSyncConfig.databaseName = dlaConfig.databaseName;
+ Path basePath = new Path(dlaConfig.basePath);
+ hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
+ return hiveSyncConfig;
+ }
+}
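To make the DDL concrete: assuming `partitionFields = [datestr]` and the default `SlashEncodedDayPartitionValueExtractor` (which turns a `2020/01/01` path into the value `2020-01-01`), `constructAddPartitions` batches all partitions into one statement. A sketch with hypothetical table and path names:

```java
// Hypothetical inputs; output shape follows constructDLAAddPartitions above.
String sql = client.constructAddPartitions("trips_ro", Arrays.asList("2020/01/01"));
// sql is roughly:
//   ALTER TABLE hudi_db.trips_ro ADD IF NOT EXISTS
//     PARTITION (datestr='2020-01-01') LOCATION 'hdfs://ns/tmp/hudi_trips/2020/01/01/'
```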
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java
new file mode 100644
index 0000000000000..636fd0b8b6c87
--- /dev/null
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla.util;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.dla.DLASyncConfig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class Utils {
+ public static String DLA_DATABASE_OPT_KEY = "hoodie.datasource.dla_sync.database";
+ public static String DLA_TABLE_OPT_KEY = "hoodie.datasource.dla_sync.table";
+ public static String DLA_USER_OPT_KEY = "hoodie.datasource.dla_sync.username";
+ public static String DLA_PASS_OPT_KEY = "hoodie.datasource.dla_sync.password";
+ public static String DLA_URL_OPT_KEY = "hoodie.datasource.dla_sync.jdbcurl";
+ public static String BATH_PATH = "basePath";
+ public static String DLA_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.dla_sync.partition_fields";
+ public static String DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.dla_sync.partition_extractor_class";
+ public static String DLA_ASSUME_DATE_PARTITIONING = "hoodie.datasource.dla_sync.assume_date_partitioning";
+ public static String DLA_SKIP_RO_SUFFIX = "hoodie.datasource.dla_sync.skip_ro_suffix";
+ public static String DLA_SYNC_HIVE_STYLE_PARTITIONING = "hoodie.datasource.dla_sync.hive.style.partitioning";
+
+ public static Properties configToProperties(DLASyncConfig cfg) {
+ Properties properties = new Properties();
+ properties.put(DLA_DATABASE_OPT_KEY, cfg.databaseName);
+ properties.put(DLA_TABLE_OPT_KEY, cfg.tableName);
+ properties.put(DLA_USER_OPT_KEY, cfg.dlaUser);
+ properties.put(DLA_PASS_OPT_KEY, cfg.dlaPass);
+ properties.put(DLA_URL_OPT_KEY, cfg.jdbcUrl);
+ properties.put(BASE_PATH, cfg.basePath);
+ properties.put(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY, cfg.partitionValueExtractorClass);
+ properties.put(DLA_ASSUME_DATE_PARTITIONING, String.valueOf(cfg.assumeDatePartitioning));
+ properties.put(DLA_SKIP_RO_SUFFIX, String.valueOf(cfg.skipROSuffix));
+ properties.put(DLA_SYNC_HIVE_STYLE_PARTITIONING, String.valueOf(cfg.useDLASyncHiveStylePartitioning));
+ return properties;
+ }
+
+ public static DLASyncConfig propertiesToConfig(Properties properties) {
+ DLASyncConfig config = new DLASyncConfig();
+ config.databaseName = properties.getProperty(DLA_DATABASE_OPT_KEY);
+ config.tableName = properties.getProperty(DLA_TABLE_OPT_KEY);
+ config.dlaUser = properties.getProperty(DLA_USER_OPT_KEY);
+ config.dlaPass = properties.getProperty(DLA_PASS_OPT_KEY);
+ config.jdbcUrl = properties.getProperty(DLA_URL_OPT_KEY);
+ config.basePath = properties.getProperty(BASE_PATH);
+ if (StringUtils.isNullOrEmpty(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY))) {
+ config.partitionFields = new ArrayList<>();
+ } else {
+ config.partitionFields = Arrays.asList(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY).split(","));
+ }
+ config.partitionValueExtractorClass = properties.getProperty(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY);
+ config.assumeDatePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_ASSUME_DATE_PARTITIONING, "false"));
+ config.skipROSuffix = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RO_SUFFIX, "false"));
+ config.useDLASyncHiveStylePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_SYNC_HIVE_STYLE_PARTITIONING, "false"));
+ return config;
+ }
+}
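One asymmetry worth noting: `configToProperties` never writes `DLA_PARTITION_FIELDS_OPT_KEY`, so `partitionFields` does not survive a round trip (and `Properties` rejects nulls, so the string fields must be set first). A sketch with hypothetical values:

```java
// Round-trip sketch: partitionFields is dropped by configToProperties.
DLASyncConfig cfg = new DLASyncConfig();
cfg.databaseName = "hudi_db";
cfg.tableName = "trips";
cfg.dlaUser = "user";
cfg.dlaPass = "pass";
cfg.jdbcUrl = "jdbc:mysql://localhost:3306";
cfg.basePath = "/tmp/hudi_trips";
cfg.partitionFields = Arrays.asList("datestr");
DLASyncConfig back = Utils.propertiesToConfig(Utils.configToProperties(cfg));
// back.partitionFields is empty here, not ["datestr"]
```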
diff --git a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
new file mode 100644
index 0000000000000..5f24f8b934ff0
--- /dev/null
+++ b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDLASyncConfig {
+ @Test
+ public void testCopy() {
+ DLASyncConfig dlaSyncConfig = new DLASyncConfig();
+ List partitions = Arrays.asList("a", "b");
+ dlaSyncConfig.partitionFields = partitions;
+ dlaSyncConfig.basePath = "/tmp";
+ dlaSyncConfig.assumeDatePartitioning = true;
+ dlaSyncConfig.databaseName = "test";
+ dlaSyncConfig.tableName = "test";
+ dlaSyncConfig.dlaUser = "dla";
+ dlaSyncConfig.dlaPass = "dla";
+ dlaSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
+ dlaSyncConfig.skipROSuffix = false;
+
+ DLASyncConfig copied = DLASyncConfig.copy(dlaSyncConfig);
+
+ assertEquals(copied.partitionFields, dlaSyncConfig.partitionFields);
+ assertEquals(copied.basePath, dlaSyncConfig.basePath);
+ assertEquals(copied.assumeDatePartitioning, dlaSyncConfig.assumeDatePartitioning);
+ assertEquals(copied.databaseName, dlaSyncConfig.databaseName);
+ assertEquals(copied.tableName, dlaSyncConfig.tableName);
+ assertEquals(copied.dlaUser, dlaSyncConfig.dlaUser);
+ assertEquals(copied.dlaPass, dlaSyncConfig.dlaPass);
+ assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
+ assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
+ }
+}
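`copy()` also carries `partitionValueExtractorClass` and `useDLASyncHiveStylePartitioning`, which the test above does not assert; possible follow-up assertions along the same lines:

```java
assertEquals(copied.partitionValueExtractorClass, dlaSyncConfig.partitionValueExtractorClass);
assertEquals(copied.useDLASyncHiveStylePartitioning, dlaSyncConfig.useDLASyncHiveStylePartitioning);
```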
diff --git a/hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties b/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties
similarity index 100%
rename from hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties
rename to hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties
diff --git a/hudi-hive-sync/src/test/resources/log4j-surefire.properties b/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties
similarity index 100%
rename from hudi-hive-sync/src/test/resources/log4j-surefire.properties
rename to hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties
diff --git a/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml
similarity index 96%
rename from hudi-hive-sync/pom.xml
rename to hudi-sync/hudi-hive-sync/pom.xml
index 9d4b8e275e74b..70549c96a514d 100644
--- a/hudi-hive-sync/pom.xml
+++ b/hudi-sync/hudi-hive-sync/pom.xml
@@ -20,7 +20,9 @@
    <artifactId>hudi</artifactId>
    <groupId>org.apache.hudi</groupId>
    <version>0.6.0-SNAPSHOT</version>
+   <relativePath>../../pom.xml</relativePath>
  </parent>
+
  <modelVersion>4.0.0</modelVersion>

  <artifactId>hudi-hive-sync</artifactId>
@@ -43,6 +45,11 @@
      <artifactId>hudi-hadoop-mr</artifactId>
      <version>${project.version}</version>