From 9990f95e981519e201f974c0baf0c3be12a77556 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 14:43:01 +0800
Subject: [PATCH 1/8] [HUDI-1064] Delete extra rows from the code

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5774c896d970e..6c036cbcbc407 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,10 +52,11 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
-    if (path.isEmpty || tblName.isEmpty) {
+    val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
+    val tblName = tblNameOp.get.trim
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")

From 48e79da09e5e83a8aca22a03433cca908eb7509d Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 15:07:00 +0800
Subject: [PATCH 2/8] [HUDI-1064] remove .get

---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6c036cbcbc407..3ba34cb2671aa 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -86,7 +86,7 @@ private[hudi] object HoodieSparkSqlWriter {
     if (exists && mode == SaveMode.Append) {
       val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
-      if (!existingTableName.equals(tblName.get)) {
+      if (!existingTableName.equals(tblName)) {
         throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
       }
     }
@@ -94,8 +94,8 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
         // register classes & schemas
-        val structName = s"${tblName.get}_record"
-        val nameSpace = s"hoodie.${tblName.get}"
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
         sparkContext.getConf.registerKryoClasses(
           Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -130,11 +130,11 @@ private[hudi] object HoodieSparkSqlWriter {
       // Create the table if not present
       if (!exists) {
         HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
-          tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
+          tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
       }

       // Create a HoodieWriteClient & issue the write.
-      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
+      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
         mapAsJavaMap(parameters)
       )
@@ -159,8 +159,8 @@ private[hudi] object HoodieSparkSqlWriter {
         throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
       }

-      val structName = s"${tblName.get}_record"
-      val nameSpace = s"hoodie.${tblName.get}"
+      val structName = s"${tblName}_record"
+      val nameSpace = s"hoodie.${tblName}"
       sparkContext.getConf.registerKryoClasses(
         Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -176,7 +176,7 @@ private[hudi] object HoodieSparkSqlWriter {
       // Create a HoodieWriteClient & issue the delete.
       val client = DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
+        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
         mapAsJavaMap(parameters)
       )
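[Editor's note] Patches 1 and 2 change the flow from calling .get on the Option at every use site to validating once and extracting once. A minimal self-contained sketch of that pattern (plain Scala; the parameters map, key names, and exception type here are illustrative stand-ins, not the Hudi API):

    object OptionGuardSketch {
      def main(args: Array[String]): Unit = {
        val parameters = Map("path" -> "/tmp/hudi_trips", "hoodie.table.name" -> " trips ")
        val path = parameters.get("path")                   // Option[String]
        val tblNameOp = parameters.get("hoodie.table.name") // Option[String]
        // Validate both options up front...
        if (path.isEmpty || tblNameOp.isEmpty) {
          throw new IllegalArgumentException("'hoodie.table.name', 'path' must be set.")
        }
        // ...then extract exactly once; .get is safe only after the guard above.
        val tblName = tblNameOp.get.trim
        println(s"writing table '$tblName' to ${path.get}")
      }
    }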
From 8fd3b6c94779c3a296cf966562001e3583e6990c Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:41:49 +0800
Subject: [PATCH 3/8] [HUDI-1064] Trim hoodie table name

---
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5774c896d970e..8f4900897236d 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,7 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME).get.trim
     if (path.isEmpty || tblName.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }

From c0302a1ccc84bd172d8b53e695e08a9d2d12d6b1 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:46:13 +0800
Subject: [PATCH 4/8] [HUDI-1064] remove .get()

---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 8f4900897236d..4f94f5b199680 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -85,7 +85,7 @@ private[hudi] object HoodieSparkSqlWriter {
     if (exists && mode == SaveMode.Append) {
       val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
-      if (!existingTableName.equals(tblName.get)) {
+      if (!existingTableName.equals(tblName)) {
         throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
       }
     }
@@ -93,8 +93,8 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
         // register classes & schemas
-        val structName = s"${tblName.get}_record"
-        val nameSpace = s"hoodie.${tblName.get}"
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
         sparkContext.getConf.registerKryoClasses(
           Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -129,11 +129,11 @@ private[hudi] object HoodieSparkSqlWriter {
       // Create the table if not present
      if (!exists) {
        HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
-          tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
+          tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
      }

      // Create a HoodieWriteClient & issue the write.
-      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
+      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
        mapAsJavaMap(parameters)
      )
@@ -158,8 +158,8 @@ private[hudi] object HoodieSparkSqlWriter {
        throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
      }

-      val structName = s"${tblName.get}_record"
-      val nameSpace = s"hoodie.${tblName.get}"
+      val structName = s"${tblName}_record"
+      val nameSpace = s"hoodie.${tblName}"
      sparkContext.getConf.registerKryoClasses(
        Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
@@ -175,7 +175,7 @@ private[hudi] object HoodieSparkSqlWriter {
      // Create a HoodieWriteClient & issue the delete.
      val client = DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
+        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
        mapAsJavaMap(parameters)
      )
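[Editor's note] Patch 3 introduced the regression that patch 5's subject names: appending .get.trim at the declaration dereferences the Option before the emptiness check runs, so a missing table name fails with java.util.NoSuchElementException: None.get instead of the intended HoodieException. A small Scala sketch of the failure mode, plus one safe alternative using getOrElse (this is not what the series ultimately adopts; the guard-then-get form is restored in patch 6):

    val parameters = Map("path" -> "/tmp/hudi_trips") // table name not supplied
    // Patch 3's shape throws here, before any validation can run:
    //   val tblName = parameters.get("hoodie.table.name").get.trim
    //   => java.util.NoSuchElementException: None.get
    // Alternative: trim inside the Option and fail with a clear message.
    val tblName = parameters.get("hoodie.table.name")
      .map(_.trim)
      .getOrElse(throw new IllegalArgumentException("'hoodie.table.name' must be set."))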
From ed585fc47add68f8487c006ae9950f74ff877d43 Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:52:50 +0800
Subject: [PATCH 5/8] [HUDI-1064] Fix the java.util.NoSuchElementException: None.get exception

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4f94f5b199680..37f13a4a971af 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,8 +52,10 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME).get.trim
-    if (path.isEmpty || tblName.isEmpty) {
+    val tblName_tmp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if ( !path.isEmpty && !tblName_tmp.isEmpty) {
+      val tblName = tblName_tmp.get.trim
+    }else{
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
     sparkContext.getConf.getOption("spark.serializer") match {

From 8b3e6b5514952d606e2b8e0d6202eae19138a9fd Mon Sep 17 00:00:00 2001
From: mabin001
Date: Tue, 7 Jul 2020 16:55:45 +0800
Subject: [PATCH 6/8] [HUDI-1064] Fix compilation error

---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 37f13a4a971af..3ba34cb2671aa 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,12 +52,11 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
-    val tblName_tmp = parameters.get(HoodieWriteConfig.TABLE_NAME)
-    if ( !path.isEmpty && !tblName_tmp.isEmpty) {
-      val tblName = tblName_tmp.get.trim
-    }else{
+    val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
+    val tblName = tblNameOp.get.trim
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
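[Editor's note] Patch 5 traded the None.get crash for a compile error, which is why patch 6 reverts to the patch 1/2 shape: in Scala, a val bound inside an if block is scoped to that block, so the tblName defined there is invisible to the rest of the method. A minimal illustration (variable names here are placeholders):

    val opt: Option[String] = Some("  trips  ")
    if (opt.nonEmpty) {
      val inner = opt.get.trim // visible only inside these braces
    }
    // println(inner)          // does not compile: not found: value inner

    // Patch 6's shape: fail fast, then bind the val in the enclosing scope.
    if (opt.isEmpty) throw new IllegalArgumentException("table name must be set")
    val tblName = opt.get.trim // in scope for everything that follows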
From 868331086cc957615ac994385a795855bd3170da Mon Sep 17 00:00:00 2001
From: linshan-ma
Date: Fri, 7 Aug 2020 10:11:57 +0800
Subject: [PATCH 7/8] [HUDI-1156] Remove unused imports from the HoodieDeltaStreamerWrapper class

---
 .../apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index f28e6bfc031b1..5179e892c04c7 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -18,8 +18,6 @@

 package org.apache.hudi.integ.testsuite;

-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;

From 7af02ad443b7d8f3a4591acb1eb88fbdce1b2167 Mon Sep 17 00:00:00 2001
From: linshan-ma
Date: Fri, 7 Aug 2020 13:49:35 +0800
Subject: [PATCH 8/8] trigger rebuild
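[Editor's note] Patch 8 carries no file changes in this series; an empty commit like this is a common way to re-trigger CI on an open pull request without touching any code. For reference, assuming that is what was done here, such a commit is typically created with:

    git commit --allow-empty -m "trigger rebuild"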