@@ -19,8 +19,7 @@ package org.apache.spark.examples.sql.hive
// $example on:spark_hive$
import java.io.File

-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
// $example off:spark_hive$

object SparkHiveExample {
@@ -104,6 +103,60 @@ object SparkHiveExample
// ...
// $example off:spark_hive$
Member:

Do you not want the code below to render in the docs as part of the example? Maybe not, just checking whether that's intentional.

Contributor Author:

@srowen Thank you for the valuable review feedback; I added it so it can help other developers.

Contributor Author:

@srowen Can you please review this? cc @holdenk @sameeragarwal

Contributor Author:

@srowen I have updated the DDL for storing data with partitioning in Hive.
cc @HyukjinKwon @mgaido91 @markgrover @markhamstra

Member:

Why do you turn the example listing off and then on again? Just remove those two lines.

Contributor Author:

@srowen I misunderstood your first comment. I have reverted as suggested; please check now.


// Save DataFrame to a Hive managed table in Parquet format

/*
Member:

Oh, just noticed this. You're using Javadoc-style comments here, but they won't have any effect.
Just use the // style of comments that you see above, for consistency.

Contributor:

+1

Contributor Author:

@srowen Done, changes addressed

* 1. Create a Hive database/schema with an explicit HDFS location if you want one; otherwise the default
*    warehouse location will be used to store the Hive table data.
*    Ex: CREATE DATABASE IF NOT EXISTS database_name LOCATION hdfs_path;
*    You don't have to give a location for each table explicitly; every table under the specified schema will
*    be located at the location given when the schema was created.
* 2. Create a Hive managed table stored as Parquet.
*    Ex: CREATE TABLE records(key int, value string) STORED AS PARQUET;
*/
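// A hedged sketch, not part of the original example: the DDL described above could be issued through the
// same `sql` helper used below; the database name and HDFS location are placeholder assumptions.
sql("CREATE DATABASE IF NOT EXISTS database_name LOCATION 'hdfs:///user/hive/warehouse/database_name.db'")
sql("CREATE TABLE IF NOT EXISTS database_name.records(key INT, value STRING) STORED AS PARQUET")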
val hiveTableDF = sql("SELECT * FROM records")
Contributor:

.toDF is not needed

Contributor:

actually, I think spark.table("records") is a better example.

Contributor Author:

@srowen Done, removed toDF(). cc @cloud-fan

hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")

// Save DataFrame to a Hive external table in Spark-compatible Parquet format.
/*
* 1. Create a Hive external table stored as Parquet, with an explicit location.
*    Ex: CREATE EXTERNAL TABLE records(key int, value string) STORED AS PARQUET
*        LOCATION '/user/hive/warehouse/database_name.db/records';
Contributor:

It's weird to create an external table without a location. Users may be confused about the difference between a managed table and an external table.

Contributor Author:

@cloud-fan We'll move the descriptive comments into the documentation with user-friendly wording. I have also added a location.

* Since we are not explicitly providing a Hive database location, it automatically uses the default warehouse
* location set through 'spark.sql.warehouse.dir' when the SparkSession is created with enableHiveSupport().
* For example, if '/user/hive/warehouse/' is the Hive warehouse location, the schema directory will be created
* under it as '/user/hive/warehouse/database_name.db'.
*/
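// A hedged sketch, not part of the original example: the external-table DDL could likewise be run via `sql`;
// the table name `records_external` and the location below are hypothetical placeholders.
sql("CREATE EXTERNAL TABLE IF NOT EXISTS database_name.records_external(key INT, value STRING) " +
  "STORED AS PARQUET LOCATION '/user/hive/warehouse/database_name.db/records'")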

// Make the Hive Parquet format compatible with the Spark Parquet format
Member:

parquet -> Parquet

Member:

spark -> Spark

Contributor Author:

@HyukjinKwon Thanks for pointing this out; I have fixed it.

spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
// Multiple Parquet files could be created under the given directory, depending on the volume of data.
Member:

parquet -> Parquet.

Contributor Author:

@HyukjinKwon Thanks for pointing this out; I have fixed it.

val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)

// Turn on the flag for dynamic partitioning
Member:

turn -> Turn.

Contributor Author:

@HyukjinKwon Thanks for pointing this out; I have fixed it.

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// You can create partitions in the Hive table, so downstream queries run much faster.
hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
.parquet(hiveExternalTableLocation)
/*
If the data volume is very large, each partition may end up with many small files, which can hurt
Member:

This is more stuff that should go in the docs, not in comments in an example. It somewhat duplicates existing documentation. Is this commentary really needed to illustrate usage of the API? That's the only goal here.

What are "small-small files"? You have some inconsistent capitalization; Parquet should be capitalized, but not file, bandwidth, etc.

Contributor Author:

@srowen I totally agree. I will rephrase this content for the docs; for now I have removed it from here. Please take a look.

downstream query performance due to file, network, and disk I/O overhead.
To improve performance, you can create a single Parquet file under each partition directory by calling
'repartition' on the partition key of the Hive table.
*/
hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
Contributor:

This is not standard usage; let's not put it in the example.

Contributor Author:

@cloud-fan I removed all the comments. As discussed with @srowen, it makes more sense to put this in the docs, with the inconsistencies removed.

.partitionBy("key").parquet(hiveExternalTableLocation)

/*
You can also use coalesce to control the number of files under each partition. repartition does a full shuffle
and distributes data evenly across all partitions, whereas coalesce can reduce the number of partitions to the
Member:

The sentences need some cleanup here. What do you mean by 'Int' argument? Maybe it's best to point people to the API docs rather than repeat them incompletely.

Contributor Author:

@srowen done.

requested number without a full data shuffle; see the Dataset API docs for details.
*/
// A coalesce of 10 could create up to 10 Parquet files under each partition,
// which is useful when the data is large and partitioning makes sense.
hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
Contributor:

ditto

.partitionBy("key").parquet(hiveExternalTableLocation)
spark.stop()
}
}