36 changes: 28 additions & 8 deletions python/pyspark/sql/readwriter.py
@@ -171,7 +171,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None):
dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -247,6 +247,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

.. _partition discovery: /sql-data-sources-parquet.html#partition-discovery

>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -266,7 +270,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale)
locale=locale, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -300,9 +304,12 @@ def table(self, tableName):
return self._df(self._jreader.table(tableName))

@since(1.4)
def parquet(self, *paths):
def parquet(self, *paths, **options):
@nchammas (Contributor, Author) commented on Nov 29, 2019:

    To support Python 2, we need to say `**options` instead of `recursiveFileLookup=None` because Python 2 doesn't support keyword-only arguments.

A project member replied:

    seems fine.
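To make the constraint in the comment above concrete, here is a standalone sketch; `read_parquet` is a hypothetical stand-in, not the Spark API, and only mirrors the `options.get` pattern used in the diff:

```python
# Python 3 allows a keyword-only parameter after *paths:
#     def read_parquet(*paths, recursiveFileLookup=None): ...
# Python 2 rejects that syntax, hence the **options fallback used in this PR.

def read_parquet(*paths, **options):
    # Pull out the one option we care about; unrecognized keys are ignored here.
    recursiveFileLookup = options.get('recursiveFileLookup', None)
    return paths, recursiveFileLookup

print(read_parquet('a.parquet', 'b.parquet', recursiveFileLookup=True))
# (('a.parquet', 'b.parquet'), True)
```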

"""Loads Parquet files, returning the result as a :class:`DataFrame`.

:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -312,11 +319,13 @@ def parquet(self, *paths):
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(recursiveFileLookup=recursiveFileLookup)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
@since(1.6)
def text(self, paths, wholetext=False, lineSep=None):
def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -329,6 +338,8 @@ def text(self, paths, wholetext=False, lineSep=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
@@ -337,7 +348,8 @@ def text(self, paths, wholetext=False, lineSep=None):
>>> df.collect()
[Row(value=u'hello\\nthis')]
"""
self._set_opts(wholetext=wholetext, lineSep=lineSep)
self._set_opts(
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -349,7 +361,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
recursiveFileLookup=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -457,6 +470,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -476,7 +491,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -504,13 +520,17 @@ def func(iterator):
raise TypeError("path can be only string, list or RDD")

@since(1.5)
def orc(self, path):
def orc(self, path, recursiveFileLookup=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.

:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
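As a usage illustration of the batch-reader changes above: a minimal sketch, assuming made-up paths; the option value is forwarded to the JVM reader as the diff shows, and per the docstrings it disables partition discovery.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Keyword argument added by this PR; scans data/events/ recursively and
# disables partition discovery.
df = spark.read.parquet("data/events/", recursiveFileLookup=True)

# Equivalent spelling through the pre-existing generic option API.
df2 = (spark.read
       .option("recursiveFileLookup", "true")
       .json("data/events-json/"))
```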
36 changes: 28 additions & 8 deletions python/pyspark/sql/streaming.py
@@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
dropFieldIfAllNull=None, encoding=None):
dropFieldIfAllNull=None, encoding=None, recursiveFileLookup=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.

@@ -487,6 +487,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

.. _partition discovery: /sql-data-sources-parquet.html#partition-discovery

>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -502,33 +506,41 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
raise TypeError("path can be only a single string")

@since(2.3)
def orc(self, path):
def orc(self, path, recursiveFileLookup=None):
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.

.. note:: Evolving.

:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
>>> orc_sdf.isStreaming
True
>>> orc_sdf.schema == sdf_schema
True
"""
self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.orc(path))
else:
raise TypeError("path can be only a single string")

@since(2.0)
def parquet(self, path):
def parquet(self, path, recursiveFileLookup=None):
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -542,14 +554,15 @@ def parquet(self, path):
>>> parquet_sdf.schema == sdf_schema
True
"""
self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.parquet(path))
else:
raise TypeError("path can be only a single string")

@ignore_unicode_prefix
@since(2.0)
def text(self, path, wholetext=False, lineSep=None):
def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -564,14 +577,17 @@ def text(self, path, wholetext=False, lineSep=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
>>> text_sdf.isStreaming
True
>>> "value" in str(text_sdf.schema)
True
"""
self._set_opts(wholetext=wholetext, lineSep=lineSep)
self._set_opts(
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.text(path))
else:
@@ -584,7 +600,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
recursiveFileLookup=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -687,6 +704,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -704,7 +723,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep)
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
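The streaming readers in this file follow the same pattern; a sketch, assuming an illustrative schema and input directory:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# File-based streaming sources need an explicit schema for formats like Parquet;
# this one is illustrative only.
sdf_schema = StructType([StructField("name", StringType(), True),
                         StructField("age", IntegerType(), True)])

# recursiveFileLookup as added by this PR for DataStreamReader.parquet.
stream_df = (spark.readStream
             .schema(sdf_schema)
             .parquet("data/stream-input/", recursiveFileLookup=True))
assert stream_df.isStreaming
```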