diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 0c2abdbf571ac..9b6385120ff9f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
   public Boolean withOperationField = false;
 
+  @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+  public Boolean isConditionalSync = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
     newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
     newConfig.withOperationField = cfg.withOperationField;
+    newConfig.isConditionalSync = cfg.isConditionalSync;
     return newConfig;
   }
 
@@ -174,6 +178,7 @@ public String toString() {
         + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
         + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
         + ", withOperationField=" + withOperationField
+        + ", isConditionalSync=" + isConditionalSync
         + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6a1d930c5e89e..3bbaee1ed8bcb 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -179,7 +179,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
       cfg.syncAsSparkDataSourceTable = false;
     }
     // Sync schema if needed
-    syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
 
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
@@ -192,8 +192,11 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
 
     // Sync the partitions if needed
-    syncPartitions(tableName, writtenPartitionsSince);
-    hoodieHiveClient.updateLastCommitTimeSynced(tableName);
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
+    boolean meetSyncConditions = schemaChanged || partitionsChanged;
+    if (!cfg.isConditionalSync || meetSyncConditions) {
+      hoodieHiveClient.updateLastCommitTimeSynced(tableName);
+    }
     LOG.info("Sync complete for " + tableName);
   }
 
@@ -204,7 +207,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
    * @param tableExists - does table exist
    * @param schema - extracted schema
    */
-  private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
+  private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) {
     // Append spark table properties & serde properties
     Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties);
@@ -215,6 +218,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
     }
+    boolean schemaChanged = false;
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
@@ -236,6 +240,7 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
       // /ql/exec/DDLTask.java#L3488
       hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, outputFormatClassName,
           serDeFormatClassName, serdeProperties, tableProperties);
+      schemaChanged = true;
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
@@ -248,10 +253,12 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
           hoodieHiveClient.updateTableProperties(tableName, tableProperties);
           LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties);
         }
+        schemaChanged = true;
       } else {
         LOG.info("No Schema difference for " + tableName);
       }
     }
+    return schemaChanged;
   }
 
   /**
@@ -324,7 +331,8 @@ private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
    * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
    * partition path does not match, it updates the partition path).
    */
-  private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+    boolean partitionsChanged;
     try {
       List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
       List<PartitionEvent> partitionEvents =
@@ -335,9 +343,11 @@ private void syncPartitions(String tableName, List<String> writtenPartitionsSinc
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
       LOG.info("Changed Partitions " + updatePartitions);
       hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
+      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
     }
+    return partitionsChanged;
   }
 
   private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 64043a5bb29f8..d36727a571deb 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -1017,4 +1017,35 @@ public void testTypeConverter(String syncMode) throws Exception {
         .containsValue("BIGINT"), errorMsg);
     ddlExecutor.runSQL(dropTableSql);
   }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testSyncWithoutDiffs(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    hiveSyncConfig.isConditionalSync = true;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
+    String tableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+
+    String commitTime0 = "100";
+    String commitTime1 = "101";
+    String commitTime2 = "102";
+    HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue(hiveClient.doesTableExist(tableName));
+    assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
+
+    HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
+
+    tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+    hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
+  }
 }
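
Usage note (not part of the patch): a minimal sketch of enabling the new option programmatically, following the same HiveSyncTool constructor the test above uses. On the command line the option is exposed as --conditional-sync, per the @Parameter annotation. The database name, table name, and base path below are placeholder values; the public-field assignments assume the JCommander-style HiveSyncConfig fields shown in this diff.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

public class ConditionalSyncExample {
  public static void main(String[] args) throws Exception {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";      // placeholder database
    cfg.tableName = "hudi_trips";      // placeholder table
    cfg.basePath = "/tmp/hudi_trips";  // placeholder base path
    // The new flag: when true, updateLastCommitTimeSynced is only called if
    // syncSchema(...) or syncPartitions(...) reported a change.
    cfg.isConditionalSync = true;

    HiveConf hiveConf = new HiveConf();
    FileSystem fs = FileSystem.get(hiveConf);

    // Same (config, hive conf, file system) constructor as in testSyncWithoutDiffs.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
  }
}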