@@ -19,8 +19,7 @@ package org.apache.spark.examples.sql.hive
// $example on:spark_hive$
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
// $example off:spark_hive$

object SparkHiveExample {
@@ -102,8 +101,41 @@ object SparkHiveExample {
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

// Create a Hive managed table stored as Parquet
sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
// Save the DataFrame to the Hive managed table in Parquet format
// (`database_name` is a placeholder for an existing database)
val hiveTableDF = sql("SELECT * FROM records")
hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
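
// A quick sanity check (illustrative, not part of the original example): read the
// managed table back. This assumes the database `database_name` already exists;
// if not, create it first with sql("CREATE DATABASE IF NOT EXISTS database_name")
spark.table("database_name.records").show()
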
// Create an external Hive table backed by Parquet files. The table name differs
// from the managed `records` table above to avoid a name clash, and its location
// must match the directory the data is written to below. Multiple Parquet files
// may be created under that directory, depending on the volume of data.
val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
sql("CREATE EXTERNAL TABLE external_records(key int, value string) " +
  s"STORED AS PARQUET LOCATION '$hiveExternalTableLocation'")
// Write Parquet in the legacy format, so that the Hive Parquet SerDe (and other
// older Parquet readers) stay compatible with what Spark writes
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

// Save the DataFrame to the external table location as Hive-compatible Parquet
hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
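
// Sanity check (illustrative): read the freshly written Parquet files back
// directly from the external location
spark.read.parquet(hiveExternalTableLocation).show()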

// Turn on the flags for Hive dynamic partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// Partition the output by `key`, so that downstream queries that filter on it
// can prune partitions and run much faster
hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
  .parquet(hiveExternalTableLocation)
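
// Note: partition directories written directly to the file system are not
// registered in the Hive metastore automatically. A table declared with
// PARTITIONED BY would need e.g. sql("MSCK REPAIR TABLE <table>") before Hive
// sees the new partitions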

// Reduce the number of files per partition by repartitioning on `key` first,
// so that all rows for a given key are written by a single task
hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
  .partitionBy("key").parquet(hiveExternalTableLocation)

// Alternatively, cap the total number of output files by coalescing to a fixed
// number of tasks before writing
hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
  .partitionBy("key").parquet(hiveExternalTableLocation)
// $example off:spark_hive$

spark.stop()
}
}