Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/reformatters/common/shared_memory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from reformatters.common.iterating import consume, shard_slice_indexers
from reformatters.common.logging import get_logger
from reformatters.common.types import AppendDim, ArrayFloat32
from reformatters.common.zarr import assert_fill_values_set

log = get_logger(__name__)

Expand Down Expand Up @@ -173,6 +174,9 @@ def write_shard_to_zarr(
shard_indexer: tuple[slice, ...],
) -> None:
"""Write a shard of data held in shared memory to a zarr store."""

assert_fill_values_set(processing_region_da_template)

with (
warnings.catch_warnings(),
closing(SharedMemory(name=shared_buffer_name)) as shared_memory,
Expand All @@ -199,4 +203,4 @@ def write_shard_to_zarr(
message="In a future version of xarray decode_timedelta will default to False rather than None.",
category=FutureWarning,
)
data_array[shard_indexer].to_zarr(store, region="auto") # type: ignore[call-overload]
data_array[shard_indexer].to_zarr(store, region="auto", write_empty_chunks=True) # type: ignore[call-overload]
12 changes: 12 additions & 0 deletions src/reformatters/common/template_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ def get_template(self, end_time: DatetimeLike) -> xr.Dataset:
for coordinate in ds.coords.values():
coordinate.load()

# Work around what appears to be a bug where fill_value is not set in encodings read from existing zarr template
for coord in self.coords:
assert "fill_value" not in ds[coord.name].encoding, (
"Fill value round tripped. That's good but not the previous behavior and if you see this AND the fill_value is correct, you can remove the workaround."
)
ds[coord.name].encoding["fill_value"] = coord.encoding.fill_value
for var in self.data_vars:
assert "fill_value" not in ds[var.name].encoding, (
"Fill value round tripped. That's good but not the previous behavior and if you see this AND the fill_value is correct, you can remove the workaround."
)
ds[var.name].encoding["fill_value"] = var.encoding.fill_value

return ds

def update_template(self) -> None:
Expand Down
3 changes: 3 additions & 0 deletions src/reformatters/common/template_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from reformatters.common.config_models import Coordinate, DataVar
from reformatters.common.logging import get_logger
from reformatters.common.storage import StoreFactory, commit_if_icechunk
from reformatters.common.zarr import assert_fill_values_set

log = get_logger(__name__)

Expand All @@ -26,6 +27,8 @@ def write_metadata(
store: zarr.abc.store.Store | Path
replica_stores: list[zarr.abc.store.Store]

assert_fill_values_set(template_ds)

if isinstance(storage, StoreFactory):
store = storage.primary_store(writable=True)
assert mode is None, "mode should not be provided if StoreFactory is provided"
Expand Down
19 changes: 19 additions & 0 deletions src/reformatters/common/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,22 @@ def sync_to_store(store: zarr.abc.store.Store, key: str, data: bytes) -> None:
),
max_attempts=6,
)


def assert_fill_values_set(xr_obj: xr.Dataset | xr.DataArray) -> None:
    """Assert that a ``fill_value`` is present in the encoding of every array.

    For a DataArray, checks the array's own encoding. For a Dataset, checks the
    encoding of every coordinate and every data variable. Raises AssertionError
    naming the first offending array, or ValueError for any other input type.
    """
    if isinstance(xr_obj, xr.DataArray):
        # Single array: its own encoding must carry the fill value.
        assert "fill_value" in xr_obj.encoding, (
            f"Fill value not set for DataArray {xr_obj.name}"
        )
        return

    if not isinstance(xr_obj, xr.Dataset):
        raise ValueError(f"Expected xr.Dataset or xr.DataArray, got {type(xr_obj)}")

    # Dataset: every coordinate and data variable must carry a fill value.
    for name, coord in xr_obj.coords.items():
        assert "fill_value" in coord.encoding, (
            f"Fill value not set for coordinate {name}"
        )
    for name, data_var in xr_obj.data_vars.items():
        assert "fill_value" in data_var.encoding, (
            f"Fill value not set for variable {name}"
        )
15 changes: 13 additions & 2 deletions src/reformatters/contrib/noaa/ndvi_cdr/analysis/region_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ def _read_netcdf_data(
) -> ArrayFloat32 | ArrayInt16:
"""Read data from NetCDF file."""
out_dtype = data_var.encoding.dtype
encoding_fill_value = data_var.encoding.fill_value

