67 changes: 35 additions & 32 deletions python/pyspark/sql/readwriter.py
@@ -402,10 +402,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
def orc(self, path):
"""Loads an ORC file, returning the result as a :class:`DataFrame`.

::Note: Currently ORC support is only available together with
:class:`HiveContext`.
.. note:: Currently ORC support is only available together with Hive support.

>>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
@@ -415,28 +414,31 @@ def orc(self, path):
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
predicates=None, properties=None):
"""
Construct a :class:`DataFrame` representing the database table accessible
via JDBC URL `url` named `table` and connection `properties`.
Construct a :class:`DataFrame` representing the database table named ``table``
accessible via JDBC URL ``url`` and connection ``properties``.

Contributor Author commented:

Single backticks in Sphinx just make things italic. e.g. See the current doc.

The `column` parameter could be used to partition the table, then it will
be retrieved in parallel based on the parameters passed to this function.
Partitions of the table will be retrieved in parallel if either ``column`` or
``predicates`` is specified.

The `predicates` parameter gives a list expressions suitable for inclusion
in WHERE clauses; each one defines one partition of the :class:`DataFrame`.
If both ``column`` and ``predicates`` are specified, ``column`` will be used.

::Note: Don't create too many partitions in parallel on a large cluster;
.. note:: Don't create too many partitions in parallel on a large cluster; \
otherwise Spark might crash your external database systems.

:param url: a JDBC URL
:param table: name of table
:param column: the column used to partition
:param lowerBound: the lower bound of partition column
:param upperBound: the upper bound of the partition column
:param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
:param table: the name of the table
:param column: the name of an integer column that will be used for partitioning;
if this parameter is specified, then ``numPartitions``, ``lowerBound``
(inclusive), and ``upperBound`` (exclusive) will form partition strides
for generated WHERE clause expressions used to split the column
``column`` evenly
:param lowerBound: the minimum value of ``column`` used to decide partition stride
:param upperBound: the maximum value of ``column`` used to decide partition stride
:param numPartitions: the number of partitions
:param predicates: a list of expressions
:param properties: JDBC database connection arguments, a list of arbitrary string
tag/value. Normally at least a "user" and "password" property
should be included.
:param predicates: a list of expressions suitable for inclusion in WHERE clauses;
each one defines one partition of the :class:`DataFrame`
:param properties: a dictionary of JDBC database connection arguments; normally,
at least a "user" and "password" property should be included
:return: a DataFrame
"""
if properties is None:
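As a sketch of the two partitioned-read modes described in the docstring above: the URL, table name, column names, and credentials below are placeholders, a matching JDBC driver is assumed to be on the classpath, and `spark` is an existing SparkSession.

    # Placeholder connection details -- adjust for your own database.
    url = 'jdbc:postgresql://dbhost:5432/mydb'
    props = {'user': 'username', 'password': 'secret'}

    # Stride-based partitioning: values of the integer column 'id' in
    # [0, 100000) are split evenly into 8 partitions (lowerBound inclusive,
    # upperBound exclusive, per the docstring above).
    df1 = spark.read.jdbc(url, 'people', column='id',
                          lowerBound=0, upperBound=100000, numPartitions=8,
                          properties=props)

    # Predicate-based partitioning: each WHERE expression defines one partition.
    df2 = spark.read.jdbc(url, 'people',
                          predicates=["region = 'EU'", "region = 'US'"],
                          properties=props)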
@@ -538,7 +540,7 @@ def partitionBy(self, *cols):
def queryName(self, queryName):
"""Specifies the name of the :class:`ContinuousQuery` that can be started with
:func:`startStream`. This name must be unique among all the currently active queries
in the associated spark
in the associated SparkSession.

.. note:: Experimental.

@@ -808,8 +810,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
def orc(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.

::Note: Currently ORC support is only available together with
:class:`HiveContext`.
.. note:: Currently ORC support is only available together with Hive support.

:param path: the path in any Hadoop supported file system
:param mode: specifies the behavior of the save operation when data already exists.
@@ -823,7 +824,7 @@ def orc(self, path, mode=None, partitionBy=None, compression=None):
known case-insensitive shorten names (none, snappy, zlib, and lzo).
This will overwrite ``orc.compress``.

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
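A short sketch of the writer side with the parameters documented above, assuming `df` is a DataFrame with a column 'a' (as in the ORC fixture) and the session has Hive support:

    import os
    import tempfile

    out_path = os.path.join(tempfile.mkdtemp(), 'orc_out')

    # Overwrite any existing output, partition the files by column 'a', and
    # compress with zlib (one of: none, snappy, zlib, lzo).
    df.write.orc(out_path, mode='overwrite', partitionBy='a', compression='zlib')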
@@ -837,7 +838,7 @@ def jdbc(self, url, table, mode=None, properties=None):
def jdbc(self, url, table, mode=None, properties=None):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.

.. note:: Don't create too many partitions in parallel on a large cluster;\
.. note:: Don't create too many partitions in parallel on a large cluster; \
otherwise Spark might crash your external database systems.

:param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
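And a minimal sketch of the JDBC writer, again with placeholder connection details and assuming a suitable JDBC driver is available:

    url = 'jdbc:postgresql://dbhost:5432/mydb'
    props = {'user': 'username', 'password': 'secret'}

    # Append the DataFrame's rows to the target table.
    df.write.jdbc(url, 'people_copy', mode='append', properties=props)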
@@ -864,30 +865,32 @@ def _test():
import doctest
import os
import tempfile
import py4j
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Row, HiveContext
from pyspark.sql import SparkSession, Row
import pyspark.sql.readwriter

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.sql.readwriter.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
try:
spark = SparkSession.withHiveSupport(sc)
except py4j.protocol.Py4JError:
spark = SparkSession(sc)

globs['tempfile'] = tempfile
globs['os'] = os
globs['sc'] = sc
globs['spark'] = SparkSession.builder\
.enableHiveSupport()\
.getOrCreate()
globs['hiveContext'] = HiveContext._createForTesting(sc)
globs['df'] = globs['spark'].read.parquet('python/test_support/sql/parquet_partitioned')
globs['spark'] = spark
globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
globs['sdf'] = \
globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
spark.read.format('text').stream('python/test_support/sql/streaming')

(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['sc'].stop()
sc.stop()
if failure_count:
exit(-1)
