Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crossScalaVersions := Seq("2.11.12", "2.12.7")
scalaVersion := "2.11.12"
sparkVersion := "2.4.2"

libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "2.4.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.2" % "provided"

Expand Down
117 changes: 117 additions & 0 deletions src/main/scala/com/github/mrpowers/spark/daria/sql/functions.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.github.mrpowers.spark.daria.sql

import org.apache.spark.sql.catalyst.expressions.daria._
import org.apache.spark.sql.catalyst.expressions._

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

Expand All @@ -15,6 +18,120 @@ import scala.reflect.runtime.universe._
* @groupname Ungrouped Support functions for DataFrames
*/
object functions {
// Wraps a Catalyst expression in a Column so the public functions can return it.
private def withExpr(e: Expression): Column = {
  new Column(e)
}

// Adapts a one-argument Column function into the equivalent function over
// Catalyst expressions: the argument is wrapped in a Column, `f` is applied,
// and the resulting Column is unwrapped via `.expr`.
private def expressionFunction(f: Column => Column): Expression => Expression = {
  (input: Expression) =>
    val inputCol = new Column(input)
    f(inputCol).expr
}

// Adapts a two-argument Column function into the equivalent function over
// Catalyst expressions: both arguments are wrapped in Columns, `f` is applied,
// and the resulting Column is unwrapped via `.expr`.
private def expressionFunction(f: (Column, Column) => Column): (Expression, Expression) => Expression = {
  (first: Expression, second: Expression) =>
    val firstCol  = new Column(first)
    val secondCol = new Column(second)
    f(firstCol, secondCol).expr
}

// Adapts a three-argument Column function into the equivalent function over
// Catalyst expressions: each argument is wrapped in a Column, `f` is applied,
// and the resulting Column is unwrapped via `.expr`.
private def expressionFunction(f: (Column, Column, Column) => Column): (Expression, Expression, Expression) => Expression = {
  (first: Expression, second: Expression, third: Expression) =>
    val firstCol  = new Column(first)
    val secondCol = new Column(second)
    val thirdCol  = new Column(third)
    f(firstCol, secondCol, thirdCol).expr
}

/**
* Returns an array of elements after applying a transformation to each element
* in the input array.
*
* @group collection_funcs
*/
def transform(column: Column, f: Column => Column): Column = {
  // Lower both the input column and the user lambda to Catalyst expressions,
  // then delegate to HigherOrderUtils.transform and re-wrap the result.
  val arrayExpr: Expression             = column.expr
  val lambda: Expression => Expression  = expressionFunction(f)
  withExpr(HigherOrderUtils.transform(arrayExpr, lambda))
}

/**
* Returns an array of elements after applying a transformation to each element
* in the input array.
*
* @group collection_funcs
*/
def transform(column: Column, f: (Column, Column) => Column): Column = {
  // Same as the single-argument overload, but the lambda receives two columns.
  // NOTE(review): the second lambda argument is presumably the element index,
  // as in Spark SQL's `transform` — confirm against HigherOrderUtils.
  val arrayExpr: Expression                          = column.expr
  val lambda: (Expression, Expression) => Expression = expressionFunction(f)
  withExpr(HigherOrderUtils.transform(arrayExpr, lambda))
}

/**
* Returns whether a predicate holds for one or more elements in the array.
*
* @group collection_funcs
*/
def exists(column: Column, f: Column => Column): Column = {
  // Lower the column and predicate to Catalyst expressions, delegate to
  // HigherOrderUtils.exists, and wrap the resulting expression in a Column.
  val arrayExpr: Expression               = column.expr
  val predicate: Expression => Expression = expressionFunction(f)
  withExpr(HigherOrderUtils.exists(arrayExpr, predicate))
}

/**
* Returns an array of elements for which a predicate holds in a given array.
*
* @group collection_funcs
*/
def filter(column: Column, f: Column => Column): Column = {
  // Lower the column and predicate to Catalyst expressions, delegate to
  // HigherOrderUtils.filter, and wrap the resulting expression in a Column.
  val arrayExpr: Expression               = column.expr
  val predicate: Expression => Expression = expressionFunction(f)
  withExpr(HigherOrderUtils.filter(arrayExpr, predicate))
}

/**
* Applies a binary operator to an initial state and all elements in the array,
* and reduces this to a single state. The final state is converted into the final result
* by applying a finish function.
*
* @group collection_funcs
*/
def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column, finish: Column => Column): Column = {
  // Lower every argument to its Catalyst form (in the same order as the
  // parameter list), then delegate to HigherOrderUtils.aggregate.
  val arrayExpr: Expression                           = expr.expr
  val initialState: Expression                        = zero.expr
  val mergeFn: (Expression, Expression) => Expression = expressionFunction(merge)
  val finishFn: Expression => Expression              = expressionFunction(finish)
  withExpr(HigherOrderUtils.aggregate(arrayExpr, initialState, mergeFn, finishFn))
}

/**
* Applies a binary operator to an initial state and all elements in the array,
* and reduces this to a single state.
*
* @group collection_funcs
*/
def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column): Column = {
  // No finish step requested: reuse the four-argument overload with a
  // function that returns the final state unchanged.
  val passThrough: Column => Column = state => state
  aggregate(expr, zero, merge, passThrough)
}

/**
* Merges two given arrays, element-wise, into a single array using a function.
* If one array is shorter, nulls are appended at the end to match the length of the longer
* array, before applying the function.
*
* @group collection_funcs
*/
def zip_with(left: Column, right: Column, f: (Column, Column) => Column): Column = {
  // Lower both input columns and the combining lambda to Catalyst
  // expressions, delegate to HigherOrderUtils.zip_with, and re-wrap.
  val leftExpr: Expression                             = left.expr
  val rightExpr: Expression                            = right.expr
  val combine: (Expression, Expression) => Expression  = expressionFunction(f)
  withExpr(HigherOrderUtils.zip_with(leftExpr, rightExpr, combine))
}

/**
* Replaces all whitespace in a string with single spaces
Expand Down
Loading