Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7401a36
test: rewrite test to allow better parametrization
Alvaro-Kothe Oct 24, 2025
23345cf
fix(rolling.skew): handle outliers in window
Alvaro-Kothe Oct 25, 2025
488e99e
test: change tested behavior
Alvaro-Kothe Oct 26, 2025
ff9108d
docs(whatsnew): add bufix entry
Alvaro-Kothe Oct 26, 2025
a2d0937
docs(whatsnew): add change for degenerate distribution
Alvaro-Kothe Oct 26, 2025
8020107
docs: link issue
Alvaro-Kothe Oct 26, 2025
edc2393
perf: recompute the window less
Alvaro-Kothe Oct 26, 2025
ae1f3f9
perf: use fma for `m3_update`
Alvaro-Kothe Oct 27, 2025
5cc8b60
chore: add formulas for update
Alvaro-Kothe Oct 27, 2025
7f1d2a3
test(rolling): undo test modifications for degenerate windows
Alvaro-Kothe Oct 28, 2025
0aa2949
docs: remove note about nan change
Alvaro-Kothe Oct 28, 2025
6f21167
fix: undo scale check for m2
Alvaro-Kothe Oct 28, 2025
cca03e9
fix: undo NaN changes
Alvaro-Kothe Oct 28, 2025
6177d6a
chore: simplifies diff
Alvaro-Kothe Oct 28, 2025
9d9451d
fix: re-add consecutive count reset to recompute
Alvaro-Kothe Oct 29, 2025
74aba02
chore: remove extra whitespace
Alvaro-Kothe Oct 29, 2025
a1fb800
chore: justify ill-condition tolerance
Alvaro-Kothe Oct 31, 2025
7d931d7
Merge remote-tracking branch 'upstream/main' into fix/polluted-window…
Alvaro-Kothe Nov 3, 2025
b6b68b9
chore: rename IncConditionNumber -> InvCondTol
Alvaro-Kothe Nov 3, 2025
4a095ac
chore: add github issue to test
Alvaro-Kothe Nov 3, 2025
7f2a748
perf: stop using `fma`
Alvaro-Kothe Nov 3, 2025
f5f0fed
fix: pass memview to `roll_skew`
Alvaro-Kothe Nov 4, 2025
7db1cbc
Merge branch 'main' into fix/polluted-window-skew
Alvaro-Kothe Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,7 @@ Groupby/resample/rolling
- Bug in :meth:`DataFrameGroupby.transform` and :meth:`SeriesGroupby.transform` with a reducer and ``observed=False`` that coerces dtype to float when there are unobserved categories. (:issue:`55326`)
- Bug in :meth:`Rolling.apply` for ``method="table"`` where column order was not being respected due to the columns getting sorted by default. (:issue:`59666`)
- Bug in :meth:`Rolling.apply` where the applied function could be called on fewer than ``min_period`` periods if ``method="table"``. (:issue:`58868`)
- Bug in :meth:`Rolling.skew` incorrectly computing skewness for windows following outliers due to numerical instability. The calculation now properly handles catastrophic cancellation by recomputing affected windows (:issue:`47461`)
- Bug in :meth:`Series.resample` could raise when the date range ended shortly before a non-existent time. (:issue:`58380`)
- Bug in :meth:`Series.resample` raising error when resampling non-nanosecond resolutions out of bounds for nanosecond precision (:issue:`57427`)

Expand Down
202 changes: 102 additions & 100 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# cython: boundscheck=False, wraparound=False, cdivision=True

