diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 9f8e29d68773f..5a6db78f882e3 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -55,18 +55,33 @@ public static void main(String[] args) {
     SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");
 
     try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
-      final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+      runQuickstart(jsc, spark, tableName, tablePath);
+    }
+  }
 
-      insertData(spark, jsc, tablePath, tableName, dataGen);
-      updateData(spark, jsc, tablePath, tableName, dataGen);
-      queryData(spark, jsc, tablePath, tableName, dataGen);
+  /**
+   * Visible for testing
+   */
+  public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
+    final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
 
-      incrementalQuery(spark, tablePath, tableName);
-      pointInTimeQuery(spark, tablePath, tableName);
+    insertData(spark, jsc, tablePath, tableName, dataGen);
+    queryData(spark, jsc, tablePath, tableName, dataGen);
 
-      delete(spark, tablePath, tableName);
-      deleteByPartition(spark, tablePath, tableName);
-    }
+    updateData(spark, jsc, tablePath, tableName, dataGen);
+    queryData(spark, jsc, tablePath, tableName, dataGen);
+
+    incrementalQuery(spark, tablePath, tableName);
+    pointInTimeQuery(spark, tablePath, tableName);
+
+    delete(spark, tablePath, tableName);
+    queryData(spark, jsc, tablePath, tableName, dataGen);
+
+    insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+    queryData(spark, jsc, tablePath, tableName, dataGen);
+
+    deleteByPartition(spark, tablePath, tableName);
+    queryData(spark, jsc, tablePath, tableName, dataGen);
   }
 
   /**
@@ -77,6 +92,7 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t
     String commitTime = Long.toString(System.currentTimeMillis());
     List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
     Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
+
     df.write().format("org.apache.hudi")
         .options(QuickstartUtils.getQuickstartWriteConfigs())
         .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
@@ -87,6 +103,27 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t
         .save(tablePath);
   }
 
+  /**
+   * Generate new records, load them into a {@link Dataset} and insert-overwrite them into the Hudi dataset
+   */
+  public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
+                                         HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
+    String commitTime = Long.toString(System.currentTimeMillis());
+    List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
+    Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
+
+    df.write().format("org.apache.hudi")
+        .options(QuickstartUtils.getQuickstartWriteConfigs())
+        .option("hoodie.datasource.write.operation", WriteOperationType.INSERT_OVERWRITE.name())
+        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
+        .option(TBL_NAME.key(), tableName)
+        .mode(Append)
+        .save(tablePath);
+  }
+
+
   /**
    * Load the data files into a DataFrame.
    */
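For context on the step added above: insert_overwrite replaces the data only in the partitions touched by the incoming batch, leaving all other partitions intact (unlike SaveMode.Overwrite, which recreates the whole table). A minimal standalone sketch of the same write, assuming `df`, `tableName`, and `tablePath` are prepared as in the quickstart; the string config keys are the equivalents of the constants used in the diff, and the class name is hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class InsertOverwriteSketch { // hypothetical helper, for illustration only
  public static void insertOverwrite(Dataset<Row> df, String tableName, String tablePath) {
    df.write().format("org.apache.hudi")
        // overwrite only the partitions present in `df`; others are untouched
        .option("hoodie.datasource.write.operation", "insert_overwrite")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
        .option("hoodie.table.name", tableName)
        .mode(SaveMode.Append) // Append, not Overwrite: the Hudi operation itself does the overwriting
        .save(tablePath);
  }
}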
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 212dcc440933f..b9ab120460581 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -36,12 +36,22 @@
 import java.io.File;
 import java.nio.file.Paths;
 
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData;
+
 public class TestHoodieSparkQuickstart implements SparkProvider {
-  protected static transient HoodieSparkEngineContext context;
+  protected static HoodieSparkEngineContext context;
-  private static transient SparkSession spark;
-  private static transient SQLContext sqlContext;
-  private static transient JavaSparkContext jsc;
+  private static SparkSession spark;
+  private static SQLContext sqlContext;
+  private static JavaSparkContext jsc;
 
   /**
    * An indicator of the initialization status.
@@ -50,8 +60,6 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
   @TempDir
   protected java.nio.file.Path tempDir;
 
-  private static final HoodieExampleDataGenerator<HoodieAvroPayload> DATA_GEN = new HoodieExampleDataGenerator<>();
-
   @Override
   public SparkSession spark() {
     return spark;
@@ -100,15 +108,7 @@ public void testHoodieSparkQuickstart() {
     String tablePath = tablePath(tableName);
 
     try {
-      HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, DATA_GEN);
-      HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, DATA_GEN);
-
-      HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, DATA_GEN);
-      HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName);
-      HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName);
-
-      HoodieSparkQuickstart.delete(spark, tablePath, tableName);
-      HoodieSparkQuickstart.deleteByPartition(spark, tablePath, tableName);
+      runQuickstart(jsc, spark, tableName, tablePath);
     } finally {
       Utils.deleteRecursively(new File(tablePath));
    }
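The test refactor above replaces a hand-copied sequence of quickstart calls with the shared runQuickstart entry point, so the example's main() and its test can no longer drift apart. A hedged sketch of that pattern; the class, table name, and path below are hypothetical (the real test obtains its Spark session from SparkProvider and its path from @TempDir), and the Kryo serializer config mirrors what the quickstart's default SparkConf sets:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;

class QuickstartSmokeTest { // hypothetical, for illustration only
  @Test
  void quickstartRunsEndToEnd() {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("quickstart-smoke")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();
    try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
      // drives exactly the code path HoodieSparkQuickstart.main() runs
      HoodieSparkQuickstart.runQuickstart(jsc, spark, "hudi_trips_cow", "/tmp/hudi_trips_cow");
    } finally {
      spark.stop();
    }
  }
}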
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala
new file mode 100644
index 0000000000000..a972f835e8054
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestDeleteFromTable extends HoodieSparkSqlTestBase {
+
+  test("Test deleting from table") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |CREATE TABLE $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) USING hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  tableType = '$tableType'
+             | )
+             | PARTITIONED BY (dt)
+             | LOCATION '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        // NOTE: No column list is given in the INSERT below, so the partition field (dt) must come last.
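On the new Scala test: DELETE FROM on a Hudi table is rewritten into a Hudi delete write operation, and the WHERE clause may reference any column, not just the primary key, since Spark resolves the matching rows first and Hudi then deletes them by their record keys. A hedged Java rendering of the statements the test exercises, assuming a session created with the Hudi SQL extension and an already-existing table; the class and table names are hypothetical:

import org.apache.spark.sql.SparkSession;

public final class DeleteFromTableSketch { // hypothetical, for illustration only
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("hudi-delete-demo")
        // required so Spark SQL understands Hudi DML such as DELETE FROM
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();

    String tableName = "h0"; // assumed to exist, created as in the test above
    spark.sql("DELETE FROM " + tableName + " WHERE id = 1");      // delete by primary key
    spark.sql("DELETE FROM " + tableName + " WHERE id = 1");      // deleting a missing row is a no-op
    spark.sql("DELETE FROM " + tableName + " WHERE name = 'a2'"); // delete by a non-key column
    spark.stop();
  }
}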
+        spark.sql(
+          s"""
+             | INSERT INTO $tableName VALUES
+             | (1, 'a1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3', 30, 3000, "2021-01-07")
+       """.stripMargin)
+
+        checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+
+        // Delete single row
+        spark.sql(s"DELETE FROM $tableName WHERE id = 1")
+
+        checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+
+        // Try deleting a non-existent row
+        spark.sql(s"DELETE FROM $tableName WHERE id = 1")
+
+        checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+
+        // Delete a record identified by a field other than the primary key
+        spark.sql(s"DELETE FROM $tableName WHERE name = 'a2'")
+
+        checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
+    }
+  }
+}
diff --git a/hudi-utilities/src/test/resources/hive-site.xml b/hudi-utilities/src/test/resources/hive-site.xml
new file mode 100644
index 0000000000000..4866230d2707b
--- /dev/null
+++ b/hudi-utilities/src/test/resources/hive-site.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hive.server2.enable.doAs</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>datanucleus.schema.autoCreateTables</name>
+    <value>true</value>
+  </property>
+</configuration>
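The new hive-site.xml disables HiveServer2 user impersonation, turns off metastore schema verification, and lets DataNucleus auto-create the metastore tables, which together allow the utilities tests to run against an embedded metastore with no prior schema setup. Because the file lives under src/test/resources it lands on the test classpath, where Hive's configuration loader finds it automatically. A hedged sketch of verifying that, relying on HiveConf reading hive-site.xml from the classpath in its constructor; the class name is hypothetical:

import org.apache.hadoop.hive.conf.HiveConf;

public final class HiveSiteCheck { // hypothetical, for illustration only
  public static void main(String[] args) {
    HiveConf conf = new HiveConf(); // loads hive-site.xml from the classpath, if present
    System.out.println(conf.get("hive.server2.enable.doAs"));            // expect "false"
    System.out.println(conf.get("hive.metastore.schema.verification"));  // expect "false"
    System.out.println(conf.get("datanucleus.schema.autoCreateTables")); // expect "true"
  }
}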