var_name = data_var.internal_attrs.netcdf_var_name
netcdf_fill_value = data_var.internal_attrs.fill_value
Expand All @@ -183,7 +182,19 @@ def _read_netcdf_data(

# Set invalid values to NaN before scaling (for float data)
if var_name != QA_NETCDF_VAR_NAME:
result[result == netcdf_fill_value] = encoding_fill_value
# We are using a different fill value here than the data var encoding fill value.
# This is because the encoding fill value was previously NaN, and so when we
# matched our no data value, we set values to NaN. We have now changed the
# encoding fill value to 0. This is to accommodate the fact that, due to an Xarray bug,
# the encoding fill value was not round tripped (it was persisted as 0 despite the
# definition in our encoding). We have updated the encoding fill value to 0 to match
# what was written at the time of our backfill. That change ensures that empty chunks
# continue to be interpreted as 0. But consequently, we need to ensure that when we
# are setting the no data value when reading the netcdf data, we continue to use NaN.
if data_var.internal_attrs.read_data_fill_value is not None:
result[result == netcdf_fill_value] = (
data_var.internal_attrs.read_data_fill_value
)

assert scale_factor is not None
assert add_offset is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class NoaaNdviCdrInternalAttrs(BaseInternalAttrs):
scale_factor: float | None = None
add_offset: float | None = None
valid_range: tuple[float, float] | None = None
read_data_fill_value: float | None = None


class NoaaNdviCdrDataVar(DataVar[NoaaNdviCdrInternalAttrs]):
Expand Down Expand Up @@ -209,7 +210,7 @@ def data_vars(self) -> Sequence[NoaaNdviCdrDataVar]:

encoding_float32_default = Encoding(
dtype="float32",
fill_value=np.nan,
fill_value=0,
chunks=tuple(var_chunks[d] for d in self.dims),
shards=tuple(var_shards[d] for d in self.dims),
compressors=[BLOSC_4BYTE_ZSTD_LEVEL3_SHUFFLE],
Expand Down Expand Up @@ -243,6 +244,7 @@ def data_vars(self) -> Sequence[NoaaNdviCdrDataVar]:
scale_factor=0.0001,
add_offset=0.0,
valid_range=(-1000, 10000),
read_data_fill_value=np.nan,
),
),
NoaaNdviCdrDataVar(
Expand All @@ -262,6 +264,7 @@ def data_vars(self) -> Sequence[NoaaNdviCdrDataVar]:
scale_factor=0.0001,
add_offset=0.0,
valid_range=(-1000, 10000),
read_data_fill_value=np.nan,
),
),
NoaaNdviCdrDataVar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down Expand Up @@ -228,7 +228,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
14 changes: 13 additions & 1 deletion src/reformatters/contrib/uarizona/swann/analysis/region_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,19 @@ def read_data(
no_data_value = -999
with rasterio.open(netcdf_path) as reader:
result: Array2D[np.float32] = reader.read(band, out_dtype=np.float32)
result[result == no_data_value] = np.nan
# We are using a different fill value here than the data var encoding fill value.
# This is because the encoding fill value was previously NaN, and so when we
# matched our no data value, we set values to NaN. We have now changed the
# encoding fill value to 0. This is to accommodate the fact that, due to an Xarray bug,
# the encoding fill value was not round tripped (it was persisted as 0 despite the
# definition in our encoding). We have updated the encoding fill value to 0 to match
# what was written at the time of our backfill. That change ensures that empty chunks
# continue to be interpreted as 0. But consequently, we need to ensure that when we
# are setting the no data value when reading the netcdf data, we continue to use NaN.
if data_var.internal_attrs.read_data_fill_value is not None:
result[result == no_data_value] = (
data_var.internal_attrs.read_data_fill_value
)
assert result.shape == (621, 1405)
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

class UarizonaSwannInternalAttrs(BaseInternalAttrs):
netcdf_var_name: str
read_data_fill_value: float | None


class UarizonaSwannDataVar(DataVar[UarizonaSwannInternalAttrs]):
Expand Down Expand Up @@ -197,7 +198,7 @@ def data_vars(self) -> Sequence[UarizonaSwannDataVar]:

encoding_float32_default = Encoding(
dtype="float32",
fill_value=np.nan,
fill_value=0,
chunks=tuple(var_chunks[d] for d in self.dims),
shards=tuple(var_shards[d] for d in self.dims),
compressors=[BLOSC_4BYTE_ZSTD_LEVEL3_SHUFFLE],
Expand All @@ -219,6 +220,7 @@ def data_vars(self) -> Sequence[UarizonaSwannDataVar]:
internal_attrs=UarizonaSwannInternalAttrs(
keep_mantissa_bits=default_keep_mantissa_bits,
netcdf_var_name="SWE",
read_data_fill_value=np.nan,
),
),
UarizonaSwannDataVar(
Expand All @@ -234,6 +236,7 @@ def data_vars(self) -> Sequence[UarizonaSwannDataVar]:
internal_attrs=UarizonaSwannInternalAttrs(
keep_mantissa_bits=default_keep_mantissa_bits,
netcdf_var_name="DEPTH",
read_data_fill_value=np.nan,
),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down Expand Up @@ -228,7 +228,7 @@
"separator": "/"
}
},
"fill_value": "NaN",
"fill_value": 0.0,
"codecs": [
{
"name": "sharding_indexed",
Expand Down
Loading