from libc.math cimport (
fabs,
round,
signbit,
sqrt,
Expand Down Expand Up @@ -60,6 +61,12 @@ cdef:
float64_t MAXfloat64 = np.inf

float64_t NaN = <float64_t>np.nan
float64_t EpsF64 = np.finfo(np.float64).eps

# Consider an operation ill-conditioned if
# it will only have up to 3 significant digits in base 10 remaining.
# https://en.wikipedia.org/wiki/Condition_number
float64_t InvCondTol = EpsF64 * 1e3

cdef bint is_monotonic_increasing_start_end_bounds(
ndarray[int64_t, ndim=1] start, ndarray[int64_t, ndim=1] end
Expand Down Expand Up @@ -482,75 +489,74 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,


cdef float64_t calc_skew(int64_t minp, int64_t nobs,
float64_t x, float64_t xx, float64_t xxx,
float64_t mean, float64_t m2, float64_t m3,
int64_t num_consecutive_same_value
) noexcept nogil:
cdef:
float64_t result, dnobs
float64_t A, B, C, R
float64_t moments_ratio, correction
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks you for using descriptive names for these variables!


if nobs >= minp:
dnobs = <float64_t>nobs
A = x / dnobs
B = xx / dnobs - A * A
C = xxx / dnobs - A * A * A - 3 * A * B

if nobs < 3:
result = NaN
# GH 42064 46431
# uniform case, force result to be 0
elif num_consecutive_same_value >= nobs:
result = 0.0
# #18044: with uniform distribution, floating issue will
# cause B != 0. and cause the result is a very
# #18044: with degenerate distribution, floating issue will
# cause m2 != 0. and cause the result is a very
# large number.
#
# in core/nanops.py nanskew/nankurt call the function
# _zero_out_fperr(m2) to fix floating error.
# if the variance is less than 1e-14, it could be
# treat as zero, here we follow the original
# skew/kurt behaviour to check B <= 1e-14
elif B <= 1e-14:
# skew/kurt behaviour to check m2 <= n * 1e-14
elif m2 <= dnobs * 1e-14:
result = NaN
else:
R = sqrt(B)
result = ((sqrt(dnobs * (dnobs - 1.)) * C) /
((dnobs - 2) * R * R * R))
moments_ratio = m3 / (m2 * sqrt(m2))
correction = dnobs * sqrt((dnobs - 1)) / (dnobs - 2)
result = moments_ratio * correction
else:
result = NaN

return result


cdef void add_skew(float64_t val, int64_t *nobs,
float64_t *x, float64_t *xx,
float64_t *xxx,
float64_t *compensation_x,
float64_t *compensation_xx,
float64_t *compensation_xxx,
float64_t *mean, float64_t *m2,
float64_t *m3,
bint *numerically_unstable,
int64_t *num_consecutive_same_value,
float64_t *prev_value,
) noexcept nogil:
""" add a value from the skew calc """
cdef:
float64_t y, t
float64_t n, delta, delta_n, term1, m3_update, new_m3

# Formulas adapted from
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics

# Not NaN
if val == val:
nobs[0] = nobs[0] + 1

y = val - compensation_x[0]
t = x[0] + y
compensation_x[0] = t - x[0] - y
x[0] = t
y = val * val - compensation_xx[0]
t = xx[0] + y
compensation_xx[0] = t - xx[0] - y
xx[0] = t
y = val * val * val - compensation_xxx[0]
t = xxx[0] + y
compensation_xxx[0] = t - xxx[0] - y
xxx[0] = t
nobs[0] += 1
n = <float64_t>(nobs[0])
delta = val - mean[0]
delta_n = delta / n
term1 = delta * delta_n * (n - 1.0)

m3_update = delta_n * (term1 * (n - 2.0) - 3.0 * m2[0])
new_m3 = m3[0] + m3_update
if (fabs(m3_update) + fabs(m3[0])) * InvCondTol > fabs(new_m3):
# possible catastrophic cancellation
numerically_unstable[0] = True

m3[0] = new_m3
m2[0] += term1
mean[0] += delta_n

# GH#42064, record num of same values to remove floating point artifacts
if val == prev_value[0]:
Expand All @@ -562,116 +568,112 @@ cdef void add_skew(float64_t val, int64_t *nobs,


cdef void remove_skew(float64_t val, int64_t *nobs,
float64_t *x, float64_t *xx,
float64_t *xxx,
float64_t *compensation_x,
float64_t *compensation_xx,
float64_t *compensation_xxx) noexcept nogil:
float64_t *mean, float64_t *m2,
float64_t *m3,
bint *numerically_unstable) noexcept nogil:
""" remove a value from the skew calc """
cdef:
float64_t y, t
float64_t n, delta, delta_n, term1, m3_update, new_m3

# This is the online update for the central moments
# when we remove an observation.
#
# δ = x - m_{n+1}
# m_{n} = m_{n+1} - (δ / n)
# m²_n = Σ_{i=1}^{n+1}(x_i - m_{n})² - (x - m_{n})² # uses new mean
# = m²_{n+1} - (δ²/n)*(n+1)
# m³_n = Σ_{i=1}^{n+1}(x_i - m_{n})³ - (x - m_{n})³ # uses new mean
# = m³_{n+1} - (δ³/n²)*(n+1)*(n+2) + 3 * m²_{n+1}*(δ/n)

# Not NaN
if val == val:
nobs[0] = nobs[0] - 1
nobs[0] -= 1
n = <float64_t>(nobs[0])
delta = val - mean[0]
delta_n = delta / n
term1 = delta_n * delta * (n + 1.0)

y = - val - compensation_x[0]
t = x[0] + y
compensation_x[0] = t - x[0] - y
x[0] = t
y = - val * val - compensation_xx[0]
t = xx[0] + y
compensation_xx[0] = t - xx[0] - y
xx[0] = t
y = - val * val * val - compensation_xxx[0]
t = xxx[0] + y
compensation_xxx[0] = t - xxx[0] - y
xxx[0] = t
m3_update = delta_n * (term1 * (n + 2.0) - 3.0 * m2[0])
new_m3 = m3[0] - m3_update

if (fabs(m3_update) + fabs(m3[0])) * InvCondTol > fabs(new_m3):
# possible catastrophic cancellation
numerically_unstable[0] = True

m3[0] = new_m3
m2[0] -= term1
mean[0] -= delta_n


def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
def roll_skew(const float64_t[:] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
Py_ssize_t i, j
float64_t val, min_val, mean_val, sum_val = 0
float64_t compensation_xxx_add, compensation_xxx_remove
float64_t compensation_xx_add, compensation_xx_remove
float64_t compensation_x_add, compensation_x_remove
float64_t x, xx, xxx
float64_t val
float64_t mean, m2, m3
float64_t prev_value
int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
int64_t nobs = 0, N = len(start)
int64_t s, e, num_consecutive_same_value
ndarray[float64_t] output, values_copy
ndarray[float64_t] output
bint is_monotonic_increasing_bounds
bint requires_recompute, numerically_unstable = False

minp = max(minp, 3)
is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
start, end
)
output = np.empty(N, dtype=np.float64)
min_val = np.nanmin(values)
values_copy = np.copy(values)
Comment on lines -612 to -613
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Can we type values in the roll_skew signature as const float64_t[:] now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I ran the window tests with UBSAN and it didn't report a problem.


with nogil:
for i in range(0, V):
val = values_copy[i]
if val == val:
nobs_mean += 1
sum_val += val
mean_val = sum_val / nobs_mean
# Other cases would lead to imprecision for smallest values
if min_val - mean_val > -1e5:
mean_val = round(mean_val)
for i in range(0, V):
values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):

s = start[i]
e = end[i]

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

prev_value = values[s]
num_consecutive_same_value = 0

compensation_xxx_add = compensation_xxx_remove = 0
compensation_xx_add = compensation_xx_remove = 0
compensation_x_add = compensation_x_remove = 0
x = xx = xxx = 0
nobs = 0
for j in range(s, e):
val = values_copy[j]
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
&compensation_xx_add, &compensation_xxx_add,
&num_consecutive_same_value, &prev_value)

