diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 6d85395d3f6e5..fdb6daf02d233 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -194,7 +194,6 @@ List getPartitionEvents(List tablePartitions, List paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); - Collections.sort(hivePartitionValues); String fullTablePartitionPath = Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath(); paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); @@ -206,7 +205,6 @@ List getPartitionEvents(List tablePartitions, List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - Collections.sort(storagePartitionValues); if (!storagePartitionValues.isEmpty()) { String storageValue = String.join(", ", storagePartitionValues); if (!paths.containsKey(storageValue)) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 1d8cbd85347fd..b8e616e9352e7 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -480,15 +480,54 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception { // Lets do the sync HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); tool.syncHoodieTable(); + assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we 
wrote"); +    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), +        "The last commit that was synced should be updated in the TBLPROPERTIES"); + +    // Now let's create partition "2010/01/02" followed by "2010/02/01". HoodieHiveClient had a bug where partition vals were sorted + // and stored as keys in a map. The following tests this particular case. +    String commitTime2 = "101"; +    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); +    //HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + +    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); +    List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); +    assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); +    List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); +    List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); +    assertEquals(1, partitionEvents.size(), "There should be only one partition event"); +    assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD"); + +    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); +    tool.syncHoodieTable(); + +    // Sync should add the one partition +    assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), +        "The one partition we wrote should be added to hive"); +    assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), +        "The last commit that was synced should be 101"); + +    // create partition "2010/02/01" and ensure sync works +    String commitTime3 = "102"; +    HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3); +    HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); + + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), hiveClient.getDataSchema().getColumns().size() + 3, "Hive Schema should match the table schema + partition fields"); - assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote"); - assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), + assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was sycned should be updated in the TBLPROPERTIES"); + assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); } @ParameterizedTest diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index d0d1b667aea20..09090532bf919 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -210,6 +210,14 @@ public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSch createCommitFile(commitMetadata, instantTime); } + public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple, + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + HoodieCommitMetadata 
commitMetadata = + createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime); + createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + createCommitFile(commitMetadata, instantTime); + } + public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime) throws IOException, URISyntaxException, InterruptedException { @@ -266,6 +274,18 @@ private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boo return commitMetadata; } + private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple, + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); + fileSystem.makeQualified(partPath); + fileSystem.mkdirs(partPath); + List writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime); + writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); + addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata); + return commitMetadata; + } + private static List createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime) throws IOException, URISyntaxException { List writeStats = new ArrayList<>();