From f71b7f2bf123aacf2e28ca8ec8f087a9fa57c3c5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 5 Aug 2022 14:09:14 -0700 Subject: [PATCH 1/5] Added "hive-site.xml" to fix tests failing locally due to Hive trying to run its commands on behalf of the executing user, which fails on Mac --- .../src/test/resources/hive-site.xml | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 hudi-utilities/src/test/resources/hive-site.xml diff --git a/hudi-utilities/src/test/resources/hive-site.xml b/hudi-utilities/src/test/resources/hive-site.xml new file mode 100644 index 0000000000000..4866230d2707b --- /dev/null +++ b/hudi-utilities/src/test/resources/hive-site.xml @@ -0,0 +1,35 @@ + + + + + + + hive.server2.enable.doAs + false + + + + hive.metastore.schema.verification + false + + + + datanucleus.schema.autoCreateTables + true + + From 9a303fe1b2ca75ab84c8ea43585dddfb4e0dd37e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 5 Aug 2022 14:56:49 -0700 Subject: [PATCH 2/5] Added tests validating `DELETE FROM` statements --- .../spark/sql/hudi/TestDeleteFromTable.scala | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala new file mode 100644 index 0000000000000..a972f835e8054 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +class TestDeleteFromTable extends HoodieSparkSqlTestBase { + + test("Test deleting from table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) USING hudi + | tblproperties ( + | primaryKey = 'id', + | tableType = '$tableType' + | ) + | PARTITIONED BY (dt) + | LOCATION '${tmp.getCanonicalPath}' + """.stripMargin) + + // NOTE: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | INSERT INTO $tableName VALUES + | (1, 'a1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")( + Seq(1, "a1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + // Delete single row + spark.sql(s"DELETE FROM $tableName WHERE id = 1") + + checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")( + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + // Try deleting non-existent row + spark.sql(s"DELETE FROM $tableName WHERE id = 1") + + checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")( + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + // Delete record identified by some field other than the primary-key + spark.sql(s"DELETE FROM $tableName WHERE name = 'a2'") + + checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")( + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + } + } + } +} From 7c035969b45d690030bb143738aee617ca5112fe Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 8 Aug 2022 18:04:27 -0700 Subject: [PATCH 3/5] Expanded `HoodieSparkQuickStart` and corresponding test (to match the one on the website) --- .../quickstart/HoodieSparkQuickstart.java | 30 +++++++++++++++++ .../quickstart/TestHoodieSparkQuickstart.java | 33 +++++++++++++++---- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java index 9f8e29d68773f..348902b99f87e 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -58,6 +58,8 @@ public static void main(String[] args) { final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); insertData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); + updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); @@ -65,7 +67,13 @@ public static void main(String[] args) { pointInTimeQuery(spark, tablePath, tableName); delete(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); + + insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); + deleteByPartition(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); } } @@ -77,6 +85,7 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t String commitTime = Long.toString(System.currentTimeMillis()); List inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)); Dataset df = spark.read().json(jsc.parallelize(inserts, 1)); + df.write().format("org.apache.hudi") .options(QuickstartUtils.getQuickstartWriteConfigs()) .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") @@ -87,6 +96,27 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t .save(tablePath); } + /** + * Generate new records, load them into a {@link Dataset} and insert-overwrite it into the Hudi dataset + */ + public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, + HoodieExampleDataGenerator dataGen) { + String commitTime = Long.toString(System.currentTimeMillis()); + List inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)); + Dataset df = spark.read().json(jsc.parallelize(inserts, 1)); + + df.write().format("org.apache.hudi") + .options(QuickstartUtils.getQuickstartWriteConfigs()) + .option("hoodie.datasource.write.operation", WriteOperationType.INSERT_OVERWRITE.name()) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid") + .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath") + .option(TBL_NAME.key(), tableName) + .mode(Append) + .save(tablePath); + } + + /** * Load the data files into a DataFrame. */ diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java index 212dcc440933f..b8fd40d2ace39 100644 --- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -36,6 +36,15 @@ import java.io.File; import java.nio.file.Paths; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData; + public class TestHoodieSparkQuickstart implements SparkProvider { protected static transient HoodieSparkEngineContext context; @@ -100,15 +109,25 @@ public void testHoodieSparkQuickstart() { String tablePath = tablePath(tableName); try { - HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, DATA_GEN); - HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, DATA_GEN); + final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); + + insertData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); + + updateData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); + + incrementalQuery(spark, tablePath, tableName); + pointInTimeQuery(spark, tablePath, tableName); + + delete(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); - HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, DATA_GEN); - HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName); - HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName); + insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); - HoodieSparkQuickstart.delete(spark, tablePath, tableName); - HoodieSparkQuickstart.deleteByPartition(spark, tablePath, tableName); + deleteByPartition(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); } finally { Utils.deleteRecursively(new File(tablePath)); } From f1a4f17c4c306eb7b8c8118e7bd8172a182aa95b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 8 Aug 2022 18:15:07 -0700 Subject: [PATCH 4/5] Tidying up --- .../examples/quickstart/TestHoodieSparkQuickstart.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java index b8fd40d2ace39..192d9e7a8fb96 100644 --- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -46,11 +46,11 @@ import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData; public class TestHoodieSparkQuickstart implements SparkProvider { - protected static transient HoodieSparkEngineContext context; + protected static HoodieSparkEngineContext context; - private static transient SparkSession spark; - private static transient SQLContext sqlContext; - private static transient JavaSparkContext jsc; + private static SparkSession spark; + private static SQLContext sqlContext; + private static JavaSparkContext jsc; /** * An indicator of the initialization status. @@ -59,8 +59,6 @@ public class TestHoodieSparkQuickstart implements SparkProvider { @TempDir protected java.nio.file.Path tempDir; - private static final HoodieExampleDataGenerator DATA_GEN = new HoodieExampleDataGenerator<>(); - @Override public SparkSession spark() { return spark; From 25172a471763150066600d2d0d01d21f4e8d2df7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 8 Aug 2022 18:27:02 -0700 Subject: [PATCH 5/5] Extracted common piece b/w the test and quickstart itself --- .../quickstart/HoodieSparkQuickstart.java | 35 +++++++++++-------- .../quickstart/TestHoodieSparkQuickstart.java | 21 ++--------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java index 348902b99f87e..5a6db78f882e3 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -55,26 +55,33 @@ public static void main(String[] args) { SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example"); try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) { - final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); + runQuickstart(jsc, spark, tableName, tablePath); + } + } + + /** + * Visible for testing + */ + public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { + final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); + insertData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); - updateData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); + updateData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); - incrementalQuery(spark, tablePath, tableName); - pointInTimeQuery(spark, tablePath, tableName); + incrementalQuery(spark, tablePath, tableName); + pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); - queryData(spark, jsc, tablePath, tableName, dataGen); + delete(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); - insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); + insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + queryData(spark, jsc, tablePath, tableName, dataGen); - deleteByPartition(spark, tablePath, tableName); - queryData(spark, jsc, tablePath, tableName, dataGen); - } + deleteByPartition(spark, tablePath, tableName); + queryData(spark, jsc, tablePath, tableName, dataGen); } /** diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java index 192d9e7a8fb96..b9ab120460581 100644 --- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -43,6 +43,7 @@ import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData; import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery; import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData; +import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart; import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData; public class TestHoodieSparkQuickstart implements SparkProvider { @@ -107,25 +108,7 @@ public void testHoodieSparkQuickstart() { String tablePath = tablePath(tableName); try { - final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); - - insertData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); - - updateData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); - - incrementalQuery(spark, tablePath, tableName); - pointInTimeQuery(spark, tablePath, tableName); - - delete(spark, tablePath, tableName); - queryData(spark, jsc, tablePath, tableName, dataGen); - - insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); - queryData(spark, jsc, tablePath, tableName, dataGen); - - deleteByPartition(spark, tablePath, tableName); - queryData(spark, jsc, tablePath, tableName, dataGen); + runQuickstart(jsc, spark, tableName, tablePath); } finally { Utils.deleteRecursively(new File(tablePath)); }