Cheatsheet for Apache Spark DataFrame.
A DataFrame is simply a type alias of Dataset[Row].
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.master("local")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
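Since DataFrame is just this alias, a DataFrame value can be used directly wherever a Dataset[Row] is expected (a minimal sketch; the sample data is made up):
import org.apache.spark.sql.{DataFrame, Dataset, Row}
val aliasDemo: DataFrame = Seq((1, "a")).toDF("id", "name")
val asDataset: Dataset[Row] = aliasDemo // same object, no conversion needed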
-
create Dataset from seq
// vertically
spark.createDataset(Seq(1, 2))
// horizontally
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val rows = spark.sparkContext.parallelize(Seq(Row.fromSeq(Seq(1, 2))))
val schema = StructType(Seq("col1", "col2").map(col => StructField(col, IntegerType, nullable = false)))
spark.createDataFrame(rows, schema).show()
-
create Dataset from range
spark.createDataset(1 to 10)
-
create Dataset from array of tuples
spark.createDataset(Array((1, "Tom"), (2, "Jerry"))).toDF("id", "name")
val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)
-
Seq to Dataset
List("a").toDS() Seq(1, 3, 5).toDS()
import spark.implicits._ Seq.empty[(String, Int)].toDF("k", "v")
-
create Dataset from Seq of case class
// define case class Person(name: String, age: Long) outside of the method, otherwise the implicit Encoder (TypeTag) for it cannot be resolved
val caseClassDS = Seq(Person("Andy", 32)).toDS()
// or
val caseClassDS2 = spark.createDataset(Seq(Person("Andy", 32), Person("Andy2", 33)))
-
create Dataset from RDD
import spark.implicits._
val rdd = sc.parallelize(1 to 5)
spark.createDataset(rdd)
rdd.toDS().show()
rdd.toDF().show()
val df = rdd.map({ case Row(val1: String, ..., valN: Long) => (val1, ..., valN)}).toDF("col1_name", ..., "colN_name")
// define case class Person(name: String, age: Long) outside of the method, otherwise the implicit Encoder (TypeTag) for it cannot be resolved
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
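A concrete sketch of the Row-pattern template above (the column names name/count and the sample rows are invented for illustration):
import org.apache.spark.sql.Row
val rowRDD = sc.parallelize(Seq(Row("a", 1L), Row("b", 2L)))
val typedDF = rowRDD.map { case Row(name: String, count: Long) => (name, count) }.toDF("name", "count")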
-
create DataFrame from RDD with schema
val rows = freqItemsets.map(f => Row(f.items, f.freq))
val schema = StructType(Seq(
  StructField("items", dataset.schema($(featuresCol)).dataType, nullable = false),
  StructField("freq", LongType, nullable = false)))
val frequentItems = dataset.sparkSession.createDataFrame(rows, schema)
// empty DataFrame with an explicit schema
val schema = StructType(
  StructField("k", StringType, true) ::
  StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema).show()
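The same pattern with a non-empty RDD (a self-contained sketch; the sample data is made up):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val dataRows = sc.parallelize(Seq(Row("a", 1), Row("b", 2)))
val dataSchema = StructType(Seq(
  StructField("k", StringType, nullable = true),
  StructField("v", IntegerType, nullable = false)))
spark.createDataFrame(dataRows, dataSchema).show()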
-
create Dataset from file
spark.read.json("examples/src/main/resources/people.json")
// from json
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
// from text file
import spark.implicits._
val dataset = spark.read.textFile("data/mllib/sample_fpgrowth.txt")
.map(t => t.split(" ")).toDF("features")
// read from csv
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.csv("csv/file/path")
-
select with col function
import org.apache.spark.sql.functions._
dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
  case Row(label: Double, features: Vector) => LabeledPoint(label, features)
}
// average
dataset.select(avg(inputCol)).as[Double].first()
// median (or another percentile)
filtered.stat.approxQuantile(inputCol, Array(0.5), 0.001)
// check whether df is empty
df.rdd.isEmpty
// select an array of columns
df.select(cols.head, cols.tail: _*)
df.select(cols.map(col): _*)
-
select with type
output.select("features").as[Vector].collect()
-
select with basic calculation
import spark.implicits._
df.select($"name", $"age" + 1).show()
-
select from temp view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
// Global temporary views are tied to the system-preserved database `global_temp`
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
// Global temporary views are cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
-
select with sql
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
-
Filter
df.filter($"age" > 21).show()
val ic = col(inputCol)
val filtered = dataset.select(ic.cast(DoubleType))
  .filter(ic.isNotNull && ic =!= $(missingValue) && !ic.isNaN)
df.filter($"state" === "TX")
df.filter("state = 'TX'")
df.filter($"foo".contains("bar"))
df.filter(not($"state" === "TX"))
df.filter($"foo".like("bar"))
-
sort
import org.apache.spark.sql.functions._
df.orderBy(asc("col1"))
df.sort(desc("col2"))
-
Rename column
df.select($"id".alias("x1")).show()
val lookup = Map("id" -> "foo", "value" -> "bar")
df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
df.withColumnRenamed("_1", "x1")
-
change column type (cast)
val df2 = df.select($"id", col("value").cast(StringType))
df.selectExpr("cast(year as int) year", "make")
-
GroupBy
df.groupBy("age").count().show()
val dfMax = df.groupBy($"id").agg(max($"value"))
df.as[Record].groupByKey(_.id).reduceGroups((x, y) => x).show()
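Multiple aggregations with aliases in one pass (a sketch; the "salary" column is assumed for illustration):
import org.apache.spark.sql.functions._
df.groupBy("age")
  .agg(count(lit(1)).as("n"), avg("salary").as("avg_salary"), max("salary").as("max_salary"))
  .show()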
-
Window
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window
val temp = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val top = df.withColumn("rn", row_number().over(temp)).where($"rn" === 1)
-
join
df.join(broadcast(dfMax), "col1").show()
Leaddetails.join(Utm_Master,
  Leaddetails("LeadSource") <=> Utm_Master("LeadSource") &&
  Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") &&
  Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") &&
  Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
  "left")
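The join type is passed as the last argument (a sketch, reusing dfMax and "col1" from the broadcast example above):
df.join(dfMax, Seq("col1"), "inner")
df.join(dfMax, Seq("col1"), "left_outer")
df.join(dfMax, Seq("col1"), "left_anti") // rows of df with no match in dfMax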
-
concat
df.createOrReplaceTempView("df") spark.sql("SELECT CONCAT(id, ' ', value) as cc FROM df").show()
df.select(concat($"id", lit(" "), $"value"))
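concat_ws joins columns with a separator and, unlike concat, does not turn the result null when an input is null (a sketch, same columns as above):
df.select(concat_ws(" ", $"id", $"value").as("cc")).show()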
-
with generic
private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
  val data = dataset.select($(featuresCol))
  val items = data.where(col($(featuresCol)).isNotNull).rdd.map(r => r.getSeq[T](0).toArray)
  ...
}
-
when
val ic = col(inputCol)
outputDF = outputDF.withColumn(outputCol,
  when(ic.isNull, surrogate)
    .when(ic === $(missingValue), surrogate)
    .otherwise(ic)
    .cast(inputType))
val coder: (Int => String) = (arg: Int) => { if (arg < 100) "little" else "big" }
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
// map (1, -1) labels to (1, 0) labels
df.select($"id", when($"label" === 1, 1).otherwise(0).as("label")).show()
// drop rows containing NaN or null
df.na.drop().show()
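Instead of dropping, nulls/NaN can be replaced per column (a sketch; the "age" and "name" columns are assumed):
df.na.fill(0, Seq("age"))
df.na.fill(Map("age" -> 0, "name" -> "unknown"))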
-
cube
ds.cube($"department", $"gender").agg(Map( "salary" -> "avg", "age" -> "max" ))
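rollup is the hierarchical counterpart of cube (a sketch, same columns as above):
ds.rollup($"department", $"gender").agg(Map("salary" -> "avg", "age" -> "max"))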
-
statistics
df.stat.freqItems(Seq("id")).show()
df.stat.approxQuantile(...)
df.stat.bloomFilter(...)
df.stat.countMinSketch(...)
// approximate distinct count
df.select(approx_count_distinct(col("value"))).show()
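Basic summary statistics (count, mean, stddev, min, max) for a numeric column (a sketch, same df as above):
df.describe("value").show()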
-
append constant
import org.apache.spark.sql.functions._
df.withColumn("new_column", lit(10)).show()
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))
df.select($"*", ($"age" + 10).alias("newAge"))
-
select DataFrame with UDF
protected def raw2prediction(rawPrediction: Vector): Double = rawPrediction.argmax
...
udf(raw2prediction _).apply(col(getRawPredictionCol))
val predictUDF = udf { (features: Any) =>
  bcastModel.value.predict(features.asInstanceOf[Vector])
}
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
// concat two columns with a udf
// define a udf to concatenate two passed-in string values
val getConcatenated = udf((first: String, second: String) => first + " " + second)
// use withColumn to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2"))
  .select("newColName", "col1", "col2")
  .show()
-
print schema
df.printSchema()
dataset.schema($(labelCol))
df.explain()
// Spark internal helpers
SchemaUtils.checkColumnTypes(schema, inputCol, Seq(DoubleType, FloatType))
MetadataUtils.getNumClasses(dataset.schema($(labelCol)))
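Other quick schema inspection helpers (a sketch, same df as above):
df.columns // Array[String] of column names
df.dtypes  // Array[(name, typeName)]
df.schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))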
-
repartition
df.repartition($"value") df.explain()
df.repartition(2)
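coalesce reduces the number of partitions without a full shuffle (a sketch):
df.coalesce(1)
df.rdd.getNumPartitions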
-
custom class
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
// alternative: encode via a tuple representation
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i", "u", "s").as[MyObjEncoded]
-
parquet
df.write.parquet(dataPath)
...
val data = sparkSession.read.format("parquet").load(dataPath)
val Row(coefficients: Vector, intercept: Double) =
  data.select("coefficients", "intercept").head()
-
checkpoint
df.checkpoint()
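checkpoint() needs a checkpoint directory to be configured first (a sketch; the path is a placeholder):
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
val checkpointed = df.checkpoint() // eager by default; df.checkpoint(eager = false) defers it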
-
save by key
df.write.partitionBy("id").text("people")
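The same partitioned layout works for other output formats (a sketch; the path is a placeholder):
df.write.partitionBy("id").mode("overwrite").parquet("people_parquet")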