diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a392b29bb379..3f8a3a759545 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -171,7 +171,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
-             dropFieldIfAllNull=None, encoding=None, locale=None):
+             dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
 
@@ -247,6 +247,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
                        it uses the default value, ``en-US``. For instance, ``locale`` is used while
                        parsing dates and timestamps.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
+        .. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -266,7 +270,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
             samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
-            locale=locale)
+            locale=locale, recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -300,9 +304,12 @@ def table(self, tableName):
         return self._df(self._jreader.table(tableName))
 
     @since(1.4)
-    def parquet(self, *paths):
+    def parquet(self, *paths, **options):
         """Loads Parquet files, returning the result as a :class:`DataFrame`.
 
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
         You can set the following Parquet-specific option(s) for reading Parquet files:
             * ``mergeSchema``: sets whether we should merge schemas collected from all \
                 Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -312,11 +319,13 @@ def parquet(self, *paths):
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
+        recursiveFileLookup = options.get('recursiveFileLookup', None)
+        self._set_opts(recursiveFileLookup=recursiveFileLookup)
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
 
     @ignore_unicode_prefix
     @since(1.6)
-    def text(self, paths, wholetext=False, lineSep=None):
+    def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
         """
         Loads text files and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -329,6 +338,8 @@ def text(self, paths, wholetext=False, lineSep=None):
         :param wholetext: if true, read each file from input path(s) as a single row.
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
 
         >>> df = spark.read.text('python/test_support/sql/text-test.txt')
         >>> df.collect()
@@ -337,7 +348,8 @@ def text(self, paths, wholetext=False, lineSep=None):
         >>> df.collect()
         [Row(value=u'hello\\nthis')]
         """
-        self._set_opts(wholetext=wholetext, lineSep=lineSep)
+        self._set_opts(
+            wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
         if isinstance(paths, basestring):
             paths = [paths]
         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -349,7 +361,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
-            samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
+            samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
+            recursiveFileLookup=None):
         r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -457,6 +470,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 1
                         character.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
 
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
@@ -476,7 +491,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
-            enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
+            enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
+            recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -504,13 +520,17 @@ def func(iterator):
                 raise TypeError("path can be only string, list or RDD")
 
     @since(1.5)
-    def orc(self, path):
+    def orc(self, path, recursiveFileLookup=None):
         """Loads ORC files, returning the result as a :class:`DataFrame`.
 
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
         >>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
+        self._set_opts(recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index f224526a4cc7..93b4c7895386 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
-             dropFieldIfAllNull=None, encoding=None):
+             dropFieldIfAllNull=None, encoding=None, recursiveFileLookup=None):
         """
         Loads a JSON file stream and returns the results as a :class:`DataFrame`.
 
@@ -487,6 +487,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                          the JSON files. For example UTF-16BE, UTF-32LE. If None is set, the
                          encoding of input JSON will be detected automatically
                          when the multiLine option is set to ``true``.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
+        .. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -502,33 +506,41 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
-            dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
+            dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
+            recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
             raise TypeError("path can be only a single string")
 
     @since(2.3)
-    def orc(self, path):
+    def orc(self, path, recursiveFileLookup=None):
         """Loads a ORC file stream, returning the result as a :class:`DataFrame`.
 
         .. note:: Evolving.
 
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
        >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
        >>> orc_sdf.isStreaming
        True
        >>> orc_sdf.schema == sdf_schema
        True
        """
+        self._set_opts(recursiveFileLookup=recursiveFileLookup)
        if isinstance(path, basestring):
            return self._df(self._jreader.orc(path))
        else:
            raise TypeError("path can be only a single string")
 
     @since(2.0)
-    def parquet(self, path):
+    def parquet(self, path, recursiveFileLookup=None):
         """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
 
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
+
         You can set the following Parquet-specific option(s) for reading Parquet files:
             * ``mergeSchema``: sets whether we should merge schemas collected from all \
                 Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -542,6 +554,7 @@ def parquet(self, path):
         >>> parquet_sdf.schema == sdf_schema
         True
         """
+        self._set_opts(recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
@@ -549,7 +562,7 @@ def parquet(self, path):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def text(self, path, wholetext=False, lineSep=None):
+    def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
         """
         Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -564,6 +577,8 @@ def text(self, path, wholetext=False, lineSep=None):
         :param wholetext: if true, read each file from input path(s) as a single row.
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
 
         >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
         >>> text_sdf.isStreaming
@@ -571,7 +586,8 @@ def text(self, path, wholetext=False, lineSep=None):
         True
         >>> "value" in str(text_sdf.schema)
         True
         """
-        self._set_opts(wholetext=wholetext, lineSep=lineSep)
+        self._set_opts(
+            wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.text(path))
         else:
@@ -584,7 +600,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
-            enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
+            enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
+            recursiveFileLookup=None):
         r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -687,6 +704,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 1
                         character.
+        :param recursiveFileLookup: recursively scan a directory for files. Using this option
+                                    disables `partition discovery`_.
 
         >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
         >>> csv_sdf.isStreaming
@@ -704,7 +723,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
-            emptyValue=emptyValue, locale=locale, lineSep=lineSep)
+            emptyValue=emptyValue, locale=locale, lineSep=lineSep,
+            recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:
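
For context, a minimal usage sketch of what this patch enables on the batch side. The
SparkSession setup and directory paths here are hypothetical; the `recursiveFileLookup`
keyword arguments are what the diff above adds, and the `option()` spelling is the
pre-existing generic form on DataFrameReader:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical layout: data/events/year=2019/month=06/part-0000.json
    # Default read: directory names such as year=2019 are interpreted as
    # partition columns via partition discovery.
    df_partitioned = spark.read.json("data/events")

    # With recursiveFileLookup, files in arbitrarily nested subdirectories
    # are picked up, but partition discovery is disabled, so no year/month
    # columns are inferred.
    df_recursive = spark.read.json("data/events", recursiveFileLookup=True)

    # parquet() receives the flag through **options in this patch; the
    # generic option() call is an equivalent spelling on any reader.
    df_parquet = spark.read.parquet("data/logs", recursiveFileLookup=True)
    df_same = spark.read.option("recursiveFileLookup", "true").parquet("data/logs")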
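
The streaming readers forward the new keyword through the same `_set_opts` helper, so
usage mirrors the batch API. A sketch under the same assumptions (hypothetical schema
and input directory; note that file-stream sources other than text require an explicit
schema):

    from pyspark.sql.types import StructType, StructField, StringType

    # Hypothetical schema for the incoming CSV files.
    schema = StructType([StructField("name", StringType(), True)])

    # Stream that also scans nested subdirectories of streaming/input.
    stream_df = spark.readStream.csv(
        "streaming/input", schema=schema, recursiveFileLookup=True)

    # Equivalent generic-option spelling.
    stream_df2 = (spark.readStream
                  .schema(schema)
                  .option("recursiveFileLookup", "true")
                  .csv("streaming/input"))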