diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 2e2e5540bd4b..05dd6d15eec6 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -607,7 +607,7 @@ def quantile(self, q: float = 0.5, accuracy: int = 10000) -> FrameLike:
         -------
         `quantile` in pandas-on-Spark are using distributed percentile approximation
         algorithm unlike pandas, the result might different with pandas, also
-        `interpolation` parameters are not supported yet.
+        `interpolation` parameter is not supported yet.

         See Also
         --------
diff --git a/python/pyspark/pandas/missing/window.py b/python/pyspark/pandas/missing/window.py
index 31684e43ccf2..a6d423d08f1c 100644
--- a/python/pyspark/pandas/missing/window.py
+++ b/python/pyspark/pandas/missing/window.py
@@ -82,7 +82,6 @@ class MissingPandasLikeExpanding:
     corr = _unsupported_function_expanding("corr")
     cov = _unsupported_function_expanding("cov")
     median = _unsupported_function_expanding("median")
-    quantile = _unsupported_function_expanding("quantile")
     validate = _unsupported_function_expanding("validate")

     exclusions = _unsupported_property_expanding("exclusions")
@@ -101,7 +100,6 @@ class MissingPandasLikeRolling:
     corr = _unsupported_function_rolling("corr")
     cov = _unsupported_function_rolling("cov")
     median = _unsupported_function_rolling("median")
-    quantile = _unsupported_function_rolling("quantile")
     validate = _unsupported_function_rolling("validate")

     exclusions = _unsupported_property_rolling("exclusions")
@@ -120,7 +118,6 @@ class MissingPandasLikeExpandingGroupby:
     corr = _unsupported_function_expanding("corr")
     cov = _unsupported_function_expanding("cov")
     median = _unsupported_function_expanding("median")
-    quantile = _unsupported_function_expanding("quantile")
     validate = _unsupported_function_expanding("validate")

     exclusions = _unsupported_property_expanding("exclusions")
@@ -139,7 +136,6 @@ class MissingPandasLikeRollingGroupby:
     corr = _unsupported_function_rolling("corr")
     cov = _unsupported_function_rolling("cov")
     median = _unsupported_function_rolling("median")
-    quantile = _unsupported_function_rolling("quantile")
     validate = _unsupported_function_rolling("validate")

     exclusions = _unsupported_property_rolling("exclusions")
diff --git a/python/pyspark/pandas/tests/test_expanding.py b/python/pyspark/pandas/tests/test_expanding.py
index aeb0e9f297bc..77ced41eb8cb 100644
--- a/python/pyspark/pandas/tests/test_expanding.py
+++ b/python/pyspark/pandas/tests/test_expanding.py
@@ -82,6 +82,9 @@ def test_expanding_max(self):
     def test_expanding_mean(self):
         self._test_expanding_func("mean")

+    def test_expanding_quantile(self):
+        self._test_expanding_func(lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower"))
+
     def test_expanding_sum(self):
         self._test_expanding_func("sum")

@@ -212,6 +215,11 @@ def test_groupby_expanding_max(self):
     def test_groupby_expanding_mean(self):
         self._test_groupby_expanding_func("mean")

+    def test_groupby_expanding_quantile(self):
+        self._test_groupby_expanding_func(
+            lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")
+        )
+
     def test_groupby_expanding_sum(self):
         self._test_groupby_expanding_func("sum")

diff --git a/python/pyspark/pandas/tests/test_rolling.py b/python/pyspark/pandas/tests/test_rolling.py
index 3f92eba79ce9..be21bf16d409 100644
--- a/python/pyspark/pandas/tests/test_rolling.py
+++ b/python/pyspark/pandas/tests/test_rolling.py
@@ -79,6 +79,9 @@ def test_rolling_max(self):
     def test_rolling_mean(self):
         self._test_rolling_func("mean")

+    def test_rolling_quantile(self):
+        self._test_rolling_func(lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower"))
+
     def test_rolling_sum(self):
         self._test_rolling_func("sum")

@@ -212,6 +215,11 @@ def test_groupby_rolling_max(self):
     def test_groupby_rolling_mean(self):
         self._test_groupby_rolling_func("mean")

+    def test_groupby_rolling_quantile(self):
+        self._test_groupby_rolling_func(
+            lambda x: x.quantile(0.5), lambda x: x.quantile(0.5, "lower")
+        )
+
     def test_groupby_rolling_sum(self):
         self._test_groupby_rolling_func("sum")

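Reviewer note (illustrative sketch, not part of the patch): the paired lambdas in the new tests appear to run the pandas-on-Spark quantile and the pandas reference with interpolation="lower", the mode the approximate algorithm is expected to track most closely. A minimal version of that comparison, assuming a local Spark session; the variable names below are made up for the example:

    import pandas as pd
    import pyspark.pandas as ps

    pser = pd.Series([4, 3, 5, 2, 6])
    psser = ps.from_pandas(pser)

    # New API added by this patch: distributed, approximate rolling quantile.
    print(psser.rolling(2).quantile(0.5))

    # pandas reference the tests pair it with; "lower" is the interpolation
    # mode the approximate result is closest to.
    print(pser.rolling(2).quantile(0.5, interpolation="lower"))
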
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index 2808f72fd3c1..274000cbb2cd 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -40,6 +40,9 @@
 from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.utils import scol_for
 from pyspark.sql.column import Column
+from pyspark.sql.types import (
+    DoubleType,
+)
 from pyspark.sql.window import WindowSpec


@@ -101,6 +104,15 @@ def mean(scol: Column) -> Column:

         return self._apply_as_series_or_frame(mean)

+    def quantile(self, q: float, accuracy: int = 10000) -> FrameLike:
+        def quantile(scol: Column) -> Column:
+            return F.when(
+                F.row_number().over(self._unbounded_window) >= self._min_periods,
+                F.percentile_approx(scol.cast(DoubleType()), q, accuracy).over(self._window),
+            ).otherwise(SF.lit(None))
+
+        return self._apply_as_series_or_frame(quantile)
+
     def std(self) -> FrameLike:
         def std(scol: Column) -> Column:
             return F.when(
@@ -561,6 +573,101 @@ def mean(self) -> FrameLike:
         """
         return super().mean()

+    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
+        """
+        Calculate the rolling quantile of the values.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        quantile : float
+            Value between 0 and 1 providing the quantile to compute.
+        accuracy : int, optional
+            Default accuracy of approximation. Larger value means better accuracy.
+            The relative error can be deduced by 1.0 / accuracy.
+            This is a pandas-on-Spark specific parameter.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the rolling
+            calculation.
+
+        Notes
+        -----
+        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
+        approximation algorithm, so the result may differ from pandas. The
+        `interpolation` parameter is not supported yet.
+
+        The current implementation of this API uses Spark's Window without
+        specifying partition specification. This leads to moving all data into
+        a single partition on a single machine and could cause serious
+        performance degradation. Avoid this method with very large datasets.
+
+        See Also
+        --------
+        pyspark.pandas.Series.rolling : Calling rolling with Series data.
+        pyspark.pandas.DataFrame.rolling : Calling rolling with DataFrames.
+        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
+        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.
+
+        Examples
+        --------
+        >>> s = ps.Series([4, 3, 5, 2, 6])
+        >>> s
+        0    4
+        1    3
+        2    5
+        3    2
+        4    6
+        dtype: int64
+
+        >>> s.rolling(2).quantile(0.5)
+        0    NaN
+        1    3.0
+        2    3.0
+        3    2.0
+        4    2.0
+        dtype: float64
+
+        >>> s.rolling(3).quantile(0.5)
+        0    NaN
+        1    NaN
+        2    4.0
+        3    3.0
+        4    5.0
+        dtype: float64
+
+        For DataFrame, each rolling quantile is computed column-wise.
+
+        >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
+        >>> df
+           A   B
+        0  4  16
+        1  3   9
+        2  5  25
+        3  2   4
+        4  6  36
+
+        >>> df.rolling(2).quantile(0.5)
+             A    B
+        0  NaN  NaN
+        1  3.0  9.0
+        2  3.0  9.0
+        3  2.0  4.0
+        4  2.0  4.0
+
+        >>> df.rolling(3).quantile(0.5)
+             A     B
+        0  NaN   NaN
+        1  NaN   NaN
+        2  4.0  16.0
+        3  3.0   9.0
+        4  5.0  25.0
+        """
+        return super().quantile(quantile, accuracy)
+
     def std(self) -> FrameLike:
         """
         Calculate rolling standard deviation.
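Reviewer note (illustrative sketch, not part of the patch): the method in the hunk above is a docstring wrapper over the generic quantile() added earlier in this file, which evaluates percentile_approx over the rolling frame and masks rows that have not yet reached min_periods via a row_number check. A rough plain-PySpark equivalent for a window of size 2, with made-up column and variable names:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(list(enumerate([4, 3, 5, 2, 6])), ["idx", "v"])

    # Rolling frame of size 2, plus an unbounded frame for the min_periods guard.
    rolling = Window.orderBy("idx").rowsBetween(-1, Window.currentRow)
    unbounded = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    result = df.select(
        "idx",
        F.when(
            F.row_number().over(unbounded) >= 2,  # min_periods guard
            F.percentile_approx(F.col("v").cast("double"), 0.5, 10000).over(rolling),
        ).alias("rolling_q50"),
    )
    result.show()
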
@@ -1136,6 +1243,77 @@ def mean(self) -> FrameLike:
         """
         return super().mean()

+    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
+        """
+        Calculate rolling quantile.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        quantile : float
+            Value between 0 and 1 providing the quantile to compute.
+        accuracy : int, optional
+            Default accuracy of approximation. Larger value means better accuracy.
+            The relative error can be deduced by 1.0 / accuracy.
+            This is a pandas-on-Spark specific parameter.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the rolling
+            calculation.
+
+        Notes
+        -----
+        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
+        approximation algorithm, so the result may differ from pandas. The
+        `interpolation` parameter is not supported yet.
+
+        See Also
+        --------
+        pyspark.pandas.Series.rolling : Calling rolling with Series data.
+        pyspark.pandas.DataFrame.rolling : Calling rolling with DataFrames.
+        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
+        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.
+
+        Examples
+        --------
+        >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5])
+        >>> s.groupby(s).rolling(3).quantile(0.5).sort_index()
+        2  0     NaN
+           1     NaN
+        3  2     NaN
+           3     NaN
+           4     3.0
+        4  5     NaN
+           6     NaN
+           7     4.0
+           8     4.0
+        5  9     NaN
+           10    NaN
+        dtype: float64
+
+        For DataFrame, each rolling quantile is computed column-wise.
+
+        >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
+        >>> df.groupby(df.A).rolling(2).quantile(0.5).sort_index()
+                 B
+        A
+        2 0    NaN
+          1    4.0
+        3 2    NaN
+          3    9.0
+          4    9.0
+        4 5    NaN
+          6   16.0
+          7   16.0
+          8   16.0
+        5 9    NaN
+          10  25.0
+        """
+        return super().quantile(quantile, accuracy)
+
     def std(self) -> FrameLike:
         """
         Calculate rolling standard deviation.
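Reviewer note (illustrative sketch, not part of the patch): none of the docstring examples exercise the accuracy argument. A small sketch of how it would be passed; on tiny local data like this both calls should return the same values, and the parameter only matters at scale, where the relative error of the approximation is roughly 1.0 / accuracy:

    import pyspark.pandas as ps

    s = ps.Series([4, 3, 5, 2, 6])
    s.rolling(3).quantile(0.5)                # default accuracy=10000
    s.rolling(3).quantile(0.5, accuracy=100)  # coarser, cheaper approximation
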
@@ -1483,6 +1661,66 @@ def mean(self) -> FrameLike:
         """
         return super().mean()

+    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
+        """
+        Calculate the expanding quantile of the values.
+
+        Parameters
+        ----------
+        quantile : float
+            Value between 0 and 1 providing the quantile to compute.
+        accuracy : int, optional
+            Default accuracy of approximation. Larger value means better accuracy.
+            The relative error can be deduced by 1.0 / accuracy.
+            This is a pandas-on-Spark specific parameter.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the expanding
+            calculation.
+
+        Notes
+        -----
+        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
+        approximation algorithm, so the result may differ from pandas (it is close to
+        the result with `interpolation` set to `lower`). The `interpolation` parameter
+        is not supported yet.
+
+        The current implementation of this API uses Spark's Window without
+        specifying partition specification. This leads to moving all data into
+        a single partition on a single machine and could cause serious
+        performance degradation. Avoid this method with very large datasets.
+
+        See Also
+        --------
+        pyspark.pandas.Series.expanding : Calling expanding with Series data.
+        pyspark.pandas.DataFrame.expanding : Calling expanding with DataFrames.
+        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
+        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.
+
+        Examples
+        --------
+        The examples below show expanding quantile calculations with minimum periods
+        of two and three, respectively.
+
+        >>> s = ps.Series([1, 2, 3, 4])
+        >>> s.expanding(2).quantile(0.5)
+        0    NaN
+        1    1.0
+        2    2.0
+        3    2.0
+        dtype: float64
+
+        >>> s.expanding(3).quantile(0.5)
+        0    NaN
+        1    NaN
+        2    2.0
+        3    2.0
+        dtype: float64
+        """
+        return super().quantile(quantile, accuracy)
+
     def std(self) -> FrameLike:
         """
         Calculate expanding standard deviation.
@@ -1978,6 +2216,77 @@ def mean(self) -> FrameLike:
         """
         return super().mean()

+    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
+        """
+        Calculate the expanding quantile of the values.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        quantile : float
+            Value between 0 and 1 providing the quantile to compute.
+        accuracy : int, optional
+            Default accuracy of approximation. Larger value means better accuracy.
+            The relative error can be deduced by 1.0 / accuracy.
+            This is a pandas-on-Spark specific parameter.
+
+        Returns
+        -------
+        Series or DataFrame
+            Returned object type is determined by the caller of the expanding
+            calculation.
+
+        Notes
+        -----
+        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
+        approximation algorithm, so the result may differ from pandas. The
+        `interpolation` parameter is not supported yet.
+
+        See Also
+        --------
+        pyspark.pandas.Series.expanding : Calling expanding with Series data.
+        pyspark.pandas.DataFrame.expanding : Calling expanding with DataFrames.
+        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
+        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.
+
+        Examples
+        --------
+        >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5])
+        >>> s.groupby(s).expanding(3).quantile(0.5).sort_index()
+        2  0     NaN
+           1     NaN
+        3  2     NaN
+           3     NaN
+           4     3.0
+        4  5     NaN
+           6     NaN
+           7     4.0
+           8     4.0
+        5  9     NaN
+           10    NaN
+        dtype: float64
+
+        For DataFrame, each expanding quantile is computed column-wise.
+
+        >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
+        >>> df.groupby(df.A).expanding(2).quantile(0.5).sort_index()
+                 B
+        A
+        2 0    NaN
+          1    4.0
+        3 2    NaN
+          3    9.0
+          4    9.0
+        4 5    NaN
+          6   16.0
+          7   16.0
+          8   16.0
+        5 9    NaN
+          10  25.0
+        """
+        return super().quantile(quantile, accuracy)
+
     def std(self) -> FrameLike:
         """
         Calculate expanding standard deviation.