From 9990f95e981519e201f974c0baf0c3be12a77556 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 14:43:01 +0800
Subject: [PATCH 1/8] [HUDI-1064]Delete extra rows from the code

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5774c896d970e..6c036cbcbc407 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,10 +52,11 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
-    if (path.isEmpty || tblName.isEmpty) {
+    val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
+    val tblName = tblNameOp.get.trim
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
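For readers following the back-and-forth in the next few patches: the shape this block eventually settles on (patch 6/8) is "validate both options first, then trim". A minimal standalone Scala sketch of that end state, with a hypothetical object name and a generic exception standing in for HoodieException (not the actual HoodieSparkSqlWriter code):

// Standalone sketch of the final parameter handling; assumed simplification.
object TableNameResolution {
  def resolve(parameters: Map[String, String]): (String, String) = {
    val path = parameters.get("path")
    val tblNameOp = parameters.get("hoodie.table.name") // value of HoodieWriteConfig.TABLE_NAME
    if (path.isEmpty || tblNameOp.isEmpty) {
      // Fail fast with a clear message instead of a NoSuchElementException from Option.get.
      throw new IllegalArgumentException("'hoodie.table.name' and 'path' must be set.")
    }
    // Safe to dereference now; trim so stray whitespace in configs does not leak into the table name.
    (path.get, tblNameOp.get.trim)
  }

  def main(args: Array[String]): Unit = {
    println(resolve(Map("path" -> "/tmp/hudi_trips", "hoodie.table.name" -> " hudi_trips ")))
  }
}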
From 48e79da09e5e83a8aca22a03433cca908eb7509d Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 15:07:00 +0800
Subject: [PATCH 2/8] [HUDI-1064]remove .get

---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6c036cbcbc407..3ba34cb2671aa 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -86,7 +86,7 @@ private[hudi] object HoodieSparkSqlWriter {
     if (exists && mode == SaveMode.Append) {
       val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
-      if (!existingTableName.equals(tblName.get)) {
+      if (!existingTableName.equals(tblName)) {
         throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
       }
     }
@@ -94,8 +94,8 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
         // register classes & schemas
-        val structName = s"${tblName.get}_record"
-        val nameSpace = s"hoodie.${tblName.get}"
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
         sparkContext.getConf.registerKryoClasses(
           Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -130,11 +130,11 @@ private[hudi] object HoodieSparkSqlWriter {
         // Create the table if not present
         if (!exists) {
           HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
-            tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
+            tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
         }

         // Create a HoodieWriteClient & issue the write.
-        val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
+        val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
           mapAsJavaMap(parameters)
         )
@@ -159,8 +159,8 @@ private[hudi] object HoodieSparkSqlWriter {
         throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
       }

-      val structName = s"${tblName.get}_record"
-      val nameSpace = s"hoodie.${tblName.get}"
+      val structName = s"${tblName}_record"
+      val nameSpace = s"hoodie.${tblName}"
       sparkContext.getConf.registerKryoClasses(
         Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -176,7 +176,7 @@ private[hudi] object HoodieSparkSqlWriter {
       // Create a HoodieWriteClient & issue the delete.
       val client = DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
+        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
         mapAsJavaMap(parameters)
       )

From 8fd3b6c94779c3a296cf966562001e3583e6990c Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:41:49 +0800
Subject: [PATCH 3/8] [HUDI-1064]Trim hoodie table name

---
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5774c896d970e..8f4900897236d 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,7 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME).get.trim
    if (path.isEmpty || tblName.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
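Note on patch 3/8: it calls .get.trim on the Option before the emptiness check, so a missing table name now fails with java.util.NoSuchElementException: None.get instead of the intended HoodieException; that is the error patches 5/8 and 6/8 go on to address. A tiny standalone Scala illustration of the failure mode (hypothetical object name, not Hudi code):

object NoneGetFailureDemo {
  def main(args: Array[String]): Unit = {
    val parameters = Map("path" -> "/tmp/hudi_table")   // table name intentionally absent
    val tblName = parameters.get("hoodie.table.name")   // Option[String] = None
    // tblName.get.trim would throw java.util.NoSuchElementException: None.get here,
    // before any isEmpty check on the result could ever run.
    println(tblName.map(_.trim).getOrElse("<missing>")) // safe alternative used for the demo
  }
}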
From c0302a1ccc84bd172d8b53e695e08a9d2d12d6b1 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:46:13 +0800
Subject: [PATCH 4/8] [HUDI-1064]remove .get()

---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 8f4900897236d..4f94f5b199680 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -85,7 +85,7 @@ private[hudi] object HoodieSparkSqlWriter {
     if (exists && mode == SaveMode.Append) {
       val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
-      if (!existingTableName.equals(tblName.get)) {
+      if (!existingTableName.equals(tblName)) {
         throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
       }
     }
@@ -93,8 +93,8 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
         // register classes & schemas
-        val structName = s"${tblName.get}_record"
-        val nameSpace = s"hoodie.${tblName.get}"
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
         sparkContext.getConf.registerKryoClasses(
           Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -129,11 +129,11 @@ private[hudi] object HoodieSparkSqlWriter {
         // Create the table if not present
         if (!exists) {
           HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
-            tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
+            tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
         }

         // Create a HoodieWriteClient & issue the write.
-        val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
+        val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
           mapAsJavaMap(parameters)
         )
@@ -158,8 +158,8 @@ private[hudi] object HoodieSparkSqlWriter {
         throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
       }

-      val structName = s"${tblName.get}_record"
-      val nameSpace = s"hoodie.${tblName.get}"
+      val structName = s"${tblName}_record"
+      val nameSpace = s"hoodie.${tblName}"
       sparkContext.getConf.registerKryoClasses(
         Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -175,7 +175,7 @@ private[hudi] object HoodieSparkSqlWriter {
       // Create a HoodieWriteClient & issue the delete.
       val client = DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
+        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
         mapAsJavaMap(parameters)
       )

From ed585fc47add68f8487c006ae9950f74ff877d43 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:52:50 +0800
Subject: [PATCH 5/8] [HUDI-1064]To solve the java.util.NoSuchElementException: None.get exception

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4f94f5b199680..37f13a4a971af 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,8 +52,10 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME).get.trim
-    if (path.isEmpty || tblName.isEmpty) {
+    val tblName_tmp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if ( !path.isEmpty && !tblName_tmp.isEmpty) {
+      val tblName = tblName_tmp.get.trim
+    }else{
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
     sparkContext.getConf.getOption("spark.serializer") match {

From 8b3e6b5514952d606e2b8e0d6202eae19138a9fd Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:55:45 +0800
Subject: [PATCH 6/8] [HUDI-1064]compile success

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 37f13a4a971af..3ba34cb2671aa 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,12 +52,11 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName_tmp = parameters.get(HoodieWriteConfig.TABLE_NAME)
-    if ( !path.isEmpty && !tblName_tmp.isEmpty) {
-      val tblName = tblName_tmp.get.trim
-    }else{
+    val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
+    val tblName = tblNameOp.get.trim
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
From c9917ffe5707afeb721e11b5059da33654932953 Mon Sep 17 00:00:00 2001
From: linshan-ma
Date: Thu, 19 Nov 2020 15:46:48 +0800
Subject: [PATCH 7/8] [HUDI-1383]Incorrect partitions getting hive synced; fix hive partition synchronization

---
 .../src/main/java/org/apache/hudi/hive/HoodieHiveClient.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 6d85395d3f6e5..fdb6daf02d233 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -194,7 +194,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
-      Collections.sort(hivePartitionValues);
       String fullTablePartitionPath =
           Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
       paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -206,7 +205,6 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      Collections.sort(storagePartitionValues);
       if (!storagePartitionValues.isEmpty()) {
         String storageValue = String.join(", ", storagePartitionValues);
         if (!paths.containsKey(storageValue)) {
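Why removing the two Collections.sort calls matters: for multi-level partitions the sorted value list is no longer unique, so two distinct partitions can collapse onto the same key in the paths map and the later one never produces an ADD event. A small standalone Scala illustration of the collision, using the same two partitions the test below exercises (not part of the patch itself):

object SortedPartitionKeyCollision {
  def main(args: Array[String]): Unit = {
    val jan2 = Seq("2010", "01", "02") // partition 2010/01/02
    val feb1 = Seq("2010", "02", "01") // partition 2010/02/01
    // Unsorted, the joined values form distinct keys, as the fixed code expects.
    println(jan2.mkString(", ") == feb1.mkString(", "))               // false
    // Sorted, both become "01, 02, 2010", so the two partitions collide on one key.
    println(jan2.sorted.mkString(", ") == feb1.sorted.mkString(", ")) // true
  }
}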
From c1e36b2d5dc569bd9130f6afa58de02bc878f72f Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Fri, 1 Jan 2021 22:41:18 -0500
Subject: [PATCH 8/8] Adding a test to ensure fix works

---
 .../apache/hudi/hive/TestHiveSyncTool.java    | 43 ++++++++++++++++++-
 .../hudi/hive/testutils/HiveTestUtil.java     | 20 +++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1d8cbd85347fd..b8e616e9352e7 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -480,15 +480,54 @@ public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
     // Lets do the sync
     HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     tool.syncHoodieTable();
+    assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+    // Now let's create partition "2010/01/02" followed by "2010/02/01". HoodieHiveClient had a bug where partition vals were sorted
+    // and stored as keys in a map. The following tests this particular case.
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+    //HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+    assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals(1, partitionEvents.size(), "There should be only one partition event");
+    assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    // Sync should add the one partition
+    assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "The one partition we wrote should be added to hive");
+    assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be 101");
+
+    // create partition "2010/02/01" and ensure sync works
+    String commitTime3 = "102";
+    HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+    HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
     assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName
         + " should exist after sync completes");
     assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), hiveClient.getDataSchema().getColumns().size() + 3,
         "Hive Schema should match the table schema + partition fields");
-    assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+    assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
         "Table partitions should match the number of partitions we wrote");
-    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+    assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
         "The last commit that was synced should be updated in the TBLPROPERTIES");
+    assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
   }

   @ParameterizedTest
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d0d1b667aea20..09090532bf919 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -210,6 +210,14 @@ public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSch
     createCommitFile(commitMetadata, instantTime);
   }

+  public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata =
+        createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createCommitFile(commitMetadata, instantTime);
+  }
+
   public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
       boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boo
     return commitMetadata;
   }

+  private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+    fileSystem.makeQualified(partPath);
+    fileSystem.mkdirs(partPath);
+    List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
+    writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
+    return commitMetadata;
+  }
+
   private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
       throws IOException, URISyntaxException {
     List<HoodieWriteStat> writeStats = new ArrayList<>();