diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 6123f64cd59f3..0d490c4ab14a1 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
@@ -69,6 +70,7 @@ public static String getBucketSpec(String bucketCols, int bucketNum) {
 
   public HiveSyncConfig(Properties props) {
     super(props);
+    validateParameters();
   }
 
   public HiveSyncConfig(Properties props, Configuration hadoopConf) {
@@ -78,6 +80,7 @@ public HiveSyncConfig(Properties props, Configuration hadoopConf) {
     // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
     hiveConf.addResource(getHadoopFileSystem().getConf());
     setHadoopConf(hiveConf);
+    validateParameters();
   }
 
   public HiveConf getHiveConf() {
@@ -171,4 +174,8 @@ public TypedProperties toProps() {
       return props;
     }
   }
+
+  public void validateParameters() {
+    ValidationUtils.checkArgument(getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0, "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
+  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index adb8ce988d320..c14536a27743b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
@@ -47,12 +48,14 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.thrift.TException;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -192,18 +195,23 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
     LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
     try {
       StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
-      List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
-        StorageDescriptor partitionSd = new StorageDescriptor();
-        partitionSd.setCols(sd.getCols());
-        partitionSd.setInputFormat(sd.getInputFormat());
-        partitionSd.setOutputFormat(sd.getOutputFormat());
-        partitionSd.setSerdeInfo(sd.getSerdeInfo());
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), partition).toString();
-        List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-        partitionSd.setLocation(fullPartitionPath);
-        return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
-      }).collect(Collectors.toList());
-      client.add_partitions(partitionList, true, false);
+      int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+      for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
+        List<Partition> partitionList = new ArrayList<>();
+        batch.forEach(x -> {
+          StorageDescriptor partitionSd = new StorageDescriptor();
+          partitionSd.setCols(sd.getCols());
+          partitionSd.setInputFormat(sd.getInputFormat());
+          partitionSd.setOutputFormat(sd.getOutputFormat());
+          partitionSd.setSerdeInfo(sd.getSerdeInfo());
+          String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString();
+          List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
+          partitionSd.setLocation(fullPartitionPath);
+          partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
+        });
+        client.add_partitions(partitionList, true, false);
+        LOG.info("HMSDDLExecutor add a batch partitions done: " + partitionList.size());
+      }
     } catch (TException e) {
       LOG.error(databaseName + "." + tableName + " add partition failed", e);
       throw new HoodieHiveSyncException(databaseName + "."
           + tableName + " add partition failed", e);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 2673e46a9f745..30d0843bf5e0b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -160,9 +160,6 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
   }
 
   private List<String> constructDropPartitions(String tableName, List<String> partitions) {
-    if (config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) <= 0) {
-      throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
-    }
     List<String> result = new ArrayList<>();
     int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
     StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index ea39dba7d0cd0..6f0d8e51d351f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -156,9 +156,6 @@ public void updateTableComments(String tableName, Map<String, ImmutablePair<String, String>> alterSchema) {
   }
 
   private List<String> constructAddPartitions(String tableName, List<String> partitions) {
-    if (config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) <= 0) {
-      throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
-    }
     List<String> result = new ArrayList<>();
     int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
     StringBuilder alterSQL = getAlterTablePrefix(tableName);