@@ -19,8 +19,7 @@ package org.apache.spark.examples.sql.hive
// $example on:spark_hive$
import java.io.File

-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
// $example off:spark_hive$

object SparkHiveExample {
@@ -102,8 +101,41 @@ object SparkHiveExample {
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// $example off:spark_hive$

// Create Hive managed table with parquet
[Review] Member: parquet -> Parquet
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
// Save DataFrame to Hive Managed table as Parquet format
[Review] Member: Managed -> managed
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

val hiveTableDF = sql("SELECT * FROM records")
hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
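// Illustrative read-back (a sketch, not part of the PR diff): a managed table
// is resolved by name through the metastore.
spark.table("database_name.records").show()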
// Create External Hive table with parquet
sql("CREATE EXTERNAL TABLE records(key int, value string) " +
"STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
// to make Hive parquet format compatible with spark parquet format
[Review] Member: parquet -> Parquet
[Review] Member: spark -> Spark
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting these; fixed.

spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

// Multiple parquet files could be created under the given directory, depending on the volume of data.
[Review] Member: parquet -> Parquet
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"

// Save DataFrame to Hive External table as compatible parquet format
[Review] Member: parquet -> Parquet
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
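// Round-trip check (a sketch, not part of the PR diff): the files written
// above are plain Parquet and can be read back directly by path.
spark.read.parquet(hiveExternalTableLocation).show()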

// turn on flag for Dynamic Partitioning
[Review] Member: turn -> Turn
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// You can create partitions in the Hive table, so that downstream queries run much faster.
hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
.parquet(hiveExternalTableLocation)
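// Assumed layout for illustration (not part of the PR diff): partitionBy("key")
// writes one subdirectory per distinct key value, e.g.
//   /user/hive/warehouse/database_name.db/records/key=1/part-...snappy.parquet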

// reduce number of files for each partition by repartition
[Review] Member: reduce -> Reduce
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
[Review] Contributor: This is not a standard usage; let's not put it in the example.
[Reply] Contributor Author: @cloud-fan Removed all the comments; as discussed with @srowen, it does make sense to have this in the docs with the inconsistency removed.

.partitionBy("key").parquet(hiveExternalTableLocation)

// Control number of files in each partition by coalesce
[Review] Member: Control number of files -> Control the number of files
[Reply] Contributor Author: @HyukjinKwon Thanks for highlighting this; fixed.

hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
[Review] Contributor: ditto (same concern as the repartition example above).

.partitionBy("key").parquet(hiveExternalTableLocation)
// $example off:spark_hive$

spark.stop()
}
}