diff --git a/lib/iris/experimental/ugrid.py b/lib/iris/experimental/ugrid.py index 488212ff9b..002c40952f 100644 --- a/lib/iris/experimental/ugrid.py +++ b/lib/iris/experimental/ugrid.py @@ -10,6 +10,7 @@ """ +from collections import Mapping, namedtuple from functools import wraps import dask.array as da @@ -17,7 +18,10 @@ from .. import _lazy_data as _lazy from ..common.metadata import ( + _hexdigest, BaseMetadata, + CoordMetadata, + DimCoordMetadata, metadata_manager_factory, SERVICES, SERVICES_COMBINE, @@ -25,16 +29,48 @@ SERVICES_DIFFERENCE, ) from ..common.lenient import _lenient_service as lenient_service -from ..coords import _DimensionalMetadata +from ..common.mixin import CFVariableMixin +from ..config import get_logger +from ..coords import _DimensionalMetadata, AuxCoord +from ..exceptions import CoordinateNotFoundError +from ..util import guess_coord_axis __all__ = [ "Connectivity", "ConnectivityMetadata", + "Mesh1DCoords", + "Mesh2DCoords", + "MeshEdgeCoords", + "MeshFaceCoords", + "MeshNodeCoords", "MeshMetadata", ] +# Configure the logger. +logger = get_logger(__name__, fmt="[%(cls)s.%(funcName)s]") + + +# Mesh dimension names namedtuples. +Mesh1DNames = namedtuple("Mesh1DNames", ["node_dimension", "edge_dimension"]) +Mesh2DNames = namedtuple( + "Mesh2DNames", ["node_dimension", "edge_dimension", "face_dimension"] +) + +# Mesh coordinate manager namedtuples. +Mesh1DCoords = namedtuple( + "Mesh1DCoords", ["node_x", "node_y", "edge_x", "edge_y"] +) +Mesh2DCoords = namedtuple( + "Mesh2DCoords", + ["node_x", "node_y", "edge_x", "edge_y", "face_x", "face_y"], +) +MeshNodeCoords = namedtuple("MeshNodeCoords", ["node_x", "node_y"]) +MeshEdgeCoords = namedtuple("MeshEdgeCoords", ["edge_x", "edge_y"]) +MeshFaceCoords = namedtuple("MeshFaceCoords", ["face_x", "face_y"]) + + class Connectivity(_DimensionalMetadata): """ A CF-UGRID topology connectivity, describing the topological relationship @@ -737,358 +773,913 @@ def equal(self, other, lenient=None): return super().equal(other, lenient=lenient) -# class Mesh(CFVariableMixin): -# """ -# -# .. todo:: -# -# .. questions:: -# -# - decide on the verbose/succinct version of __str__ vs __repr__ -# -# .. notes:: -# -# - the mesh is location agnostic -# -# - no need to support volume at mesh level, yet -# -# - topology_dimension -# - use for fast equality between Mesh instances -# - checking connectivity dimensionality, specifically the highest dimensonality of the -# "geometric element" being added i.e., reference the src_location/tgt_location -# - used to honour and enforce the minimum UGRID connectivity contract -# -# - support pickling -# -# - copy is off the table!! -# -# - MeshCoord.guess_points() -# - MeshCoord.to_AuxCoord() -# -# - don't provide public methods to return the coordinate and connectivity -# managers -# -# """ -# def __init__( -# self, -# topology_dimension, -# standard_name=None, -# long_name=None, -# var_name=None, -# units=None, -# attributes=None, -# node_dimension=None, -# edge_dimension=None, -# face_dimension=None, -# node_coords_and_axes=None, # [(coord, "x"), (coord, "y")] this is a stronger contract, not relying on guessing -# edge_coords_and_axes=None, # ditto -# face_coords_and_axes=None, # ditto -# connectivities=None, # [Connectivity, [Connectivity], ...] -# ): -# # TODO: support volumes. -# # TODO: support (coord, "z") -# -# # These are strings, if None is provided then assign the default string. -# self.node_dimension = node_dimension -# self.edge_dimension = edge_dimension -# self.face_dimension = face_dimension -# -# self._metadata_manager = metadata_manager_factory(MeshMetadata) -# -# self._metadata_manager.topology_dimension = topology_dimension -# -# self.standard_name = standard_name -# self.long_name = long_name -# self.var_name = var_name -# self.units = units -# self.attributes = attributes -# -# # based on the topology_dimension create the appropriate coordinate manager -# # with some intelligence -# self._coord_manager = ... -# -# # based on the topology_dimension create the appropriate connectivity manager -# # with some intelligence -# self._connectivity_manager = ... -# -# @property -# def all_coords(self): -# # return a namedtuple -# # coords = mesh.all_coords -# # coords.face_x, coords.edge_y -# pass -# -# @property -# def node_coords(self): -# # return a namedtuple -# # node_coords = mesh.node_coords -# # node_coords.x -# # node_coords.y -# pass -# -# @property -# def edge_coords(self): -# # as above -# pass -# -# @property -# def face_coords(self): -# # as above -# pass -# -# @property -# def all_connectivities(self): -# # return a namedtuple -# # conns = mesh.all_connectivities -# # conns.edge_node, conns.boundary_node -# pass -# -# @property -# def face_node_connectivity(self): -# # required -# return self._connectivity_manager.face_node -# -# @property -# def edge_node_connectivity(self): -# # optionally required -# return self._connectivity_manager.edge_node -# -# @property -# def face_edge_connectivity(self): -# # optional -# return self._connectivity_manager.face_edge -# -# @property -# def face_face_connectivity(self): -# # optional -# return self._connectivity_manager.face_face -# -# @property -# def edge_face_connectivity(self): -# # optional -# return self._connectivity_manager.edge_face -# -# @property -# def boundary_node_connectivity(self): -# # optional -# return self._connectivity_manager.boundard_node -# -# def coord(self, ...): -# # as Cube.coord i.e., ensure that one and only one coord-like is returned -# # otherwise raise and exception -# pass -# -# def coords( -# self, -# name_or_coord=None, -# standard_name=None, -# long_name=None, -# var_name=None, -# attributes=None, -# axis=None, -# node=False, -# edge=False, -# face=False, -# ): -# # do we support the coord_system kwargs? -# self._coord_manager.coords(...) -# -# def connectivity(self, ...): -# pass -# -# def connectivities( -# self, -# name_or_coord=None, -# standard_name=None, -# long_name=None, -# var_name=None, -# attributes=None, -# node=False, -# edge=False, -# face=False, -# ): -# pass -# -# def add_coords(self, node_x=None, node_y=None, edge_x=None, edge_y=None, face_x=None, face_y=None): -# # this supports adding a new coord to the manager, but also replacing an existing coord -# self._coord_manager.add(...) -# -# def add_connectivities(self, *args): -# # this supports adding a new connectivity to the manager, but also replacing an existing connectivity -# self._connectivity_manager.add(*args) -# -# def remove_coords(self, ...): -# # could provide the "name", "metadata", "coord"-instance -# # this could use mesh.coords() to find the coords -# self._coord_manager.remove(...) -# -# def remove_connectivities(self, ...): -# # needs to respect the minimum UGRID contract -# self._connectivity_manager.remove(...) -# -# def __eq__(self, other): -# # Full equality could be MASSIVE, so we want to avoid that. -# # Ideally we want a mesh signature from LFRic for comparison, although this would -# # limit Iris' relevance outside MO. -# # TL;DR: unknown quantity. -# raise NotImplemented -# -# def __ne__(self, other): -# # See __eq__ -# raise NotImplemented -# -# def __str__(self): -# pass -# -# def __repr__(self): -# pass -# -# def __unicode__(self, ...): -# pass -# -# def __getstate__(self): -# pass -# -# def __setstate__(self, state): -# pass -# -# def xml_element(self): -# pass -# -# # the MeshCoord will always have bounds, perhaps points. However the MeshCoord.guess_points() may -# # be a very useful part of its behaviour. -# # after using MeshCoord.guess_points(), the user may wish to add the associated MeshCoord.points into -# # the Mesh as face_coordinates. -# -# def to_AuxCoord(self, location, axis): -# # factory method -# # return the lazy AuxCoord(...) for the given location and axis -# -# def to_AuxCoords(self, location): -# # factory method -# # return the lazy AuxCoord(...), AuxCoord(...) -# -# def to_MeshCoord(self, location, axis): -# # factory method -# # return MeshCoord(..., location=location, axis=axis) -# # use Connectivity.indices_by_src() for fetching indices. -# -# def to_MeshCoords(self, location): -# # factory method -# # return MeshCoord(..., location=location, axis="x"), MeshCoord(..., location=location, axis="y") -# # use Connectivity.indices_by_src() for fetching indices. -# -# def dimension_names_reset(self, node=False, face=False, edge=False): -# # reset to defaults like this (suggestion) -# -# def dimension_names(self, node=None, face=None, edge=None): -# # e.g., only set self.node iff node != None. these attributes will -# # always be set to a user provided string or the default string. -# # return a namedtuple of dict-like -# -# @property -# def cf_role(self): -# return "mesh_topology" -# -# @property -# def topology_dimension(self): -# """ -# read-only -# -# """ -# return self._metadata_manager.topology_dimension -# -# # -# # - validate coord_systems -# # - validate climatological -# # - use guess_coord_axis (iris.utils) -# # - others? -# # -# class _Mesh1DCoordinateManager: -# REQUIRED = ( -# "node_x", -# "node_y", -# ) -# OPTIONAL = ( -# "edge_x", -# "edge_y", -# ) -# def __init__(self, node_x, node_y, edge_x=None, edge_y=None): -# # required -# self.node_x = node_x -# self.node_y = node_y -# # optional -# self.edge_x = edge_x -# self.edge_y = edge_y -# -# # WOO-GA - this can easily get out of sync with the self attributes. -# # choose the container wisely e.g., could be an dict..., also the self -# # attributes may need to be @property's that access the chosen _members container -# self._members = [ ... ] -# -# def __iter__(self): -# for member in self._members: -# yield member -# -# def __getstate__(self): -# pass -# -# def __setstate__(self, state): -# pass -# -# def coord(self, **kwargs): -# # see Cube.coord for pattern, checking for a single result -# return self.coords(**kwargs)[0] -# -# def coords(self, ...): -# # see Cube.coords for relevant patterns -# # return [ ... ] -# pass -# -# def add(self, **kwargs): -# pass -# -# def remove(self, ...): -# # needs to respect the minimum UGRID contract -# # use logging/warning to flag items not removed - highlight in doc-string -# # don't raise an exception -# -# def __str__(self): -# pass -# -# def __repr__(self): -# pass -# -# def __eq__(self, other): -# # Full equality could be MASSIVE, so we want to avoid that. -# # Ideally we want a mesh signature from LFRic for comparison, although this would -# # limit Iris' relevance outside MO. -# # TL;DR: unknown quantity. -# raise NotImplemented -# -# def __ne__(self, other): -# # See __eq__ -# raise NotImplemented -# -# -# class _Mesh2DCoordinateManager(_Mesh1DCoordinateManager): -# OPTIONAL = ( -# "edge_x", -# "edge_y", -# "face_x", -# "face_y", -# ) -# def __init__(self, node_x, node_y, edge_x=None, edge_y=None, face_x=None, face_y=None): -# # optional -# self.face_x = face_x -# self.face_y = face_y -# -# super().__init__(node_x, node_y, edge_x=edge_x, edge_y=edge_y) -# -# # does the order matter? -# self._members.extend([self.face_x, self.face_y]) -# -# +class Mesh(CFVariableMixin): + """ + + .. todo:: + + .. questions:: + + - decide on the verbose/succinct version of __str__ vs __repr__ + + .. notes:: + + - the mesh is location agnostic + + - no need to support volume at mesh level, yet + + - topology_dimension + - use for fast equality between Mesh instances + - checking connectivity dimensionality, specifically the highest dimensonality of the + "geometric element" being added i.e., reference the src_location/tgt_location + - used to honour and enforce the minimum UGRID connectivity contract + + - support pickling + + - copy is off the table!! + + - MeshCoord.guess_points() + - MeshCoord.to_AuxCoord() + + - don't provide public methods to return the coordinate and connectivity + managers + + - validate both managers contents e.g., shape? more...? + + """ + + # TBD: for volume and/or z-axis support include axis "z" and/or dimension "3" + AXES = ("x", "y") + TOPOLOGY_DIMENSIONS = (1, 2) + + def __init__( + self, + topology_dimension, + node_coords_and_axes, + standard_name=None, + long_name=None, + var_name=None, + units=None, + attributes=None, + edge_coords_and_axes=None, + face_coords_and_axes=None, + # connectivities=None, + node_dimension=None, + edge_dimension=None, + face_dimension=None, + ): + # TODO: support volumes. + # TODO: support (coord, "z") + + self._metadata_manager = metadata_manager_factory(MeshMetadata) + + # topology_dimension is read-only, so assign directly to the metadata manager + if topology_dimension not in self.TOPOLOGY_DIMENSIONS: + emsg = f"Expected 'topology_dimension' in range {self.TOPOLOGY_DIMENSIONS!r}, got {topology_dimension!r}." + raise ValueError(emsg) + self._metadata_manager.topology_dimension = topology_dimension + + # TBD: these are strings, if None is provided then assign the default string. + self.node_dimension = node_dimension + self.edge_dimension = edge_dimension + self.face_dimension = face_dimension + + # assign the metadata to the metadata manager + self.standard_name = standard_name + self.long_name = long_name + self.var_name = var_name + self.units = units + self.attributes = attributes + + # based on the topology_dimension, create the appropriate coordinate manager + def normalise(location, axis): + result = str(axis).lower() + if result not in self.AXES: + emsg = f"Invalid axis specified for {location} coordinate {coord.name()!r}, got {axis!r}." + raise ValueError(emsg) + return f"{location}_{axis}" + + kwargs = {} + for coord, axis in node_coords_and_axes: + kwargs[normalise("node", axis)] = coord + if edge_coords_and_axes is not None: + for coord, axis in edge_coords_and_axes: + kwargs[normalise("edge", axis)] = coord + if face_coords_and_axes is not None: + for coord, axis in face_coords_and_axes: + kwargs[normalise("face", axis)] = coord + + # check the UGRID minimum requirement for coordinates + if "node_x" not in kwargs: + emsg = ( + "Require a node coordinate that is x-axis like to be provided." + ) + raise ValueError(emsg) + if "node_y" not in kwargs: + emsg = ( + "Require a node coordinate that is y-axis like to be provided." + ) + raise ValueError(emsg) + + if self.topology_dimension == 1: + self._coord_manager = _Mesh1DCoordinateManager(**kwargs) + elif self.topology_dimension == 2: + self._coord_manager = _Mesh2DCoordinateManager(**kwargs) + else: + emsg = f"Unsupported 'topology_dimension', got {topology_dimension!r}." + raise NotImplementedError(emsg) + + # based on the topology_dimension, create the appropriate connectivity manager + # self._connectivity_manager = ... + + def __eq__(self, other): + # TBD + return NotImplemented + + def __getstate__(self): + # TBD + pass + + def __ne__(self, other): + # TBD + return NotImplemented + + def __repr__(self): + # TBD + args = [] + return f"{self.__class__.__name__}({', '.join(args)})" + + def __setstate__(self, state): + # TBD + pass + + def __str__(self): + # TBD + args = [] + return f"{self.__class__.__name__}({', '.join(args)})" + + @property + def all_coords(self): + return self._coord_manager.all_members + + @property + def edge_dimension(self): + return self._edge_dimension + + @edge_dimension.setter + def edge_dimension(self, name): + if not name or not isinstance(name, str): + self._edge_dimension = f"Mesh{self.topology_dimension}d_edge" + else: + self._edge_dimension = name + + @property + def edge_coords(self): + return self._coord_manager.edge_coords + + @property + def face_dimension(self): + return self._face_dimension + + @face_dimension.setter + def face_dimension(self, name): + if not name or not isinstance(name, str): + self._face_dimension = f"Mesh{self.topology_dimension}d_face" + else: + self._face_dimension = name + + @property + def face_coords(self): + return self._coord_manager.face_coords + + @property + def node_dimension(self): + return self._node_dimension + + @node_dimension.setter + def node_dimension(self, name): + if not name or not isinstance(name, str): + self._node_dimension = f"Mesh{self.topology_dimension}d_node" + else: + self._node_dimension = name + + @property + def node_coords(self): + return self._coord_manager.node_coords + + # @property + # def all_connectivities(self): + # # return a namedtuple + # # conns = mesh.all_connectivities + # # conns.edge_node, conns.boundary_node + # pass + # + # @property + # def face_node_connectivity(self): + # # required + # return self._connectivity_manager.face_node + # + # @property + # def edge_node_connectivity(self): + # # optionally required + # return self._connectivity_manager.edge_node + # + # @property + # def face_edge_connectivity(self): + # # optional + # return self._connectivity_manager.face_edge + # + # @property + # def face_face_connectivity(self): + # # optional + # return self._connectivity_manager.face_face + # + # @property + # def edge_face_connectivity(self): + # # optional + # return self._connectivity_manager.edge_face + # + # @property + # def boundary_node_connectivity(self): + # # optional + # return self._connectivity_manager.boundary_node + + def add_coords( + self, + node_x=None, + node_y=None, + edge_x=None, + edge_y=None, + face_x=None, + face_y=None, + ): + self._coord_manager.add( + node_x=node_x, + node_y=node_y, + edge_x=edge_x, + edge_y=edge_y, + face_x=face_x, + face_y=face_y, + ) + + # def add_connectivities(self, *args): + # # this supports adding a new connectivity to the manager, but also replacing an existing connectivity + # self._connectivity_manager.add(*args) + + # def connectivities( + # self, + # name_or_coord=None, + # standard_name=None, + # long_name=None, + # var_name=None, + # attributes=None, + # node=False, + # edge=False, + # face=False, + # ): + # pass + + # def connectivity(self, ...): + # pass + + def coord( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=None, + edge=None, + face=None, + ): + return self._coord_manager.filter( + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + node=node, + edge=edge, + face=face, + ) + + def coords( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=False, + edge=False, + face=False, + ): + return self._coord_manager.filters( + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + node=node, + edge=edge, + face=face, + ) + + # def remove_connectivities(self, ...): + # # needs to respect the minimum UGRID contract + # self._connectivity_manager.remove(...) + + def remove_coords( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=None, + edge=None, + face=None, + ): + self._coord_manager.remove( + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + node=node, + edge=edge, + face=face, + ) + + def xml_element(self): + # TBD + pass + + # the MeshCoord will always have bounds, perhaps points. However the MeshCoord.guess_points() may + # be a very useful part of its behaviour. + # after using MeshCoord.guess_points(), the user may wish to add the associated MeshCoord.points into + # the Mesh as face_coordinates. + + # def to_AuxCoord(self, location, axis): + # # factory method + # # return the lazy AuxCoord(...) for the given location and axis + # + # def to_AuxCoords(self, location): + # # factory method + # # return the lazy AuxCoord(...), AuxCoord(...) + # + # def to_MeshCoord(self, location, axis): + # # factory method + # # return MeshCoord(..., location=location, axis=axis) + # # use Connectivity.indices_by_src() for fetching indices. + # + # def to_MeshCoords(self, location): + # # factory method + # # return MeshCoord(..., location=location, axis="x"), MeshCoord(..., location=location, axis="y") + # # use Connectivity.indices_by_src() for fetching indices. + + def dimension_names_reset(self, node=False, edge=False, face=False): + if node: + self.node_dimension = None + if edge: + self.edge_dimension = None + if face: + self.face_dimension = None + if self.topology_dimension == 1: + result = Mesh1DNames(self.node_dimension, self.edge_dimension) + else: + result = Mesh2DNames( + self.node_dimension, self.edge_dimension, self.face_dimension + ) + return result + + def dimension_names(self, node=None, edge=None, face=None): + if node: + self.node_dimension = node + if edge: + self.edge_dimension = edge + if face: + self.face_dimension = face + if self.topology_dimension == 1: + result = Mesh1DNames(self.node_dimension, self.edge_dimension) + else: + result = Mesh2DNames( + self.node_dimension, self.edge_dimension, self.node_dimension + ) + return result + + @property + def cf_role(self): + return "mesh_topology" + + @property + def topology_dimension(self): + return self._metadata_manager.topology_dimension + + +class _Mesh1DCoordinateManager: + """ + + TBD: require clarity on coord_systems validation + TBD: require clarity on __eq__ support + TBD: rationalise self.coords() logic with other manager and Cube + + """ + + REQUIRED = ( + "node_x", + "node_y", + ) + OPTIONAL = ( + "edge_x", + "edge_y", + ) + + def __init__(self, node_x, node_y, edge_x=None, edge_y=None): + # initialise all the coordinates + self.ALL = self.REQUIRED + self.OPTIONAL + self._members = {member: None for member in self.ALL} + + # required coordinates + self.node_x = node_x + self.node_y = node_y + # optional coordinates + self.edge_x = edge_x + self.edge_y = edge_y + + def __eq__(self, other): + # TBD + return NotImplemented + + def __getstate__(self): + # TBD + pass + + def __iter__(self): + for item in self._members.items(): + yield item + + def __ne__(self, other): + # TBD + return NotImplemented + + def __repr__(self): + args = [ + f"{member}={coord!r}" + for member, coord in self + if coord is not None + ] + return f"{self.__class__.__name__}({', '.join(args)})" + + def __setstate__(self, state): + # TBD + pass + + def __str__(self): + args = [ + f"{member}=True" for member, coord in self if coord is not None + ] + return f"{self.__class__.__name__}({', '.join(args)})" + + @staticmethod + def _filters( + members, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + ): + """ + TDB: support coord_systems? + + """ + name = None + coord = None + + if isinstance(item, str): + name = item + else: + coord = item + + if name is not None: + members = {k: v for k, v in members.items() if v.name() == name} + + if standard_name is not None: + members = { + k: v + for k, v in members.items() + if v.standard_name == standard_name + } + + if long_name is not None: + members = { + k: v for k, v in members.items() if v.long_name == long_name + } + + if var_name is not None: + members = { + k: v for k, v in members.items() if v.var_name == var_name + } + + if axis is not None: + axis = axis.upper() + members = { + k: v for k, v in members.items() if guess_coord_axis(v) == axis + } + + if attributes is not None: + if not isinstance(attributes, Mapping): + emsg = ( + "The attributes keyword was expecting a dictionary " + f"type, but got a {type(attributes)} instead." + ) + raise ValueError(emsg) + + def _filter(coord): + return all( + k in coord.attributes + and _hexdigest(coord.attributes[k]) == _hexdigest(v) + for k, v in attributes.items() + ) + + members = {k: v for k, v in members.items() if _filter(v)} + + if coord is not None: + if hasattr(coord, "__class__") and coord.__class__ in ( + CoordMetadata, + DimCoordMetadata, + ): + target_metadata = coord + else: + target_metadata = coord.metadata + + members = { + k: v + for k, v in members.items() + if v.metadata == target_metadata + } + + return members + + def _remove(self, **kwargs): + result = {} + members = self.filters(**kwargs) + + for member in members.keys(): + if member in self.REQUIRED: + dmsg = f"Ignoring request to remove required coordinate {member!r}" + logger.debug(dmsg, extra=dict(cls=self.__class__.__name__)) + else: + result[member] = members[member] + setattr(self, member, None) + + return result + + def _setter(self, location, axis, coord, shape): + axis = axis.lower() + member = f"{location}_{axis}" + + # enforce the UGRID minimum coordinate requirement + if location == "node" and coord is None: + emsg = ( + f"{member!r} is a required coordinate, cannot set to 'None'." + ) + raise ValueError(emsg) + + if coord is not None: + if not isinstance(coord, AuxCoord): + emsg = f"{member!r} requires to be an 'AuxCoord', got {type(coord)}." + raise TypeError(emsg) + + guess_axis = guess_coord_axis(coord) + + if guess_axis and guess_axis.lower() != axis: + emsg = f"{member!r} requires a {axis}-axis like 'AuxCoord', got a {guess_axis.lower()}-axis like." + raise TypeError(emsg) + + if coord.climatological: + emsg = f"{member!r} cannot be a climatological 'AuxCoord'." + raise TypeError(emsg) + + if shape is not None and coord.shape != shape: + emsg = f"{member!r} requires to have shape {shape!r}, got {coord.shape!r}." + raise ValueError(emsg) + + self._members[member] = coord + + def _shape(self, location): + coord = getattr(self, f"{location}_x") + shape = coord.shape if coord is not None else None + if shape is None: + coord = getattr(self, f"{location}_y") + if coord is not None: + shape = coord.shape + return shape + + @property + def _edge_shape(self): + return self._shape(location="edge") + + @property + def _node_shape(self): + return self._shape(location="node") + + @property + def all_members(self): + return Mesh1DCoords(**self._members) + + @property + def edge_coords(self): + return MeshEdgeCoords(edge_x=self.edge_x, edge_y=self.edge_y) + + @property + def edge_x(self): + return self._members["edge_x"] + + @edge_x.setter + def edge_x(self, coord): + self._setter( + location="edge", axis="x", coord=coord, shape=self._edge_shape + ) + + @property + def edge_y(self): + return self._members["edge_y"] + + @edge_y.setter + def edge_y(self, coord): + self._setter( + location="edge", axis="y", coord=coord, shape=self._edge_shape + ) + + @property + def node_coords(self): + return MeshNodeCoords(node_x=self.node_x, node_y=self.node_y) + + @property + def node_x(self): + return self._members["node_x"] + + @node_x.setter + def node_x(self, coord): + self._setter( + location="node", axis="x", coord=coord, shape=self._node_shape + ) + + @property + def node_y(self): + return self._members["node_y"] + + @node_y.setter + def node_y(self, coord): + self._setter( + location="node", axis="y", coord=coord, shape=self._node_shape + ) + + def _add(self, coords): + member_x, member_y = coords._fields + + # deal with the special case where both members are changing + if coords[0] is not None and coords[1] is not None: + cache_x = self._members[member_x] + cache_y = self._members[member_y] + self._members[member_x] = None + self._members[member_y] = None + + try: + setattr(self, member_x, coords[0]) + setattr(self, member_y, coords[1]) + except (TypeError, ValueError): + # restore previous valid state + self._members[member_x] = cache_x + self._members[member_y] = cache_y + # now, re-raise the exception + raise + else: + # deal with the case where one or no member is changing + if coords[0] is not None: + setattr(self, member_x, coords[0]) + if coords[1] is not None: + setattr(self, member_y, coords[1]) + + def add(self, node_x=None, node_y=None, edge_x=None, edge_y=None): + """ + use self.remove(edge_x=True) to remove a coordinate e.g., using the + pattern self.add(edge_x=None) will not remove the edge_x coordinate + + """ + self._add(MeshNodeCoords(node_x, node_y)) + self._add(MeshEdgeCoords(edge_x, edge_y)) + + def filter(self, **kwargs): + result = self.filters(**kwargs) + + if len(result) > 1: + names = ", ".join( + f"{member}={coord!r}" for member, coord in result.items() + ) + emsg = ( + f"Expected to find exactly 1 coordinate, but found {len(result)}. " + f"They were: {names}." + ) + raise CoordinateNotFoundError(emsg) + + if len(result) == 0: + item = kwargs["item"] + if item is not None: + if not isinstance(item, str): + item = item.name() + name = ( + item + or kwargs["standard_name"] + or kwargs["long_name"] + or kwargs["var_name"] + or None + ) + name = "" if name is None else f"{name!r} " + emsg = ( + f"Expected to find exactly 1 {name}coordinate, but found none." + ) + raise CoordinateNotFoundError(emsg) + + return result + + def filters( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=None, + edge=None, + face=None, + ): + # rationalise the tri-state behaviour + args = [node, edge, face] + state = not any(set(filter(lambda arg: arg is not None, args))) + node, edge, face = map( + lambda arg: arg if arg is not None else state, args + ) + + def func(args): + return args[1] is not None + + members = {} + if node: + members.update( + dict(filter(func, self.node_coords._asdict().items())) + ) + if edge: + members.update( + dict(filter(func, self.edge_coords._asdict().items())) + ) + if hasattr(self, "face_coords"): + if face: + members.update( + dict(filter(func, self.face_coords._asdict().items())) + ) + else: + dmsg = "Ignoring request to filter non-existent 'face_coords'" + logger.debug(dmsg, extra=dict(cls=self.__class__.__name__)) + + result = self._filters( + members, + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + ) + + return result + + def remove( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=None, + edge=None, + ): + return self._remove( + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + node=node, + edge=edge, + ) + + +class _Mesh2DCoordinateManager(_Mesh1DCoordinateManager): + OPTIONAL = ( + "edge_x", + "edge_y", + "face_x", + "face_y", + ) + + def __init__( + self, + node_x, + node_y, + edge_x=None, + edge_y=None, + face_x=None, + face_y=None, + ): + super().__init__(node_x, node_y, edge_x=edge_x, edge_y=edge_y) + + # optional coordinates + self.face_x = face_x + self.face_y = face_y + + @property + def _face_shape(self): + return self._shape(location="face") + + @property + def all_members(self): + return Mesh2DCoords(**self._members) + + @property + def face_coords(self): + return MeshFaceCoords(face_x=self.face_x, face_y=self.face_y) + + @property + def face_x(self): + return self._members["face_x"] + + @face_x.setter + def face_x(self, coord): + self._setter( + location="face", axis="x", coord=coord, shape=self._face_shape + ) + + @property + def face_y(self): + return self._members["face_y"] + + @face_y.setter + def face_y(self, coord): + self._setter( + location="face", axis="y", coord=coord, shape=self._face_shape + ) + + def add( + self, + node_x=None, + node_y=None, + edge_x=None, + edge_y=None, + face_x=None, + face_y=None, + ): + super().add(node_x=node_x, node_y=node_y, edge_x=edge_x, edge_y=edge_y) + self._add(MeshFaceCoords(face_x, face_y)) + + def remove( + self, + item=None, + standard_name=None, + long_name=None, + var_name=None, + attributes=None, + axis=None, + node=None, + edge=None, + face=None, + ): + return self._remove( + item=item, + standard_name=standard_name, + long_name=long_name, + var_name=var_name, + attributes=attributes, + axis=axis, + node=node, + edge=edge, + face=face, + ) + + # # keep an eye on the __init__ inheritance # class _Mesh1DConnectivityManager: # REQUIRED = (