diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index b266f485c96c4..957e273078517 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -168,9 +168,68 @@ def json(
 
     json.__doc__ = PySparkDataStreamReader.json.__doc__
 
-    # def orc() TODO
-    # def parquet() TODO
-    # def text() TODO
+    def orc(
+        self,
+        path: str,
+        mergeSchema: Optional[bool] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="orc")
+        else:
+            raise TypeError("path can be only a single string")
+
+    orc.__doc__ = PySparkDataStreamReader.orc.__doc__
+
+    def parquet(
+        self,
+        path: str,
+        mergeSchema: Optional[bool] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+        datetimeRebaseMode: Optional[Union[bool, str]] = None,
+        int96RebaseMode: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            datetimeRebaseMode=datetimeRebaseMode,
+            int96RebaseMode=int96RebaseMode,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="parquet")
+        else:
+            raise TypeError("path can be only a single string")
+
+    parquet.__doc__ = PySparkDataStreamReader.parquet.__doc__
+
+    def text(
+        self,
+        path: str,
+        wholetext: bool = False,
+        lineSep: Optional[str] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            wholetext=wholetext,
+            lineSep=lineSep,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="text")
+        else:
+            raise TypeError("path can be only a single string")
+
+    text.__doc__ = PySparkDataStreamReader.text.__doc__
 
     def csv(
         self,
@@ -245,7 +304,7 @@ def csv(
 
     csv.__doc__ = PySparkDataStreamReader.csv.__doc__
 
-    # def table() TODO. Use Read(table_name) relation.
+    # def table() TODO(SPARK-43042). Use Read(table_name) relation.
 
 
 DataStreamReader.__doc__ = PySparkDataStreamReader.__doc__
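
A minimal usage sketch (not part of the diff) showing how one of the new reader methods would be exercised from a Spark Connect client. The connect URL, input directory, and checkpoint location are placeholders, and the sketch assumes a Spark Connect server is already running:

from pyspark.sql import SparkSession

# Assumption: a Spark Connect server is reachable at this URL (placeholder).
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# Each new method only sets its format-specific options and then delegates
# to load(path=..., format=...), so this resolves to a streaming parquet scan.
stream = spark.readStream.parquet(
    "/tmp/streaming/input",  # placeholder input directory
    mergeSchema=True,
    recursiveFileLookup=True,
)

query = (
    stream.writeStream.format("console")
    .option("checkpointLocation", "/tmp/streaming/checkpoint")  # placeholder
    .start()
)
query.awaitTermination(10)  # run briefly for demonstration, then stop
query.stop()

Note that, as the diff shows, each method accepts only a single string path and raises a TypeError otherwise; the multi-path overloads of the batch DataFrameReader are not supported here.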