Add chunk-friendly code path to encode_cf_datetime and encode_cf_timedelta (#8575)
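A minimal usage sketch of the behavior this PR targets, assuming dask is installed and that an xarray with this change applied is used (these functions live in `xarray/coding/times.py`, the module modified in this diff); with chunked input the result stays a lazy dask array instead of being computed eagerly:

```python
import dask.array as da
import pandas as pd

from xarray.coding.times import encode_cf_datetime

# A chunked datetime64[ns] array; before this PR it would be encoded eagerly.
times = da.from_array(pd.date_range("2000-01-01", periods=10).values, chunks=5)

# With the chunk-friendly path, `num` is itself a dask array; units and dtype
# default to "nanoseconds since 1970-01-01" and int64 for datetime64 input.
num, units, calendar = encode_cf_datetime(times)
print(type(num), units, calendar)
```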
@@ -5,7 +5,7 @@
 from collections.abc import Hashable
 from datetime import datetime, timedelta
 from functools import partial
-from typing import TYPE_CHECKING, Callable, Union
+from typing import TYPE_CHECKING, Callable, TypeVar, Union

 import numpy as np
 import pandas as pd
@@ -22,6 +22,7 @@
 )
 from xarray.core import indexing
 from xarray.core.common import contains_cftime_datetimes, is_np_datetime_like
+from xarray.core.duck_array_ops import asarray
 from xarray.core.formatting import first_n_items, format_timestamp, last_item
 from xarray.core.pdcompat import nanosecond_precision_timestamp
 from xarray.core.pycompat import is_duck_dask_array
@@ -38,6 +39,14 @@

 T_Name = Union[Hashable, None]

+# Copied from types.py to avoid a circular import
+try:
+    from dask.array import Array as DaskArray
+except ImportError:
+    DaskArray = np.ndarray  # type: ignore
+
+T_NumPyOrDaskArray = TypeVar("T_NumPyOrDaskArray", np.ndarray, DaskArray)
+
 # standard calendars recognized by cftime
 _STANDARD_CALENDARS = {"standard", "gregorian", "proleptic_gregorian"}
@@ -667,12 +676,48 @@ def _division(deltas, delta, floor):
     return num


+def _cast_to_dtype_if_safe(num: np.ndarray, dtype: np.dtype) -> np.ndarray:
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", message="overflow")
+        cast_num = np.asarray(num, dtype=dtype)
+
+    if np.issubdtype(dtype, np.integer):
+        if not (num == cast_num).all():
+            if np.issubdtype(num.dtype, np.floating):
+                raise ValueError(
+                    f"Not possible to cast all encoded times from "
+                    f"{num.dtype!r} to {dtype!r} without losing precision. "
+                    f"Consider modifying the units such that integer values "
+                    f"can be used, or removing the units and dtype encoding, "
+                    f"at which point xarray will make an appropriate choice."
+                )
+            else:
+                raise OverflowError(
+                    f"Not possible to cast encoded times from "
+                    f"{num.dtype!r} to {dtype!r} without overflow. Consider "
+                    f"removing the dtype encoding, at which point xarray will "
+                    f"make an appropriate choice, or explicitly switching to "
+                    "a larger integer dtype."
+                )
+    else:
+        if np.isinf(cast_num).any():
+            raise OverflowError(
+                f"Not possible to cast encoded times from {num.dtype!r} to "
+                f"{dtype!r} without overflow. Consider removing the dtype "
+                f"encoding, at which point xarray will make an appropriate "
+                f"choice, or explicitly switching to a larger floating point "
+                f"dtype."
+            )
+
+    return cast_num
+
+
 def encode_cf_datetime(
-    dates,
+    dates: T_NumPyOrDaskArray,
     units: str | None = None,
     calendar: str | None = None,
     dtype: np.dtype | None = None,
-) -> tuple[np.ndarray, str, str]:
+) -> tuple[T_NumPyOrDaskArray, str, str]:
     """Given an array of datetime objects, returns the tuple `(num, units,
     calendar)` suitable for a CF compliant time variable.
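For illustration (the values below are not taken from the PR), the two failure modes that the new `_cast_to_dtype_if_safe` helper guards against can be reproduced directly with plain NumPy casts:

```python
import numpy as np

# 1) Precision loss: non-integer encoded times cannot be cast to an integer
#    dtype without changing their values, so the new code raises ValueError.
num = np.array([0.0, 1.5])
cast = np.asarray(num, dtype=np.int64)
assert not (num == cast).all()

# 2) Overflow: values that do not fit in the requested integer dtype wrap
#    around silently, so the new code raises OverflowError instead of
#    writing wrapped-around values to disk.
num = np.array([0, 2**40], dtype=np.int64)
cast = np.asarray(num, dtype=np.int16)
assert not (num == cast).all()
```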
@@ -682,6 +727,24 @@ def encode_cf_datetime(
     --------
     cftime.date2num
     """
+    dates = asarray(dates)
+    if isinstance(dates, np.ndarray):
+        return _eagerly_encode_cf_datetime(dates, units, calendar, dtype)
+    elif is_duck_dask_array(dates):
+        return _lazily_encode_cf_datetime(dates, units, calendar, dtype)
+    else:
+        raise ValueError(
+            f"dates provided have an unsupported array type: {type(dates)}."
+        )
+
+
+def _eagerly_encode_cf_datetime(
+    dates: np.ndarray,
+    units: str | None = None,
+    calendar: str | None = None,
+    dtype: np.dtype | None = None,
+    allow_units_modification: bool = True,
+) -> tuple[np.ndarray, str, str]:
     dates = np.asarray(dates)

     data_units = infer_datetime_units(dates)
@@ -731,7 +794,7 @@ def encode_cf_datetime(
                     f"Set encoding['dtype'] to integer dtype to serialize to int64. "
                     f"Set encoding['dtype'] to floating point dtype to silence this warning."
                 )
-            elif np.issubdtype(dtype, np.integer):
+            elif np.issubdtype(dtype, np.integer) and allow_units_modification:
                 new_units = f"{needed_units} since {format_timestamp(ref_date)}"
                 emit_user_level_warning(
                     f"Times can't be serialized faithfully to int64 with requested units {units!r}. "
@@ -752,11 +815,84 @@
         # we already covered for this in pandas-based flow
         num = cast_to_int_if_safe(num)

-    return (num, units, calendar)
+    if dtype is not None:
+        num = _cast_to_dtype_if_safe(num, dtype)
+
+    return num, units, calendar
+
+
+def _encode_cf_datetime_within_map_blocks(
+    dates: np.ndarray,
+    units: str,
+    calendar: str,
+    dtype: np.dtype,
+) -> np.ndarray:
+    num, *_ = _eagerly_encode_cf_datetime(
+        dates, units, calendar, dtype, allow_units_modification=False
+    )
+    return num
+
+
+def _lazily_encode_cf_datetime(
+    dates: DaskArray,
+    units: str | None = None,
+    calendar: str | None = None,
+    dtype: np.dtype | None = None,
+) -> tuple[DaskArray, str, str]:
+    import dask.array
+
+    if calendar is None:
+        # This will only trigger minor compute if dates is an object dtype array.
+        calendar = infer_calendar_name(dates)
+
+    if units is None and dtype is None:
+        if dates.dtype == "O":
+            units = "microseconds since 1970-01-01"
+            dtype = np.dtype("int64")
+        else:
+            units = "nanoseconds since 1970-01-01"
+            dtype = np.dtype("int64")
+
+    if units is None or dtype is None:
+        raise ValueError(
+            f"When encoding chunked arrays of datetime values, both the units "
+            f"and dtype must be prescribed or both must be unprescribed. "
+            f"Prescribing only one or the other is not currently supported. "
+            f"Got a units encoding of {units} and a dtype encoding of {dtype}."
+        )
+    num = dask.array.map_blocks(
+        _encode_cf_datetime_within_map_blocks,
+        dates,
+        units,
+        calendar,
+        dtype,
+        dtype=dtype,
+    )
+    return num, units, calendar

Review discussion on this call:

There's a … (with a suggested change)

Ah, that's neat! Is there anything special I need to do to test this, or is it sufficient to demonstrate that it at least works with dask (as my tests already do)? One other minor detail is how to handle typing with this. I gave it a shot (hopefully I'm in the right ballpark), but I still encounter this error: … I'm not a typing expert so any guidance would be appreciated!

I believe the dask tests are fine since the cubed tests live elsewhere. cc @TomNicholas

Yep. The correct … This can't be typed properly yet (because we don't yet have a full protocol to describe the …):

T_ChunkedArray = TypeVar("T_ChunkedArray", bound=Any)

Gotcha, thanks for the typing guidance @TomNicholas. Adding … Do you have any thoughts on how to handle that? I'm guessing the … (Line 176 in 41d33f5). Does #8575 (comment) make the output side of this more difficult to handle as well (at least if we want to be as accurate as possible)?

@TomNicholas do you have any thoughts on this? From my perspective the only thing holding this PR up at this point is typing.

IMO a …

Hey, sorry for forgetting about this @spencerkclark! Yes - that fixes some typing errors in the … I agree with Deepak that typing should not hold this up. Even if it needs some …
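A minimal sketch of the typing approach floated in the thread above, assuming the suggested `T_ChunkedArray` TypeVar bound to `Any`; the helper below is hypothetical and only illustrates how the input and output array types would be tied together:

```python
from typing import Any, TypeVar

import numpy as np

# From the thread: no protocol describing chunked arrays exists yet, so the
# TypeVar is bound to Any as a placeholder that still links input to output.
T_ChunkedArray = TypeVar("T_ChunkedArray", bound=Any)


def _encode_lazily(values: T_ChunkedArray, dtype: np.dtype) -> T_ChunkedArray:
    # Hypothetical helper: whatever chunked array type comes in (dask, cubed,
    # ...) is what the annotation promises to return.
    return values.astype(dtype)
```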
 def encode_cf_timedelta(
-    timedeltas, units: str | None = None, dtype: np.dtype | None = None
+    timedeltas: T_NumPyOrDaskArray,
+    units: str | None = None,
+    dtype: np.dtype | None = None,
+) -> tuple[T_NumPyOrDaskArray, str]:
+    timedeltas = asarray(timedeltas)
+    if isinstance(timedeltas, np.ndarray):
+        return _eagerly_encode_cf_timedelta(timedeltas, units, dtype)
+    elif is_duck_dask_array(timedeltas):
+        return _lazily_encode_cf_timedelta(timedeltas, units, dtype)
+    else:
+        raise ValueError(
+            f"timedeltas provided have an unsupported array type: {type(timedeltas)}."
+        )
+
+
+def _eagerly_encode_cf_timedelta(
+    timedeltas: np.ndarray,
+    units: str | None = None,
+    dtype: np.dtype | None = None,
+    allow_units_modification: bool = True,
+) -> tuple[np.ndarray, str]:
     data_units = infer_timedelta_units(timedeltas)
@@ -784,7 +920,7 @@ def encode_cf_timedelta(
                 f"Set encoding['dtype'] to integer dtype to serialize to int64. "
                 f"Set encoding['dtype'] to floating point dtype to silence this warning."
             )
-        elif np.issubdtype(dtype, np.integer):
+        elif np.issubdtype(dtype, np.integer) and allow_units_modification:
             emit_user_level_warning(
                 f"Timedeltas can't be serialized faithfully with requested units {units!r}. "
                 f"Serializing with units {needed_units!r} instead. "
@@ -797,7 +933,50 @@ def encode_cf_timedelta(

     num = _division(time_deltas, time_delta, floor_division)
     num = num.values.reshape(timedeltas.shape)
-    return (num, units)
+
+    if dtype is not None:
+        num = _cast_to_dtype_if_safe(num, dtype)
+
+    return num, units
+
+
+def _encode_cf_timedelta_within_map_blocks(
+    timedeltas: np.ndarray,
+    units: str,
+    dtype: np.dtype,
+) -> np.ndarray:
+    num, _ = _eagerly_encode_cf_timedelta(
+        timedeltas, units, dtype, allow_units_modification=False
+    )
+    return num
+
+
+def _lazily_encode_cf_timedelta(
+    timedeltas: DaskArray, units: str | None = None, dtype: np.dtype | None = None
+) -> tuple[DaskArray, str]:
+    import dask.array
+
+    if units is None and dtype is None:
+        units = "nanoseconds"
+        dtype = np.dtype("int64")
+
+    if units is None or dtype is None:
+        raise ValueError(
+            f"When encoding chunked arrays of timedelta values, both the "
+            f"units and dtype must be prescribed or both must be "
+            f"unprescribed. Prescribing only one or the other is not "
+            f"currently supported. Got a units encoding of {units} and a "
+            f"dtype encoding of {dtype}."
+        )
+
+    num = dask.array.map_blocks(
+        _encode_cf_timedelta_within_map_blocks,
+        timedeltas,
+        units,
+        dtype,
+        dtype=dtype,
+    )
+    return num, units


 class CFDatetimeCoder(VariableCoder):
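A usage-level sketch of the constraint both lazy paths enforce, assuming dask is installed and the same module path as in the first sketch above: for chunked input, units and dtype must be supplied together or not at all.

```python
import dask.array as da
import numpy as np
import pandas as pd

from xarray.coding.times import encode_cf_timedelta

deltas = da.from_array(pd.timedelta_range(start="0 days", periods=6).values, chunks=3)

# Both unspecified: defaults to "nanoseconds" units and an int64 dtype, and the
# result stays a lazy dask array.
num, units = encode_cf_timedelta(deltas)

# Both specified: also supported.
num, units = encode_cf_timedelta(deltas, units="seconds", dtype=np.dtype("int64"))

# Only one specified: the lazy path raises ValueError at graph-construction time.
try:
    encode_cf_timedelta(deltas, units="seconds")
except ValueError as err:
    print(err)
```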
Review discussion:

Is there a reason other arrays might fail?

Indeed, I do not think so. It's mainly that mypy complains if the function may not always return something: … I suppose instead of `elif is_duck_array(dates)` I could simply use `else`, since I use `asarray` above?

I think so.

I guess one issue that occurs to me is that, as written, I think `_eagerly_encode_cf_datetime` will always return a NumPy array, due to its reliance on doing calculations through pandas or cftime. In other words it won't necessarily fail, but it will not work quite like the other array types, where the type you put in is what you get out.
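A sketch of the alternative dispatch discussed above (illustrative only, not what this PR does); it assumes the helpers from this diff (`asarray`, `is_duck_dask_array`, `_lazily_encode_cf_datetime`, `_eagerly_encode_cf_datetime`) are in scope:

```python
def encode_cf_datetime_alternative(dates, units=None, calendar=None, dtype=None):
    # Hypothetical variant with no explicit error branch, so mypy sees a
    # return on every path.
    dates = asarray(dates)
    if is_duck_dask_array(dates):
        return _lazily_encode_cf_datetime(dates, units, calendar, dtype)
    else:
        # Every other array (NumPy or another duck array) falls through to the
        # eager path. The caveat raised above: that path computes via pandas or
        # cftime and always returns np.ndarray, so "type in equals type out"
        # would not hold for non-NumPy duck arrays.
        return _eagerly_encode_cf_datetime(dates, units, calendar, dtype)
```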