diff --git a/docs/_docs/1_1_quick_start_guide.md b/docs/_docs/1_1_quick_start_guide.md
index 256e5600881a1..8111acf532f37 100644
--- a/docs/_docs/1_1_quick_start_guide.md
+++ b/docs/_docs/1_1_quick_start_guide.md
@@ -17,8 +17,8 @@ From the extracted directory run spark-shell with Hudi as:
 ```scala
 spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
-  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
-  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 ```
@@ -58,14 +58,14 @@ Generate some new trips, load them into a DataFrame and write the DataFrame into
 ```scala
 val inserts = convertToStringList(dataGen.generateInserts(10))
 val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
-df.write.format("org.apache.hudi").
-  options(getQuickstartWriteConfigs).
-  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-  option(TABLE_NAME, tableName).
-  mode(Overwrite).
-  save(basePath);
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
 ```
 `mode(Overwrite)` overwrites and recreates the table if it already exists.
@@ -84,10 +84,11 @@ Load the data files into a DataFrame.
 ```scala
 val tripsSnapshotDF = spark.
-  read.
-  format("org.apache.hudi").
-  load(basePath + "/*/*/*/*")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*")
 tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
 spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
 spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
 ```
@@ -104,15 +105,15 @@ and write DataFrame into the hudi table.
 ```scala
 val updates = convertToStringList(dataGen.generateUpdates(10))
-val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
-df.write.format("org.apache.hudi").
-  options(getQuickstartWriteConfigs).
-  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-  option(TABLE_NAME, tableName).
-  mode(Append).
-  save(basePath);
+val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Append).
+  save(basePath)
 ```
 Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
@@ -129,22 +130,21 @@ We do not need to specify endTime, if we want all changes after the given commit
 ```scala
 // reload data
 spark.
-  read.
-  format("org.apache.hudi").
-  load(basePath + "/*/*/*/*").
-  createOrReplaceTempView("hudi_trips_snapshot")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*").
+  createOrReplaceTempView("hudi_trips_snapshot")
 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
 val beginTime = commits(commits.length - 2) // commit time we are interested in
 // incrementally query data
-val tripsIncrementalDF = spark.
-  read.
-  format("org.apache.hudi").
-  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
-  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
-  load(basePath);
+val tripsIncrementalDF = spark.read.format("hudi").
+  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
+  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
+  load(basePath)
 tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
 spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
 ```
@@ -162,11 +162,11 @@ val beginTime = "000" // Represents all commits > this time.
 val endTime = commits(commits.length - 2) // commit time we are interested in
 //incrementally query data
-val tripsPointInTimeDF = spark.read.format("org.apache.hudi").
-  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
-  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
-  option(END_INSTANTTIME_OPT_KEY, endTime).
-  load(basePath);
+val tripsPointInTimeDF = spark.read.format("hudi").
+  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
+  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
+  option(END_INSTANTTIME_OPT_KEY, endTime).
+  load(basePath)
 tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
 spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
 ```
@@ -176,31 +176,31 @@ Delete records for the HoodieKeys passed in.
 ```scala
 // fetch total records count
-spark.sql("select uuid, partitionPath from hudi_ro_table").count()
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
-val ds = spark.sql("select uuid, partitionPath from hudi_ro_table").limit(2)
+val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
 // issue deletes
 val deletes = dataGen.generateDeletes(ds.collectAsList())
 val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
-df.write.format("org.apache.hudi").
-options(getQuickstartWriteConfigs).
-option(OPERATION_OPT_KEY,"delete").
-option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-option(TABLE_NAME, tableName).
-mode(Append).
-save(basePath);
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(OPERATION_OPT_KEY,"delete").
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Append).
+  save(basePath)
 // run the same read query as above.
 val roAfterDeleteViewDF = spark.
-  read.
-  format("org.apache.hudi").
-  load(basePath + "/*/*/*/*")
-roAfterDeleteViewDF.registerTempTable("hudi_ro_table")
+  read.
+  format("hudi").
+  load(basePath + "/*/*/*/*")
+roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
 // fetch should return (total - 2) records
-spark.sql("select uuid, partitionPath from hudi_ro_table").count()
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.