Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ object DataSourceWriteOptions {
.defaultValue(false)
.withDocumentation("Whether to sync the table as managed table.")

// Number of partitions synced to the Hive metastore per ALTER TABLE statement;
// batching keeps a single sync from generating one oversized SQL string.
val HIVE_BATCH_SYNC_PARTITION_NUM: ConfigProperty[Int] = ConfigProperty
  .key("hoodie.datasource.hive_sync.batch_num")
  .defaultValue(1000)
  .withDocumentation("The number of partitions one batch when syncing partitions to hive.")

// Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.compaction.async.enable")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ object HoodieSparkSqlWriter {
hiveSyncConfig.supportTimestamp = hoodieConfig.getStringOrDefault(HIVE_SUPPORT_TIMESTAMP).toBoolean
hiveSyncConfig.autoCreateDatabase = hoodieConfig.getStringOrDefault(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).toBoolean
hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt

val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
if (syncAsDtaSourceTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
public Boolean createManagedTable = false;

@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum = 1000;

// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
Expand All @@ -127,6 +130,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
newConfig.tableProperties = cfg.tableProperties;
newConfig.serdeProperties = cfg.serdeProperties;
newConfig.createManagedTable = cfg.createManagedTable;
newConfig.batchSyncNum = cfg.batchSyncNum;
return newConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
return;
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
String sql = constructAddPartitions(tableName, partitionsToAdd);
updateHiveSQL(sql);
List<String> sqls = constructAddPartitions(tableName, partitionsToAdd);
sqls.stream().forEach(sql -> updateHiveSQL(sql));
}

/**
Expand Down Expand Up @@ -180,18 +180,36 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
}
}

private String constructAddPartitions(String tableName, List<String> partitions) {
private StringBuilder getAlterTablePrefix(String tableName) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName)
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
return alterSQL;
}

private List<String> constructAddPartitions(String tableName, List<String> partitions) {
if (syncConfig.batchSyncNum <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
List<String> result = new ArrayList<>();
int batchSyncPartitionNum = syncConfig.batchSyncNum;
StringBuilder alterSQL = getAlterTablePrefix(tableName);
for (int i = 0; i < partitions.size(); i++) {
String partitionClause = getPartitionClause(partitions.get(i));
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partitions.get(i)).toString();
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
.append("' ");
if ((i + 1) % batchSyncPartitionNum == 0) {
result.add(alterSQL.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines duplicate the prefix-building code at line #186 — could we encapsulate it in a method such as getAlterTablePrefix?

alterSQL = getAlterTablePrefix(tableName);
}
}
// add left partitions to result
if (partitions.size() % batchSyncPartitionNum != 0) {
result.add(alterSQL.toString());
}
return alterSQL.toString();
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
newConfig.supportTimestamp = cfg.supportTimestamp;
newConfig.decodePartition = cfg.decodePartition;
newConfig.batchSyncNum = cfg.batchSyncNum;
newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
return newConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public static void cleanUpClass() {
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClient =
Expand Down Expand Up @@ -160,6 +161,7 @@ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata)
public void testSyncCOWTableWithProperties(boolean useJdbc,
boolean useSchemaFromCommitMetadata) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
Map<String, String> serdeProperties = new HashMap<String, String>() {
{
put("path", hiveSyncConfig.basePath);
Expand Down Expand Up @@ -214,6 +216,7 @@ public void testSyncCOWTableWithProperties(boolean useJdbc,
public void testSyncMORTableWithProperties(boolean useJdbc,
boolean useSchemaFromCommitMetadata) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
Map<String, String> serdeProperties = new HashMap<String, String>() {
{
put("path", hiveSyncConfig.basePath);
Expand Down Expand Up @@ -312,6 +315,7 @@ public void testSyncManagedTable(boolean useJdbc,
@MethodSource("useJdbc")
public void testSyncIncremental(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
Expand Down Expand Up @@ -351,6 +355,7 @@ public void testSyncIncremental(boolean useJdbc) throws Exception {
@MethodSource("useJdbc")
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
Expand Down Expand Up @@ -388,6 +393,7 @@ public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Excep
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
String deltaCommitTime = "101";
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
Expand Down Expand Up @@ -454,6 +460,7 @@ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMeta
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
Expand Down Expand Up @@ -524,6 +531,7 @@ public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMe
@MethodSource("useJdbc")
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);

Expand Down Expand Up @@ -598,6 +606,7 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
@MethodSource("useJdbc")
public void testNonPartitionedSync(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);

Expand Down Expand Up @@ -627,6 +636,7 @@ public void testNonPartitionedSync(boolean useJdbc) throws Exception {
@MethodSource("useJdbc")
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime = "100";
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
Expand Down Expand Up @@ -675,6 +685,7 @@ public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
@Test
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
HiveTestUtil.hiveSyncConfig.useJdbc = true;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, false);
HoodieHiveClient hiveClient =
Expand Down Expand Up @@ -720,6 +731,7 @@ private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyC
@MethodSource("useJdbc")
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Expand All @@ -740,6 +752,7 @@ public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) th
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Expand Down Expand Up @@ -782,6 +795,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJ
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Expand Down Expand Up @@ -828,6 +842,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTa
@MethodSource("useJdbc")
public void testTypeConverter(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
HiveTestUtil.createCOWTable("100", 5, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
Expand Down