Closed
59 commits
09d0d5c
Start work on Jython UDF support
holdenk May 20, 2016
64954e4
More work on the calling it from Python side
holdenk May 20, 2016
b820135
Ok the basics work but maybe not to use reflection in base for int/long
holdenk May 20, 2016
f6462e3
Ok it now works for single elem inputs and integer/array of string re…
holdenk May 21, 2016
0c16ff3
Take zero to 2 arguments
holdenk May 21, 2016
0a74efc
PyLint and expand a bit on the error cases
holdenk May 21, 2016
21e9f4e
Switch from json back to pickle
holdenk May 21, 2016
4d72647
Reeeeealllllly sketchy Row-ish-support-ish
holdenk May 21, 2016
c978215
Style fixes
holdenk May 21, 2016
fed0beb
Use generic Row
holdenk May 22, 2016
d530712
Start on a bit of ScalaDoc and mark classes as private
holdenk May 22, 2016
68ba3b8
Remove debug prints
holdenk May 22, 2016
b1b39bb
Doc params
holdenk May 22, 2016
2788285
Start adding some tests
holdenk May 22, 2016
6e96430
Remove some ignores
holdenk May 22, 2016
b6f4aa3
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 22, 2016
87900b4
python3 compatability (yay), also doctests + dill doesn't play super …
holdenk May 22, 2016
bd00c6c
Start adding tests for jython functionality (broken)
holdenk May 22, 2016
9e173d6
PySpark tests
holdenk May 23, 2016
764929e
Update the tests, seems to work in py2 - need to fix issue with skipp…
holdenk May 23, 2016
e7bf7be
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 23, 2016
a404a5a
Skip on dill not being available
holdenk May 24, 2016
ee57eef
Handle closure arguments (aww yeah) and make the tests pass (py2 w/di…
holdenk May 24, 2016
be55ded
Suppoer python 2 and 3 closures
holdenk May 24, 2016
6e69628
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 24, 2016
7055e66
broadcast the LazyJythonFunc
holdenk May 25, 2016
aacc311
Refactor a bit to simplify the imports/vars/setup code and allow skip…
holdenk May 25, 2016
b4a8e22
pep8 fixes
holdenk May 25, 2016
c84aca6
Start adding sql udf perf
holdenk May 25, 2016
80507b1
pep8ify the new example
holdenk May 25, 2016
e657e32
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 26, 2016
0dae122
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 27, 2016
f09fd44
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 27, 2016
4b4723f
Fix up some scaladoc/pydoc
holdenk May 27, 2016
87d1a29
Fix pref example
holdenk May 27, 2016
5b94752
Fix cleanup
holdenk May 27, 2016
e99e020
Fix import creation
holdenk May 27, 2016
0e8142d
Change path extension code to not depend on being launched with pyspark
holdenk May 28, 2016
ed4703a
unpersist rdd explicitly
holdenk May 28, 2016
e25ba51
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk May 30, 2016
988f76c
Generate 1 through 22
holdenk May 31, 2016
c1d5d6c
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jun 6, 2016
35f9fbe
Fix python style issues with sql_udf_perf
holdenk Jun 6, 2016
36ab14b
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jun 8, 2016
e84e52a
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jun 8, 2016
1ca895a
Update deps to include jython
holdenk Jun 9, 2016
fe91f54
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jun 10, 2016
e56c715
Start working on Python 2.6 support (although supposed to be dropped …
holdenk Jun 10, 2016
8c056a8
Fix long line
holdenk Jun 10, 2016
17880f2
fix profiler
holdenk Jun 10, 2016
84e1bf1
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jun 11, 2016
0244f34
Merge in master - conflict in gitingore
holdenk Jul 5, 2016
bfa39e8
Merge in master
holdenk Jul 25, 2016
06f753e
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Jul 25, 2016
7ab97b9
Merge in master
holdenk Aug 18, 2016
75404b7
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Sep 8, 2016
fbe4549
Update to use SparkSession for parseDataType - change happened in mas…
holdenk Sep 9, 2016
6a127e5
Merge branch 'master' into SPARK-15369-investigate-selectively-using-…
holdenk Sep 21, 2016
c00d71c
Merge in master
holdenk Oct 4, 2016
4 changes: 3 additions & 1 deletion .gitignore
@@ -83,5 +83,7 @@ spark-warehouse/
.Rhistory
*.Rproj
*.Rproj.*

.Rproj.user

# For Jython
*py.class
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.2
@@ -111,6 +111,7 @@ jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
jython-standalone-2.7.0.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.2.jar
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.3
@@ -116,6 +116,7 @@ jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
jython-standalone-2.7.0.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.2.jar
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.4
@@ -116,6 +116,7 @@ jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
jython-standalone-2.7.0.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.2.jar
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.6
@@ -124,6 +124,7 @@ jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
jython-standalone-2.7.0.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.2.jar
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.7
@@ -125,6 +125,7 @@ jsr305-1.3.9.jar
jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
jython-standalone-2.7.0.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
libfb303-0.9.2.jar
100 changes: 100 additions & 0 deletions examples/src/main/python/sql_udf_perf.py
@@ -0,0 +1,100 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import *
import timeit


def legacy_word_count(rdd):
    wc = rdd.flatMap(lambda x: x.split(" ")).map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
    return wc.count()


def dataframe_udf_word_count(tokenizeUDF, df):
    wc = df.select(tokenizeUDF(df['value']).alias("w")).select(explode("w").alias("words")) \
        .groupBy("words").count()
    return wc.count()


def dataframe_scala_udf_word_count(df):
    wc = df.select(split(df['value'], " ").alias("w")).select(explode("w").alias("words")) \
        .groupBy("words").count()
    return wc.count()


def benchmark(textInputPath, repeat, number):
    """
    Benchmark wordcount on a provided text input path with a given number
    of repeats each executed number times.

    :param textInputPath: The input path to perform wordcount on.
    :param repeat: Number of times to repeat the test
    :param number: Number of iterations to perform the wordcount per test
    """
    def benchmark_func(func):
        return timeit.repeat(func, repeat=repeat, number=number)

    print("Benchmarking wordcount:")
    tokenize = lambda x: x.split(" ")
    returnUDFType = ArrayType(StringType())
    tokenizeUDF = session.catalog.registerFunction("split", tokenize, returnUDFType)
    tokenizeJythonUDF = session.catalog.registerJythonFunction("split", tokenize, returnUDFType)
    rdd = sc.textFile(textInputPath)
    rdd.cache()
    rdd.count()
    print("RDD:")
    print(timeit.repeat(lambda: legacy_word_count(rdd), repeat=repeat, number=number))
    rdd.unpersist()
    df = session.read.text(textInputPath)
    df.cache()
    df.count()
    print("DataFrame Python UDF:")
    python_udf_test_lambda = lambda: dataframe_udf_word_count(tokenizeUDF, df)
    python_udf_times = timeit.repeat(python_udf_test_lambda, repeat=repeat, number=number)
    print(python_udf_times)
    print("DataFrame Jython UDF:")
    jython_udf_test_lambda = lambda: dataframe_udf_word_count(tokenizeJythonUDF, df)
    jython_udf_times = timeit.repeat(jython_udf_test_lambda, repeat=repeat, number=number)
    print(jython_udf_times)
    print("DataFrame Scala UDF:")
    scala_udf_test_lambda = lambda: dataframe_scala_udf_word_count(df)
    scala_udf_times = timeit.repeat(scala_udf_test_lambda, repeat=repeat, number=number)
    print(scala_udf_times)


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: sql_udf_perf <inputPath> <repeat> <number>", file=sys.stderr)
        sys.exit(-1)

    session = SparkSession\
        .builder\
        .appName("PythonSQL Perf")\
        .getOrCreate()
    sc = session._sc
    spark = session
    textInputPath = sys.argv[1]
    repeat = int(sys.argv[2])
    number = int(sys.argv[3])
    benchmark(textInputPath, repeat, number)
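A rough usage sketch (not part of the diff): benchmark() reads the module-level session and sc names that the __main__ block sets up, so driving it from a separate driver script against a build containing this patch might look like the following; the master URL and input path are hypothetical.

# Sketch only: drive the wordcount benchmark from another script (assumes the
# patched build so registerJythonFunction is available on the catalog).
from pyspark.sql import SparkSession
import sql_udf_perf

spark_session = SparkSession.builder.appName("PythonSQL Perf").master("local[2]").getOrCreate()
sql_udf_perf.session = spark_session  # benchmark() looks these up as module globals
sql_udf_perf.sc = spark_session._sc
sql_udf_perf.benchmark("/tmp/words.txt", repeat=3, number=1)  # hypothetical input path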
5 changes: 4 additions & 1 deletion python/pyspark/context.py
@@ -30,7 +30,10 @@
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
# Detect when running in Jython and skip java_gateway
is_jython = "JDK" in sys.version
if not is_jython:
    from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
    PairDeserializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
7 changes: 5 additions & 2 deletions python/pyspark/profiler.py
@@ -15,7 +15,10 @@
# limitations under the License.
#

import cProfile
try:
    import cProfile as profiler
except ImportError:
    import profile as profiler
import pstats
import os
import atexit
@@ -156,7 +159,7 @@ def __init__(self, ctx):

def profile(self, func):
""" Runs and profiles the method to_profile passed in. A profile object is returned. """
pr = cProfile.Profile()
pr = profiler.Profile()
pr.runcall(func)
st = pstats.Stats(pr)
st.stream = None # make it picklable
5 changes: 5 additions & 0 deletions python/pyspark/rdd.py
@@ -52,6 +52,11 @@
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync

# Detect when running in Jython and skip py4j
is_jython = "JDK" in sys.version
if not is_jython:
    from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]

10 changes: 5 additions & 5 deletions python/pyspark/serializers.py
@@ -390,11 +390,11 @@ def namedtuple(*args, **kwargs):
# hack the cls already generated by namedtuple
# those created in other module can be pickled as normal,
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.items():
    if (type(o) is type and o.__base__ is tuple
            and hasattr(o, "_fields")
            and "__reduce__" not in o.__dict__):
        _hack_namedtuple(o)  # hack inplace
if "__main__" in sys.modules:
    for n, o in sys.modules["__main__"].__dict__.items():
        if (type(o) is type and o.__base__ is tuple and hasattr(o, "_fields")
                and "__reduce__" not in o.__dict__):
            _hack_namedtuple(o)  # hack inplace


_hijack_namedtuple()
32 changes: 28 additions & 4 deletions python/pyspark/sql/catalog.py
@@ -20,7 +20,7 @@
from pyspark import since
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import UserDefinedFunction, UserDefinedJythonFunction
from pyspark.sql.types import IntegerType, StringType, StructType


@@ -195,22 +195,46 @@ def registerFunction(self, name, f, returnType=StringType()):
:param f: python function
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> spark.catalog.registerFunction("stringLengthString", lambda x: len(x))
>>> strLenStr = spark.catalog.registerFunction("stringLengthString", lambda x: len(x))
>>> spark.sql("SELECT stringLengthString('test')").collect()
[Row(stringLengthString(test)=u'4')]

>>> from pyspark.sql.types import IntegerType
>>> spark.catalog.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sli = spark.catalog.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> spark.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]

>>> from pyspark.sql.types import IntegerType
>>> spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sli = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> spark.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
"""
udf = UserDefinedFunction(f, returnType, name)
self._jsparkSession.udf().registerPython(name, udf._judf)
return udf

@since(2.1)
def registerJythonFunction(self, name, f, returnType=StringType()):
"""
Register a function to be executed using Jython on the workers.
The function passed in must either be a string containing your Python lambda expression,
or, if dill is installed on the driver, a lambda whose source dill can extract.
Note that not all Python code will run under Jython, and not all code that does will
run well. However, for some UDFs executing in Jython may be faster, since it avoids
copying the data from the JVM to the Python executor.

This is a very experimental feature and may be removed in future versions
once we figure out whether it is a good idea.

.. note:: Experimental

:param name: name of the UDF
:param f: String containing python lambda or python function
:param returnType: a :class:`DataType` object
"""
udf = UserDefinedJythonFunction(f, returnType, name)
self._jsparkSession.udf().registerJython(name, udf._judf)
return udf

@since(2.0)
def isCached(self, tableName):
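For reference, a minimal usage sketch of the new catalog method (illustrative only, not part of the diff; assumes a SparkSession named spark built with this patch, and mirrors the doctest added to context.py below):

from pyspark.sql.types import IntegerType

# register a string lambda that will be evaluated by Jython on the executors
add1 = spark.catalog.registerJythonFunction("add1", "lambda x: x + 1", IntegerType())
df = spark.createDataFrame([(1,), (2,)], ["field1"])
df.createOrReplaceTempView("magic")
spark.sql("SELECT add1(field1) FROM magic").collect()  # expected: [Row(add1(field1)=2), Row(add1(field1)=3)]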
41 changes: 37 additions & 4 deletions python/pyspark/sql/context.py
@@ -186,21 +186,54 @@ def registerFunction(self, name, f, returnType=StringType()):
:param f: python function
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sLs = sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlContext.sql("SELECT stringLengthString('test')").collect()
[Row(stringLengthString(test)=u'4')]
>>> df.select(sLs(df.field2)).take(1)
[Row(stringLengthString(field2)=u'4')]

>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sLi = sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]

>>> from pyspark.sql.types import IntegerType
>>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sLi = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
"""
self.sparkSession.catalog.registerFunction(name, f, returnType)
return self.sparkSession.catalog.registerFunction(name, f, returnType)

@since(2.1)
def registerJythonFunction(self, name, f, returnType=StringType()):
"""
Register a function to be executed using Jython on the workers.
The function passed in must either be a string containing your Python lambda expression,
or, if dill is installed on the driver, a lambda whose source dill can extract.
Note that not all Python code will run under Jython, and not all code that does will
run well. However, for some UDFs executing in Jython may be faster, since it avoids
copying the data from the JVM to the Python executor.

This is a very experimental feature and may be removed in future versions
once we figure out whether it is a good idea.

.. note:: Experimental

:param name: name of the UDF
:param f: String containing python lambda or python function
:param returnType: a :class:`DataType` object

>>> from pyspark.sql.types import IntegerType
>>> add1 = sqlContext.registerJythonFunction("add1", "lambda x: x + 1", IntegerType())
>>> df.registerTempTable("magic")
>>> sqlContext.sql("SELECT add1(field1) FROM magic").collect()
[Row(add1(field1)=2), Row(add1(field1)=3), Row(add1(field1)=4)]


>>> df.select(add1(df.field1)).collect()
[Row(add1(field1)=2), Row(add1(field1)=3), Row(add1(field1)=4)]
"""
return self.sparkSession.catalog.registerJythonFunction(name, f, returnType)

# TODO(andrew): delete this once we refactor things to take in SparkSession
def _inferSchema(self, rdd, samplingRatio=None):
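And a sketch of the dill path described in the docstring above, passing a live lambda rather than a string (illustrative only; assumes dill is installed on the driver so the lambda's source can be extracted):

from pyspark.sql.types import IntegerType

# dill extracts the lambda's source on the driver; Jython evaluates it on the executors
add2 = sqlContext.registerJythonFunction("add2", lambda x: x + 2, IntegerType())
sqlContext.sql("SELECT add2(field1) FROM magic").collect()  # reuses the temp table from the doctest above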