From baebb7124f01c51c26f81163b3529d5b46a900e0 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Mon, 4 Jan 2021 08:12:17 -0500
Subject: [PATCH] Fixing sorting of partition vals for hive sync computation

---
 .../apache/hudi/hive/HoodieHiveClient.java    |  2 -
 .../apache/hudi/hive/TestHiveSyncTool.java    | 66 +++++++++++++++----
 .../hudi/hive/testutils/HiveTestUtil.java     | 20 ++++++
 3 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 88f4c10611b08..c26efb29cc54e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -207,7 +207,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
-      Collections.sort(hivePartitionValues);
       String fullTablePartitionPath =
           Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
       paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -219,7 +218,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      Collections.sort(storagePartitionValues);
       if (!storagePartitionValues.isEmpty()) {
         String storageValue = String.join(", ", storagePartitionValues);
         if (!paths.containsKey(storageValue)) {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1d8cbd85347fd..8a1ea4f893927 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -21,10 +21,10 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ private static Stream<Arguments> useJdbc() {
   }
 
   private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
-    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+    return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
   }
 
   @BeforeEach
@@ -347,7 +347,7 @@ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMeta
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-              "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMeta
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-              "Hive Schema should match the evolved table schema + partition field");
+          "Hive Schema should match the evolved table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMe
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-              "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
         "Table partitions should match the number of partitions we wrote");
     assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
         "The last commit that was sycned should be updated in the TBLPROPERTIES");
+
+    // HoodieHiveClient had a bug where partition values were sorted
+    // and stored as keys in a map. The following tests this particular case.
+    // Now let's create partition "2010/01/02" followed by "2010/02/01".
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+    assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after commit 100");
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals(1, partitionEvents.size(), "There should be only one partition event");
+    assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    // Sync should add the one partition
+    assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be 101");
+
+    // create partition "2010/02/01" and ensure sync works
+    String commitTime3 = "102";
+    HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+    HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "."
+        + hiveSyncConfig.tableName);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 3,
+        "Hive Schema should match the table schema + partition fields");
+    assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+    assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
   }
 
   @ParameterizedTest
@@ -507,17 +551,17 @@ public void testNonPartitionedSync(boolean useJdbc) throws Exception {
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-            "Table " + hiveSyncConfig.tableName + " should not exist initially");
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
     // Lets do the sync
     HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     tool.syncHoodieTable();
     assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-            "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
     assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
-            hiveClient.getDataSchema().getColumns().size(),
-            "Hive Schema should match the table schema,ignoring the partition fields");
+        hiveClient.getDataSchema().getColumns().size(),
+        "Hive Schema should match the table schema, ignoring the partition fields");
     assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
-            "Table should not have partitions because of the NonPartitionedExtractor");
+        "Table should not have partitions because of the NonPartitionedExtractor");
   }
 
   @ParameterizedTest
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d0d1b667aea20..09090532bf919 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -210,6 +210,14 @@ public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSch
     createCommitFile(commitMetadata, instantTime);
   }
 
+  public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata =
+        createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "."
+        + hiveSyncConfig.tableName);
+    createCommitFile(commitMetadata, instantTime);
+  }
+
   public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
       boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boo
     return commitMetadata;
   }
 
+  private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+    fileSystem.makeQualified(partPath);
+    fileSystem.mkdirs(partPath);
+    List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
+    writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
+    return commitMetadata;
+  }
+
   private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
       throws IOException, URISyntaxException {
     List<HoodieWriteStat> writeStats = new ArrayList<>();
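
For context on why the removed Collections.sort calls mattered: getPartitionEvents builds a map keyed by the joined partition values, and sorting those values first collapses distinct multi-level partitions such as 2010/01/02 and 2010/02/01 onto the same key, so the second partition is never reported as an ADD event. A minimal, self-contained Java sketch of the collision follows (the class name and sample values are illustrative, not from the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PartitionKeyCollisionDemo {

  // Build the lookup key the way getPartitionEvents does (String.join over the
  // partition values), optionally sorting first to reproduce the old behavior.
  static String keyOf(List<String> partitionValues, boolean sortFirst) {
    List<String> values = new ArrayList<>(partitionValues);
    if (sortFirst) {
      Collections.sort(values); // the call this patch removes
    }
    return String.join(", ", values);
  }

  public static void main(String[] args) {
    // Two distinct multi-level partitions: 2010/01/02 and 2010/02/01.
    List<String> p1 = Arrays.asList("2010", "01", "02");
    List<String> p2 = Arrays.asList("2010", "02", "01");

    // Old behavior: both sort to [01, 02, 2010] and collide on one map key,
    // so the second partition looks already synced and no ADD event fires.
    System.out.println(keyOf(p1, true));  // 01, 02, 2010
    System.out.println(keyOf(p2, true));  // 01, 02, 2010 (collision)

    // Fixed behavior: keys keep path order and stay distinct.
    System.out.println(keyOf(p1, false)); // 2010, 01, 02
    System.out.println(keyOf(p2, false)); // 2010, 02, 01
  }
}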