diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 258c769ff593..fca11e1eb7f5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1640,6 +1640,138 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
You may run `./bin/spark-sql --help` for a complete list of all available
options.
+# PySpark Usage Guide for Pandas with Apache Arrow
+
+## Apache Arrow in Spark
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
+data between JVM and Python processes. This currently is most beneficial to Python users that
+work with Pandas/NumPy data. Its usage is not automatic and might require some minor
+changes to configuration or code to take full advantage and ensure compatibility. This guide will
+give a high-level description of how to use Arrow in Spark and highlight any differences when
+working with Arrow-enabled data.
+
+### Ensure PyArrow Installed
+
+If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
+SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
+is installed and available on all cluster nodes. The current supported version is 0.8.0.
+You can install using pip or conda from the conda-forge channel. See PyArrow
+[installation](https://arrow.apache.org/docs/python/install.html) for details.
+
+## Enabling for Conversion to/from Pandas
+
+Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
+using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
+`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
+the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
+
+
+
+{% include_example dataframe_with_arrow python/sql/arrow.py %}
+
+
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not
+enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
+DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
+data types are currently supported and an error can be raised if a column has an unsupported type,
+see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+Spark will fall back to create the DataFrame without Arrow.
+
+## Pandas UDFs (a.k.a. Vectorized UDFs)
+
+Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
+Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
+or to wrap the function, no additional configuration is required. Currently, there are two types of
+Pandas UDF: Scalar and Group Map.
+
+### Scalar
+
+Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
+as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
+a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then
+concatenating the results together.
+
+The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+
+
+
+{% include_example scalar_pandas_udf python/sql/arrow.py %}
+
+
+
+### Group Map
+Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+Split-apply-combine consists of three steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+ input data contains all the rows and columns for each group.
+* Combine the results into a new `DataFrame`.
+
+To use `groupBy().apply()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptons, especially if the group sizes are skewed. The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use `groupby().apply()` to subtract the mean from each value in the group.
+
+
+
+{% include_example group_map_pandas_udf python/sql/arrow.py %}
+
+
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
+[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+
+## Usage Notes
+
+### Supported SQL Types
+
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+`ArrayType` of `TimestampType`, and nested `StructType`.
+
+### Setting Arrow Batch Size
+
+Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
+high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
+record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
+to an integer that will determine the maximum number of rows for each batch. The default value is
+10,000 records per batch. If the number of columns is large, the value should be adjusted
+accordingly. Using this limit, each data partition will be made into 1 or more record batches for
+processing.
+
+### Timestamp with Time Zone Semantics
+
+Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
+a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
+data is exported or displayed in Spark, the session time zone is used to localize the timestamp
+values. The session time zone is set with the configuration 'spark.sql.session.timeZone' and will
+default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
+resolution, `datetime64[ns]`, with optional time zone on a per-column basis.
+
+When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
+and each column will be converted to the Spark session time zone then localized to that time
+zone, which removes the time zone and displays values as local time. This will occur
+when calling `toPandas()` or `pandas_udf` with timestamp columns.
+
+When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
+occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
+`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
+expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
+values will be truncated.
+
+Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
+different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
+working with timestamps in `pandas_udf`s to get the best performance, see
+[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
+
# Migration Guide
## Upgrading From Spark SQL 2.2 to 2.3
@@ -1788,7 +1920,7 @@ options.
Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
- In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
- In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
+ - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
- Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
- Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
- Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
new file mode 100644
index 000000000000..6c0028b3f1c1
--- /dev/null
+++ b/examples/src/main/python/sql/arrow.py
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Arrow in Spark.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/arrow.py
+"""
+
+from __future__ import print_function
+
+from pyspark.sql import SparkSession
+from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def dataframe_with_arrow_example(spark):
+ # $example on:dataframe_with_arrow$
+ import numpy as np
+ import pandas as pd
+
+ # Enable Arrow-based columnar data transfers
+ spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+
+ # Generate a Pandas DataFrame
+ pdf = pd.DataFrame(np.random.rand(100, 3))
+
+ # Create a Spark DataFrame from a Pandas DataFrame using Arrow
+ df = spark.createDataFrame(pdf)
+
+ # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
+ result_pdf = df.select("*").toPandas()
+ # $example off:dataframe_with_arrow$
+ print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
+
+
+def scalar_pandas_udf_example(spark):
+ # $example on:scalar_pandas_udf$
+ import pandas as pd
+
+ from pyspark.sql.functions import col, pandas_udf
+ from pyspark.sql.types import LongType
+
+ # Declare the function and create the UDF
+ def multiply_func(a, b):
+ return a * b
+
+ multiply = pandas_udf(multiply_func, returnType=LongType())
+
+ # The function for a pandas_udf should be able to execute with local Pandas data
+ x = pd.Series([1, 2, 3])
+ print(multiply_func(x, x))
+ # 0 1
+ # 1 4
+ # 2 9
+ # dtype: int64
+
+ # Create a Spark DataFrame, 'spark' is an existing SparkSession
+ df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
+
+ # Execute function as a Spark vectorized UDF
+ df.select(multiply(col("x"), col("x"))).show()
+ # +-------------------+
+ # |multiply_func(x, x)|
+ # +-------------------+
+ # | 1|
+ # | 4|
+ # | 9|
+ # +-------------------+
+ # $example off:scalar_pandas_udf$
+
+
+def group_map_pandas_udf_example(spark):
+ # $example on:group_map_pandas_udf$
+ from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+ df = spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+ ("id", "v"))
+
+ @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+ def substract_mean(pdf):
+ # pdf is a pandas.DataFrame
+ v = pdf.v
+ return pdf.assign(v=v - v.mean())
+
+ df.groupby("id").apply(substract_mean).show()
+ # +---+----+
+ # | id| v|
+ # +---+----+
+ # | 1|-0.5|
+ # | 1| 0.5|
+ # | 2|-3.0|
+ # | 2|-1.0|
+ # | 2| 4.0|
+ # +---+----+
+ # $example off:group_map_pandas_udf$
+
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("Python Arrow-in-Spark example") \
+ .getOrCreate()
+
+ print("Running Pandas to/from conversion example")
+ dataframe_with_arrow_example(spark)
+ print("Running pandas_udf scalar example")
+ scalar_pandas_udf_example(spark)
+ print("Running pandas_udf group map example")
+ group_map_pandas_udf_example(spark)
+
+ spark.stop()