else:
requires_recompute = (
i == 0
or not is_monotonic_increasing_bounds
or s >= end[i - 1]
)

if not requires_recompute:
# After the first window, observations can both be added
# and removed
# calculate deletes
for j in range(start[i - 1], s):
val = values_copy[j]
remove_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_remove,
&compensation_xx_remove, &compensation_xxx_remove)
val = values[j]
remove_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)

# calculate adds
for j in range(end[i - 1], e):
val = values_copy[j]
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
&compensation_xx_add, &compensation_xxx_add,
val = values[j]
add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable,
&num_consecutive_same_value, &prev_value)

output[i] = calc_skew(minp, nobs, x, xx, xxx, num_consecutive_same_value)
if requires_recompute or numerically_unstable:

prev_value = values[s]
num_consecutive_same_value = 0

mean = m2 = m3 = 0.0
nobs = 0

for j in range(s, e):
val = values[j]
add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable,
&num_consecutive_same_value, &prev_value)

numerically_unstable = False

output[i] = calc_skew(minp, nobs, mean, m2, m3, num_consecutive_same_value)

if not is_monotonic_increasing_bounds:
nobs = 0
x = 0.0
xx = 0.0
xxx = 0.0
mean = 0.0
m2 = 0.0
m3 = 0.0

return output

