diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml index e670ff729ecaf..2f9a57b27aa3d 100644 --- a/hudi-spark/pom.xml +++ b/hudi-spark/pom.xml @@ -186,6 +186,11 @@ hudi-hive-sync ${project.version} + + org.apache.hudi + hudi-sync-common + ${project.version} + diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 5195f05742730..cd70ccb6080c0 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -250,6 +250,7 @@ object DataSourceWriteOptions { */ val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch" val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true" + val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class" // HIVE SYNC SPECIFIC CONFIGS //NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5774c896d970e..c7b30ad6ccaec 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -18,6 +18,7 @@ package org.apache.hudi import java.util +import java.util.Properties import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord @@ -30,9 +31,11 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecordPayload import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD @@ -289,6 +292,23 @@ } else { true } + val clientImpls = parameters.getOrElse(SYNC_CLIENT_TOOL_CLASS, "") + log.info(s"Sync client tool classes configured: $clientImpls") + if (clientImpls.nonEmpty) { + val impls = clientImpls.split(",") + impls.foreach(impl => { + if (!impl.trim.contains(classOf[HiveSyncTool].getName)) { + val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) + val properties = new Properties() + properties.putAll(parameters) + properties.put("basePath", basePath.toString) + val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] + syncHoodie.syncHoodieTable() + } else { + log.warn("Please use hoodie.datasource.hive_sync.enable to sync to Hive") + } + }) + } client.close() commitSuccess && syncHiveSucess } else { diff --git a/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml similarity index 96% rename from hudi-hive-sync/pom.xml rename to hudi-sync/hudi-hive-sync/pom.xml index 9d4b8e275e74b..4b210294714a0 100644 --- a/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -20,7 +20,9 @@ hudi org.apache.hudi 0.6.0-SNAPSHOT + ../../hudi + 4.0.0 hudi-hive-sync @@ -43,6 +45,11 @@ hudi-hadoop-mr ${project.version} + + org.apache.hudi + hudi-sync-common + ${project.version} + diff --git 
a/hudi-hive-sync/run_sync_tool.sh b/hudi-sync/hudi-hive-sync/run_sync_tool.sh similarity index 100% rename from hudi-hive-sync/run_sync_tool.sh rename to hudi-sync/hudi-hive-sync/run_sync_tool.sh diff --git a/hudi-hive-sync/src/assembly/src.xml b/hudi-sync/hudi-hive-sync/src/assembly/src.xml similarity index 100% rename from hudi-hive-sync/src/assembly/src.xml rename to hudi-sync/hudi-hive-sync/src/assembly/src.xml diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java similarity index 96% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 877ba4791ba97..16e168add39b9 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent; -import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.util.HiveSchemaUtil; import com.beust.jcommander.JCommander; @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; @@ -49,7 +50,7 @@ * partitions incrementally (all the partitions modified since the last commit) */ @SuppressWarnings("WeakerAccess") -public class HiveSyncTool { +public class HiveSyncTool extends AbstractSyncTool { private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class); public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; @@ -61,6 +62,7 @@ public class HiveSyncTool { private final Option roTableTableName; public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { + super(configuration.getAllProperties(), fs); this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs); this.cfg = cfg; // Set partitionFields to empty, when the NonPartitionedExtractor is used @@ -84,6 +86,7 @@ public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { } } + @Override public void syncHoodieTable() { try { switch (hoodieHiveClient.getTableType()) { diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java similarity index 82% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 
f1034e3cde832..cbd74b9818ef0 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -20,14 +20,9 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hadoop.fs.FileSystem; @@ -43,6 +38,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.jdbc.HiveDriver; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; @@ -62,7 +58,7 @@ import java.util.Map; import java.util.stream.Collectors; -public class HoodieHiveClient { +public class HoodieHiveClient extends AbstractSyncHoodieClient { private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync"; // Make sure we have the hive JDBC driver in classpath @@ -78,8 +74,6 @@ public class HoodieHiveClient { } private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class); - private final HoodieTableMetaClient metaClient; - private final HoodieTableType tableType; private final PartitionValueExtractor partitionValueExtractor; private IMetaStoreClient client; private HiveSyncConfig syncConfig; @@ -89,10 +83,9 @@ public class HoodieHiveClient { private HiveConf configuration; public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { + super(cfg.basePath, cfg.assumeDatePartitioning, fs); this.syncConfig = cfg; this.fs = fs; - this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true); - this.tableType = metaClient.getTableType(); this.configuration = configuration; // Support both JDBC and metastore based implementations for backwards compatiblity. Future users should @@ -125,7 +118,8 @@ public HoodieTimeline getActiveTimeline() { /** * Add the (NEW) partitions to the table. */ - void addPartitionsToTable(String tableName, List partitionsToAdd) { + @Override + public void addPartitionsToTable(String tableName, List partitionsToAdd) { if (partitionsToAdd.isEmpty()) { LOG.info("No partitions to add for " + tableName); return; @@ -138,7 +132,8 @@ void addPartitionsToTable(String tableName, List partitionsToAdd) { /** * Partition path has changed - update the path for te following partitions. 
*/ - void updatePartitionsToTable(String tableName, List changedPartitions) { + @Override + public void updatePartitionsToTable(String tableName, List changedPartitions) { if (changedPartitions.isEmpty()) { LOG.info("No partitions to change for " + tableName); return; @@ -258,7 +253,8 @@ void updateTableDefinition(String tableName, MessageType newSchema) { } } - void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { + @Override + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { try { String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass); @@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat /** * Get the table schema. */ + // TODO: consider pulling this up into AbstractSyncHoodieClient once a catalog-agnostic schema API exists public Map getTableSchema(String tableName) { if (syncConfig.useJdbc) { if (!doesTableExist(tableName)) { @@ -327,24 +324,10 @@ public Map getTableSchemaUsingMetastoreClient(String tableName) } } - /** - * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if - * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has - * not changed within a single atomic write. - * - * @return Parquet schema for this table - */ - public MessageType getDataSchema() { - try { - return new TableSchemaResolver(metaClient).getTableParquetSchema(); - } catch (Exception e) { - throw new HoodieHiveSyncException("Failed to read data schema", e); - } - } - /** * @return true if the configured table exists */ + @Override public boolean doesTableExist(String tableName) { try { return client.tableExists(syncConfig.databaseName, tableName); @@ -455,36 +438,7 @@ private String getHiveJdbcUrlWithDefaultDBName() { return hiveJdbcUrl + (urlAppend == null ?
"" : urlAppend); } - private static void closeQuietly(ResultSet resultSet, Statement stmt) { - try { - if (stmt != null) { - stmt.close(); - } - } catch (SQLException e) { - LOG.error("Could not close the statement opened ", e); - } - - try { - if (resultSet != null) { - resultSet.close(); - } - } catch (SQLException e) { - LOG.error("Could not close the resultset opened ", e); - } - } - - public String getBasePath() { - return metaClient.getBasePath(); - } - - HoodieTableType getTableType() { - return tableType; - } - - public FileSystem getFs() { - return fs; - } - + @Override public Option getLastCommitTimeSynced(String tableName) { // Get the last commit time from the TBLproperties try { @@ -509,33 +463,12 @@ public void close() { } } - List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { - if (!lastCommitTimeSynced.isPresent()) { - LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs); - try { - return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning); - } catch (IOException e) { - throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e); - } - } else { - LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - - HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE); - return timelineToSync.getInstants().map(s -> { - try { - return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class); - } catch (IOException e) { - throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e); - } - }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList()); - } - } - List getAllTables(String db) throws Exception { return client.getAllTables(db); } - void updateLastCommitTimeSynced(String tableName) { + @Override + public void updateLastCommitTimeSynced(String tableName) { // Set the last commit time from the TBLproperties String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp(); try { @@ -546,30 +479,4 @@ void updateLastCommitTimeSynced(String tableName) { throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e); } } - - /** - * Partition Event captures any partition that needs to be added or updated. 
- */ - static class PartitionEvent { - - public enum PartitionEventType { - ADD, UPDATE - } - - PartitionEventType eventType; - String storagePartition; - - PartitionEvent(PartitionEventType eventType, String storagePartition) { - this.eventType = eventType; - this.storagePartition = storagePartition; - } - - static PartitionEvent newPartitionAddEvent(String storagePartition) { - return new PartitionEvent(PartitionEventType.ADD, storagePartition); - } - - static PartitionEvent newPartitionUpdateEvent(String storagePartition) { - return new PartitionEvent(PartitionEventType.UPDATE, storagePartition); - } - } } \ No newline at end of file diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/PartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/PartitionValueExtractor.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/PartitionValueExtractor.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/PartitionValueExtractor.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java similarity index 98% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java index bef40d564df28..2a4d3ebb7c8e1 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java @@ -20,12 +20,14 @@ import org.apache.parquet.schema.MessageType; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringJoiner; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; + /** * Represents the schema difference between the storage schema and hive table schema. 
@@ -91,7 +93,7 @@ public Builder(MessageType storageSchema, Map tableSchema) { this.tableSchema = tableSchema; deleteColumns = new ArrayList<>(); updateColumnTypes = new HashMap<>(); - addColumnTypes = new HashMap<>(); + addColumnTypes = new LinkedHashMap<>(); } public Builder deleteTableColumn(String column) { diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ColumnNameXLator.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ColumnNameXLator.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ColumnNameXLator.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ColumnNameXLator.java diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java similarity index 100% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java rename to hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java similarity index 99% rename from hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java rename to hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index cab26e06921a5..17bc2155c31c6 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -21,8 +21,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; -import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent; -import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.testutils.HiveTestUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java similarity index 100% rename from hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java rename to hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java similarity index 100% rename from hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java rename to hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java diff --git 
a/hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties b/hudi-sync/hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties similarity index 100% rename from hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties rename to hudi-sync/hudi-hive-sync/src/test/resources/log4j-surefire-quiet.properties diff --git a/hudi-hive-sync/src/test/resources/log4j-surefire.properties b/hudi-sync/hudi-hive-sync/src/test/resources/log4j-surefire.properties similarity index 100% rename from hudi-hive-sync/src/test/resources/log4j-surefire.properties rename to hudi-sync/hudi-hive-sync/src/test/resources/log4j-surefire.properties diff --git a/hudi-sync/hudi-sync-common/pom.xml b/hudi-sync/hudi-sync-common/pom.xml new file mode 100644 index 0000000000000..6042efbf513a9 --- /dev/null +++ b/hudi-sync/hudi-sync-common/pom.xml @@ -0,0 +1,71 @@ + + + + + hudi + org.apache.hudi + 0.6.0-SNAPSHOT + ../../hudi + + 4.0.0 + + hudi-sync-common + jar + + + ${project.parent.basedir} + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.parquet + parquet-avro + + + org.apache.hadoop + hadoop-common + + + + + + + src/main/resources + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + test-jar + + + + + + + diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java new file mode 100644 index 0000000000000..1fec416829909 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + protected final HoodieTableMetaClient metaClient; + protected HoodieTimeline activeTimeline; + protected final HoodieTableType tableType; + protected final FileSystem fs; + private String basePath; + private boolean assumeDatePartitioning; + + public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) { + this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + this.tableType = metaClient.getTableType(); + this.basePath = basePath; + this.assumeDatePartitioning = assumeDatePartitioning; + this.fs = fs; + this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public abstract void createTable(String tableName, MessageType storageSchema, + String inputFormatClass, String outputFormatClass, String serdeClass); + + public abstract boolean doesTableExist(String tableName); + + public abstract Option getLastCommitTimeSynced(String tableName); + + public abstract void updateLastCommitTimeSynced(String tableName); + + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); + + public abstract void updatePartitionsToTable(String tableName, List changedPartitions); + + public HoodieTimeline getActiveTimeline() { + return activeTimeline; + } + + public HoodieTableType getTableType() { + return tableType; + } + + public String getBasePath() { + return metaClient.getBasePath(); + } + + public FileSystem getFs() { + return fs; + } + + public void closeQuietly(ResultSet resultSet, Statement stmt) { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + LOG.error("Could not close the statement opened ", e); + } + + try { + if (resultSet != null) { + resultSet.close(); + } + } catch (SQLException e) { + LOG.error("Could not close the resultset opened ", e); + } + } + + /** + * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if + * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has + * not changed within a single atomic write. 
+ * + * @return Parquet schema for this table + */ + public MessageType getDataSchema() { + try { + return new TableSchemaResolver(metaClient).getTableParquetSchema(); + } catch (Exception e) { + throw new HoodieSyncException("Failed to read data schema", e); + } + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { + if (!lastCommitTimeSynced.isPresent()) { + LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs); + try { + return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning); + } catch (IOException e) { + throw new HoodieIOException("Failed to list all partitions in " + basePath, e); + } + } else { + LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); + + HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE); + return timelineToSync.getInstants().map(s -> { + try { + return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e); + } + }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList()); + } + } + + private MessageType readSchemaFromLastCompaction(Option lastCompactionCommitOpt) throws IOException { + HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new HoodieSyncException( + "Could not read schema from last compaction, no compaction commits found on path " + basePath)); + + // Read from the compacted file wrote + HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata + .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); + return readSchemaFromBaseFile(new Path(filePath)); + } + + /** + * Read the parquet schema from a parquet File. + */ + private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException { + LOG.info("Reading schema from " + parquetFilePath); + if (!fs.exists(parquetFilePath)) { + throw new IllegalArgumentException( + "Failed to read schema from data file " + parquetFilePath + ". File does not exist."); + } + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER); + return fileFooter.getFileMetaData().getSchema(); + } + + /** + * Read the schema from the log file on path. + */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private MessageType readSchemaFromLogFile(Option lastCompactionCommitOpt, Path path) throws IOException { + MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path); + // Fall back to read the schema from last compaction + if (messageType == null) { + LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); + return readSchemaFromLastCompaction(lastCompactionCommitOpt); + } + return messageType; + } + + /** + * Partition Event captures any partition that needs to be added or updated. 
+ */ + public static class PartitionEvent { + + public enum PartitionEventType { + ADD, UPDATE + } + + public PartitionEventType eventType; + public String storagePartition; + + PartitionEvent(PartitionEventType eventType, String storagePartition) { + this.eventType = eventType; + this.storagePartition = storagePartition; + } + + public static PartitionEvent newPartitionAddEvent(String storagePartition) { + return new PartitionEvent(PartitionEventType.ADD, storagePartition); + } + + public static PartitionEvent newPartitionUpdateEvent(String storagePartition) { + return new PartitionEvent(PartitionEventType.UPDATE, storagePartition); + } + } +} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java new file mode 100644 index 0000000000000..6621468ee2e25 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; + +import java.util.Properties; + +public abstract class AbstractSyncTool { + protected Properties props; + protected FileSystem fileSystem; + + public AbstractSyncTool(Properties props, FileSystem fileSystem) { + this.props = props; + this.fileSystem = fileSystem; + } + + public abstract void syncHoodieTable(); + +} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncException.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncException.java new file mode 100644 index 0000000000000..d7238fbe8bf98 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +public class HoodieSyncException extends RuntimeException { + + public HoodieSyncException() { + super(); + } + + public HoodieSyncException(String message) { + super(message); + } + + public HoodieSyncException(String message, Throwable t) { + super(message, t); + } + + public HoodieSyncException(Throwable t) { + super(t); + } + + protected static String format(String message, Object... args) { + return String.format(String.valueOf(message), (Object[]) args); + } +} diff --git a/hudi-sync/pom.xml b/hudi-sync/pom.xml new file mode 100644 index 0000000000000..e976b8dcc3d7c --- /dev/null +++ b/hudi-sync/pom.xml @@ -0,0 +1,37 @@ + + + + + hudi + org.apache.hudi + 0.6.0-SNAPSHOT + + 4.0.0 + + hudi-sync + pom + + + ${project.parent.basedir} + + + + hudi-sync-common + hudi-hive-sync + + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 0d3e90c49be54..8f3a9f240a8f5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -30,6 +31,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; @@ -38,6 +41,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; @@ -63,10 +67,11 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Objects; +import java.util.Properties; +import java.util.List; +import java.util.ArrayList; import java.util.function.Function; import java.util.stream.Collectors; @@ -389,6 +394,7 @@ private Option writeToSink(JavaRDD records, String checkpo long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue(); boolean hasErrors = totalErrorRecords > 0; long hiveSyncTimeMs = 0; + long metaSyncTimeMs = 0; if (!hasErrors || cfg.commitOnErrors) { HashMap checkpointCommitMetadata = new HashMap<>(); checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); @@ -415,6 +421,9 @@ private Option writeToSink(JavaRDD records, String checkpo Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); syncHive(); hiveSyncTimeMs = hiveSyncContext != null ? 
hiveSyncContext.stop() : 0; + Timer.Context metaSyncContext = metrics.getMetaSyncTimerContext(); + syncMeta(); + metaSyncTimeMs = metaSyncContext != null ? metaSyncContext.stop() : 0; } } else { LOG.info("Commit " + instantTime + " failed!"); @@ -436,11 +445,32 @@ private Option writeToSink(JavaRDD records, String checkpo long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0; // Send DeltaStreamer Metrics - metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs); + metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true); + metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false); return scheduledCompactionInstant; } + private void syncMeta() { + if (!StringUtils.isNullOrEmpty(cfg.syncClientToolClass)) { + String[] impls = cfg.syncClientToolClass.split(","); + for (String impl : impls) { + impl = impl.trim(); + if (HiveSyncTool.class.getName().equals(impl)) { + LOG.warn("Please use hoodie.datasource.hive_sync.enable to sync to Hive"); + continue; + } + FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); + Properties properties = new Properties(); + properties.putAll(props); + properties.put("basePath", cfg.targetBasePath); + AbstractSyncTool tool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[]{Properties.class, FileSystem.class}, properties, fs); + tool.syncHoodieTable(); + } + } + } + + + /** * Try to start a new commit. *

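Note (not part of the patch): the reflection calls in HoodieSparkSqlWriter and DeltaSync.syncMeta() above define the contract for a pluggable sync tool — a public (Properties, FileSystem) constructor plus a syncHoodieTable() implementation, with the writer injecting the table location under the "basePath" property key before instantiation. A minimal sketch of such a tool follows; the package and class names (DummyCatalogSyncTool) are hypothetical and only illustrate the expected shape:

    package org.apache.hudi.sync.example;            // hypothetical package

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.sync.common.AbstractSyncTool;

    import java.util.Properties;

    /**
     * Hypothetical pluggable sync tool. The (Properties, FileSystem) constructor is
     * exactly what ReflectionUtils.loadClass is asked to resolve in the writer paths.
     */
    public class DummyCatalogSyncTool extends AbstractSyncTool {

      public DummyCatalogSyncTool(Properties props, FileSystem fs) {
        super(props, fs);
      }

      @Override
      public void syncHoodieTable() {
        // The writer puts the Hudi table location under "basePath" before loading the tool.
        String basePath = props.getProperty("basePath");
        // Register or refresh the table and its partitions in the external catalog here.
      }
    }

Hive syncing itself still goes through HiveSyncTool and hoodie.datasource.hive_sync.enable / --enable-hive-sync; the tool-class list is only consulted for additional catalogs.
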
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index ccd5c49e186b3..8c2254916393d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -237,6 +237,9 @@ public static class Config implements Serializable { @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; + @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool classes, comma separated when configuring multiple tools") + public String syncClientToolClass = ""; + @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless" + "outstanding compactions is less than this number") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index a054b277a8d8f..76252f2dfda27 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -30,8 +30,10 @@ public class HoodieDeltaStreamerMetrics { public String overallTimerName = null; public String hiveSyncTimerName = null; + public String metaSyncTimerName = null; private Timer overallTimer = null; public Timer hiveSyncTimer = null; + public Timer metaSyncTimer = null; public HoodieDeltaStreamerMetrics(HoodieWriteConfig config) { this.config = config; @@ -40,6 +42,7 @@ public HoodieDeltaStreamerMetrics(HoodieWriteConfig config) { Metrics.init(config); this.overallTimerName = getMetricsName("timer", "deltastreamer"); this.hiveSyncTimerName = getMetricsName("timer", "deltastreamerHiveSync"); + this.metaSyncTimerName = getMetricsName("timer", "deltastreamerMetaSync"); } } @@ -57,6 +60,13 @@ public Timer.Context getHiveSyncTimerContext() { return hiveSyncTimer == null ? null : hiveSyncTimer.time(); } + public Timer.Context getMetaSyncTimerContext() { + if (config.isMetricsOn() && metaSyncTimer == null) { + metaSyncTimer = createTimer(metaSyncTimerName); + } + return metaSyncTimer == null ? null : metaSyncTimer.time(); + } + private Timer createTimer(String name) { return config.isMetricsOn() ? Metrics.getInstance().getRegistry().timer(name) : null; } @@ -65,10 +75,15 @@ String getMetricsName(String action, String metric) { return config == null ?
null : String.format("%s.%s.%s", tableName, action, metric); } - public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) { + public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) { if (config.isMetricsOn()) { Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs)); - Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs)); + if (hiveSync) { + Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs)); + } else { + Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs)); + } + } } diff --git a/pom.xml b/pom.xml index 05907f34845ab..46d93023ba309 100644 --- a/pom.xml +++ b/pom.xml @@ -39,10 +39,10 @@ hudi-cli hudi-client hudi-hadoop-mr - hudi-hive-sync hudi-spark hudi-timeline-service hudi-utilities + hudi-sync packaging/hudi-hadoop-mr-bundle packaging/hudi-hive-sync-bundle packaging/hudi-spark-bundle
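
Usage note (not part of the patch): on the datasource path, hoodie.sync.client.tool.class accepts a comma-separated list of AbstractSyncTool implementations to run after a successful commit, and --hoodie-sync-client-tool-class plays the same role for HoodieDeltaStreamer. A rough write-side sketch follows; the paths, field names, table name, and the DummyCatalogSyncTool class are purely illustrative, while the option keys are the standard Hudi datasource keys plus the new one introduced above:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class MetaSyncWriteExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("hudi-meta-sync-example").getOrCreate();
        Dataset<Row> input = spark.read().parquet("/tmp/source_data");   // hypothetical input

        input.write()
            .format("org.apache.hudi")
            .option("hoodie.table.name", "demo_table")
            .option("hoodie.datasource.write.recordkey.field", "uuid")
            .option("hoodie.datasource.write.partitionpath.field", "dt")
            .option("hoodie.datasource.write.precombine.field", "ts")
            // New config from this change: extra sync tools invoked after the commit succeeds.
            .option("hoodie.sync.client.tool.class", "org.apache.hudi.sync.example.DummyCatalogSyncTool")
            .mode(SaveMode.Append)
            .save("/tmp/hudi/demo_table");
      }
    }

The equivalent DeltaStreamer invocation would simply add --hoodie-sync-client-tool-class org.apache.hudi.sync.example.DummyCatalogSyncTool to the existing command line.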