Expand Down
37 changes: 27 additions & 10 deletions pandas/tests/window/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,9 @@ def test_rolling_decreasing_indices(method):
increasing = getattr(df.rolling(window=5), method)()
decreasing = getattr(df_reverse.rolling(window=5), method)()

assert np.abs(decreasing.values[::-1][:-4] - increasing.values[4:]).max() < 1e-12
tm.assert_almost_equal(
decreasing.values[::-1][:-4], increasing.values[4:], atol=1e-12
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1441,17 +1443,30 @@ def test_rolling_skew_kurt_numerical_stability(method):


@pytest.mark.parametrize(
("method", "values"),
("method", "data", "values"),
[
("skew", [2.0, 0.854563, 0.0, 1.999984]),
("kurt", [4.0, -1.289256, -1.2, 3.999946]),
(
"skew",
[3000000, 1, 1, 2, 3, 4, 999],
[np.nan] * 3 + [2.0, 0.854563, 0.0, 1.999984],
),
(
"skew",
[1e6, -1e6, 1, 2, 3, 4, 5, 6],
[np.nan] * 3 + [-5.51135192e-06, -2.0, 0.0, 0.0, 0.0],
),
(
"kurt",
[3000000, 1, 1, 2, 3, 4, 999],
[np.nan] * 3 + [4.0, -1.289256, -1.2, 3.999946],
),
],
)
def test_rolling_skew_kurt_large_value_range(method, values):
# GH: 37557
s = Series([3000000, 1, 1, 2, 3, 4, 999])
def test_rolling_skew_kurt_large_value_range(method, data, values):
# GH: 37557, 47461
s = Series(data)
result = getattr(s.rolling(4), method)()
expected = Series([np.nan] * 3 + values)
expected = Series(values)
tm.assert_series_equal(result, expected)


Expand Down Expand Up @@ -1837,9 +1852,11 @@ def test_rolling_skew_kurt_floating_artifacts():
sr = Series([1 / 3, 4, 0, 0, 0, 0, 0])
r = sr.rolling(4)
result = r.skew()
assert (result[-2:] == 0).all()
expected = Series([np.nan, np.nan, np.nan, 1.9619045191072484, 2.0, 0.0, 0.0])
tm.assert_series_equal(result, expected)
result = r.kurt()
assert (result[-2:] == -3).all()
expected = Series([np.nan, np.nan, np.nan, 3.8636048803878786, 4.0, -3.0, -3.0])
tm.assert_series_equal(result, expected)


def test_numeric_only_frame(arithmetic_win_operators, numeric_only):
Expand Down
Loading