"""Implements the LabelledDataGroup, which allows to handle groups and
containers that can be associated with further coordinates.
This imitates the xarray selection interface and provides a uniform interface
to select data from these groups. Most importantly, it allows to combine all
the data of one group, allowing to conveniently work with heterogeneously
stored data.
"""
import logging
from typing import Tuple, Dict, Union, List
import numpy as np
import xarray as xr
from . import OrderedDataGroup
from ..abc import AbstractDataContainer
from ..containers import XrDataContainer
from ..utils import extract_coords
from ..utils.coords import TCoord, TCoordsDict, TDims
from ..tools import apply_along_axis
# Local constants
log = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
[docs]class LabelledDataGroup(OrderedDataGroup):
"""A group that assumes that the members it contains can be labelled
with dimension names and coordinates.
Such a group has the great benefit to provide a selection interface that
works fully on the dimension labels and coordinates and can cooperate with
the xarray selection interface, i.e. the ``sel`` and ``isel`` methods.
"""
# Let new containers be xarray-based
_NEW_CONTAINER_CLS = XrDataContainer
# Configuration options for this group ....................................
# Whether to use deep selection by default
LDG_ALLOW_DEEP_SELECTION = True
# The dimensions of this group, i.e. the dimensions of the space covered by
# the members of this group.
LDG_DIMS = tuple()
# How to extract coordinates of the members; for available modes, see the
# `dantro.utils.coords.extract_coords` function.
LDG_EXTRACT_COORDS_FROM = 'data'
# Configuration for mode 'attrs' . . . . . . . . . . . . . . . . . . . . .
LDG_COORDS_ATTR_PREFIX = 'ext_coords__'
LDG_COORDS_MODE_ATTR_PREFIX = 'ext_coords_mode__'
LDG_COORDS_MODE_DEFAULT = 'scalar'
LDG_STRICT_ATTR_CHECKING = False
# Configuration for mode 'name' . . . . . . . . . . . . . . . . . . . . . .
LDG_COORDS_SEPARATOR_IN_NAME = ';'
# .........................................................................
[docs] def __init__(self, *args, dims: TDims=None,
allow_deep_selection: bool=None, **kwargs):
"""Initialize a LabelledDataGroup
Args:
*args: Passed on to
:py:class:`~dantro.groups.ordered.OrderedDataGroup`
dims (TDims, optional): The dimensions associated with this group.
If not given, will use those defined in the ``LDG_DIMS`` class
variable. These can *not* be changed afterwards!
allow_deep_selection (bool, optional): Whether to allow deep
selection. If not given, will use the
``LDG_ALLOW_DEEP_SELECTION`` class variable's value. Behaviour
can be changed via the property of the same name.
**kwargs: Passed on to
:py:class:`~dantro.groups.ordered.OrderedDataGroup`
"""
# Initialize the member map, which is needed if containers are added
# during initialization (thus invoking _add_container_callback)
self.__member_map = None
super().__init__(*args, **kwargs)
self._dims = dims if dims is not None else tuple(self.LDG_DIMS)
self._allow_deep_selection = self.LDG_ALLOW_DEEP_SELECTION
if allow_deep_selection is not None:
self._allow_deep_selection = allow_deep_selection
# Dimension and coordinates ...............................................
@property
def dims(self) -> Tuple[str]:
"""The names of the group-level dimensions this group manages.
It _may_ contain dimensions that overlap with dimension names from the
members; this is intentional.
"""
return self._dims
@property
def ndim(self) -> int:
"""The rank of the space covered by the group-level dimensions."""
return len(self.dims)
@property
def coords(self) -> Dict[str, List[TCoord]]:
"""Returns a dict-like container of group-level coordinates.
The coordinates are calculated by iterating over all members and
aggregating their individual coordinates. Once the member map is
available, information is retrieved from there rather than
recalculating it.
"""
if self.member_map_available:
return self.member_map.coords
# Need to collect them from the members; set guarantee uniqueness
coords = {dim_name: set() for dim_name in self.dims}
for cont_name, cont in self.items():
cont_coords = self._get_coords_of(cont)
for dim_name, coord_vals in cont_coords.items():
coords[dim_name].update(coord_vals)
# Convert to dict of lists
return {dim_name: sorted(list(s)) for dim_name, s in coords.items()}
@property
def shape(self) -> Tuple[int]:
"""Return the shape of the space covered by the group-level dimensions.
This will be calculated from the available coordinates. Once the
member map is available, information is retrieved from there rather
than recalculating it.
"""
if self.member_map_available:
return self.member_map.shape
# Need to derive it from the coordinates
coords = self.coords
return tuple([len(coords[dim_name]) for dim_name in self.dims])
# Additional properties ...................................................
@property
def allow_deep_selection(self) -> bool:
"""Whether deep selection is allowed."""
return self._allow_deep_selection
@allow_deep_selection.setter
def allow_deep_selection(self, val: bool):
"""Change whether deep selection is allowed."""
self._allow_deep_selection = val
@property
def member_map(self) -> xr.DataArray:
"""Returns an array that represents the space that the members of this
group span, where each value (i.e. a specific coordinate combination)
is the name of the corresponding member of this group.
Upon first call, this is computed here. If members are added, it is
tried to accomodate them in there; if not possible, the cache will be
invalidated.
The member map _may_ include empty strings, i.e. coordinate
combinations that are not covered by any member. Also, they can contain
duplicate names, as one member can cover multiple coordinates.
.. note::
The member map is invalidated when new members are added that can
not be accomodated in it. It will be recalculated when needed.
"""
if self.member_map_available:
return self.__member_map
# Create an empty DataArray of strings, using the existing dimension
# names and coordinates to label it
mm = xr.DataArray(data=np.zeros(self.shape, dtype='<U255'),
dims=self.dims, coords=self.coords)
# Iterate over members and populate the array with member names
for name, cont in self.items():
coords = self._get_coords_of(cont)
# These coordinates describe a hypercube in coordinate space that
# is to be associated with this container. Thus, the member map
# should contain the name of the member for all these coordinates:
mm.loc[coords] = name
# Cache the map and return it
self.__member_map = mm
return mm
@property
def member_map_available(self) -> bool:
"""Whether the member map is available yet."""
return (self.__member_map is not None)
# Selection interface .....................................................
[docs] def isel(self, indexers: dict=None, *, drop: bool=False,
combination_method: str='try_concat', deep: bool=None,
**indexers_kwargs) -> xr.DataArray:
"""Return a new labelled `xr.DataArray` with an index-selected subset
of members of this group.
If deep selection is activated, those indexers that are not available
in the group-managed dimensions are looked up in the members of this
group.
Args:
indexers (dict, optional): A dict with keys matching dimensions and
values given by scalars, slices or arrays of tick indices.
As `xr.DataArray.sel`, uses pandas-like indexing, i.e.: slices
include the terminal value.
drop (bool, optional): Drop coordinate variables instead of making
them scalar.
combination_method (str, optional): How to combine group-level data
with member-level data. Can be:
* ``concat``: Concatenate. This can preserve the dtype, but
requires that no data is missing.
* ``merge``: Merge, using `xarray.merge`. This leads to a
type conversion to ``float64``, but allows members
being missing or coordinates not fully filling the
available space.
* ``try_concat``: Try concatenation, fall back to merging
if that was unsuccessful.
deep (bool, optional): Whether to allow deep indexing, i.e.: that
``indexers`` may contain dimensions that don't refer to group-
level dimensions but to dimensions that are only availble among
the member data. If ``None``, will use the value returned by
the ``allow_deep_selection`` property.
**indexers_kwargs: Additional indexers
Returns:
xr.DataArray: The selected data, potentially a combination of data
on group level and member-level data.
"""
idxrs, deep_idxrs = self._parse_indexers(indexers, allow_deep=deep,
**indexers_kwargs)
# Use the (shallow) indexers to select (by index) those members that
# are to be combined ...
tbc = self.member_map.isel(idxrs, drop=drop)
# If only a single item remains, pass deep indexers on to it
if tbc.size == 1:
cont = self[tbc.item()]
if not deep_idxrs:
return cont
return cont.isel(deep_idxrs, drop=drop)
# Now, combine them, potentially also applying deep indexing
return self._combine(tbc, combination_method=combination_method,
deep_indexers=deep_idxrs, by_index=True,
drop=drop)
[docs] def sel(self, indexers: dict=None, *, method: str=None,
tolerance: float=None, drop: bool=False,
combination_method: str='try_concat', deep: bool=None,
**indexers_kwargs) -> xr.DataArray:
"""Return a new labelled `xr.DataArray` with a coordinate-selected
subset of members of this group.
If deep selection is activated, those indexers that are not available
in the group-managed dimensions are looked up in the members of this
group.
Args:
indexers (dict, optional): A dict with keys matching dimensions and
values given by scalars, slices or arrays of tick labels.
As `xr.DataArray.sel`, uses pandas-like indexing, i.e.: slices
include the terminal value.
method (str, optional): Method to use for inexact matches
tolerance (float, optional): Maximum (absolute) distance between
original and given label for inexact matches.
drop (bool, optional): Drop coordinate variables instead of making
them scalar.
combination_method (str, optional): How to combine group-level data
with member-level data. Can be:
* ``concat``: Concatenate. This can preserve the dtype, but
requires that no data is missing.
* ``merge``: Merge, using `xarray.merge`. This leads to a
type conversion to ``float64``, but allows members
being missing or coordinates not fully filling the
available space.
* ``try_concat``: Try concatenation, fall back to merging
if that was unsuccessful.
deep (bool, optional): Whether to allow deep indexing, i.e.: that
``indexers`` may contain dimensions that don't refer to group-
level dimensions but to dimensions that are only availble among
the member data. If ``None``, will use the value returned by
the ``allow_deep_selection`` property.
**indexers_kwargs: Additional indexers
Returns:
xr.DataArray: The selected data, potentially a combination of data
on group level and member-level data.
"""
idxrs, deep_idxrs = self._parse_indexers(indexers, allow_deep=deep,
**indexers_kwargs)
# Use the (shallow) indexers to select (by label) those members that
# are to be combined ...
tbc = self.member_map.sel(idxrs, method=method,
tolerance=tolerance, drop=drop)
# If only a single item remains, pass deep indexers on to it
if tbc.size == 1:
cont = self[tbc.item()]
if not deep_idxrs:
return cont
return cont.sel(deep_idxrs, method=method,
tolerance=tolerance, drop=drop)
# Now, combine them, potentially also applying deep indexing
return self._combine(tbc, combination_method=combination_method,
deep_indexers=deep_idxrs, by_index=False,
method=method, tolerance=tolerance, drop=drop)
# Helpers .................................................................
# General . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] def _get_coords_of(self, obj: AbstractDataContainer, *,
mode=None) -> TCoordsDict:
"""Extract the coordinates for the given object using the
`dantro.utils.coords.extract_coords` function.
Args:
obj (AbstractDataContainer): The object to get the coordinates of.
mode (None, optional): By which coordiante extraction mode to get
the coordinates from the object. Can be ``attrs``, ``name``,
``data`` or anything else specified in
~`dantro.utils.coords.extract_coords`.
Returns:
TCoordsDict: The extracted coordinates
"""
# Depending on the mode, compile the dict of additional parameters
mode = mode if mode is not None else self.LDG_EXTRACT_COORDS_FROM
kwargs = dict()
if mode == 'attrs':
kwargs['coords_attr_prefix'] = self.LDG_COORDS_ATTR_PREFIX
kwargs['mode_attr_prefix'] = self.LDG_COORDS_MODE_ATTR_PREFIX
kwargs['default_mode'] = self.LDG_COORDS_MODE_DEFAULT
kwargs['strict'] = self.LDG_STRICT_ATTR_CHECKING
elif mode == 'name':
kwargs['separator'] = self.LDG_COORDS_SEPARATOR_IN_NAME
return extract_coords(obj, dims=self.dims, mode=mode, **kwargs)
[docs] def _add_container_callback(self, cont: AbstractDataContainer) -> None:
"""Called by the base class after adding a container, this method
checks whether the member map needs to be invalidated or whether the
new container can be accomodated in it.
If it can be accomodated, the member map will be adjusted such that for
all coordinates associated with the given ``cont``, the member map
points to the newly added container.
Args:
cont (AbstractDataContainer): The newly added container
Returns:
None: Description
"""
# First, let the parent class do its thinkg
super()._add_container_callback(cont)
# Don't have to do anything if there is no member map yet
if not self.member_map_available:
return
# There is a map. Check if it can accomodate the new container
coords = self._get_coords_of(cont)
try:
self.__member_map.loc[coords] = cont.name
except Exception:
# Cannot accomodate it -> invalidate it
self.__member_map = None
# For selection . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] def _parse_indexers(self, indexers: dict, *, allow_deep: bool,
**indexers_kwargs) -> Tuple[dict, dict]:
"""Parses the given indexer arguments and split them into indexers for
the selection of group members and deep selection.
Args:
indexers (dict): The indexers dict, may be empty
allow_deep (bool): Whether to allow deep selection
**indexers_kwargs: Additional indexers
Returns:
Tuple[dict, dict]: (shallow indexers, deep indexers)
Raises:
ValueError: If deep indexers were given but deep selection was not
enabled
"""
allow_deep = (allow_deep if allow_deep is not None
else self.allow_deep_selection)
idxrs = dict(**(indexers if indexers else {}), **indexers_kwargs)
# Split by those for deep selection and those for this group
deep_idxrs = {k: v for k, v in idxrs.items() if k not in self.dims}
idxrs = {k: v for k, v in idxrs.items() if k in self.dims}
if deep_idxrs and not allow_deep:
raise ValueError("Deep indexing is not allowed for {}, but got "
"indexers that don't match any of its dimension "
"names: {}. You can change this behavior using "
"the allow_deep_selection property or the class "
"variable LDG_ALLOW_DEEP_SELECTION."
"".format(self.logstr, ", ".join(self.dims)))
return idxrs, deep_idxrs
[docs] def _combine(self, cont_names: xr.DataArray, *, combination_method: str,
deep_indexers: dict, by_index: bool,
**sel_kwargs) -> xr.Dataset:
"""Combine the given objects by the specified method. If deep indexers
are given, apply the deep indexing on each of the members.
This method receives a labelled array of container names, on which the
selection already took place. The aim is now to align the objects these
names refer to, including their coordinates, and thereby construct an
array that contains both the dimensions given by the ``cont_names``
array and each members' data dimensions.
Available combination methods are based either on `xarray.merge`
operations or `xarray.concat` along each dimension. For both these
combination methods, the members of this group need to be prepared such
that the operation can be applied, i.e.: they need to already be in an
array capable of that operation and they need to directly or indirectly
preserve coordinate information.
For that purpose, an object-array is constructed that has the same
shape as the given ``cont_names``. As the `xarray.Dataset` and
`xarray.DataArray` types have issues with handling array-like objects
in object arrays, this is done via a `numpy.ndarray`.
Args:
cont_names (xr.DataArray): The pre-selected member map object, i.e.
a labelled array containing names of the desired members that
are to be combined.
combination_method (str): How to combine them: concat, try_concat,
or merge. Concatenation will allow preserving the dtype of the
underlying data.
deep_indexers (dict): Whether any further indexing is to take place
before combination.
by_index (bool): Whether the deep indexing should take place by
index; if False, will use label-based selection.
**sel_kwargs: Passed on to ``.sel`` or ``.isel``.
Returns:
xr.Dataset: The data of the members from ``cont_names``, combined
using the given combination method.
Raises:
ValueError: Invalid combination method
KeyError: In ``concat`` mode, upon missing members.
"""
def get_cont(name: str, combination_method: str
) -> Tuple[Union[XrDataContainer, None], str]:
"""Retrieve the container from the group, potentially changing the
combination method. If no container could be found, returns None,
which denotes that further processing should be skipped
"""
try:
cont = self[name]
except KeyError as err:
if combination_method != 'concat':
# Failing is ok. But cannot do anything else here
return None, combination_method
# Otherwise, should raise!
raise KeyError("Could not find a member named '{}' in {}, but "
"need it for concatenation! Make sure that the "
"member can be found under this name or change "
"the combination method to 'merge' or "
"'try_concat'.".format(name, self.logstr)
) from err
else:
return cont, combination_method
def process_cont(cont, coords) -> Tuple[xr.DataArray, dict]:
"""Process the given container and coordinates into a data array;
this also applies the deep selection.
"""
# Apply the coordinates of the overlapping dimensions
# (Does nothing if there are no overlapping dimensions)
darr = cont.sel({dim: coord for dim, coord in coords.items()
if dim in cont.dims},
drop=False)
# This is to ensure that the array that is used matches only a
# single coordinate combination, i.e. one _point_ in the space
# spanned by self.member_map.coords.
# This operation _might_ increase memory usage temporarily, because
# data from the same member is accessed multiple times and only one
# coordinate combination is extracted from it (instead of selecting
# all at once; which would however require a different architecture
# and not allow using the convenient xarray interface).
# Apply the deep indexers
if by_index:
darr = darr.isel(deep_indexers, **sel_kwargs)
else:
darr = darr.sel(deep_indexers, **sel_kwargs)
# For the following, the container coordinates may not contain
# any dimension names that are overlapping with those of the group
coords = {dim_name: coords for dim_name, coords in coords.items()
if dim_name not in darr.dims}
return darr, coords
dsets = np.zeros(cont_names.shape, dtype='object')
dsets.fill(dict()) # placeholders, ignored in xr.merge
# Create an iterator over the container names (mirrors dsets iteration)
names_iter = np.nditer(cont_names, flags=('multi_index', 'refs_ok'))
for name in names_iter:
# Get the corresponding member container, potentially changing the
# combination method
cont, combination_method = get_cont(name.item(),
combination_method)
# Might not have been found; go to the next iteration
if cont is None:
continue
# Get the coordinates for this member container and further process
# the container into a DataArray
coords = cont_names[names_iter.multi_index].coords
darr, coords = process_cont(cont, coords)
# As it's easier to work on xr.Datasets than on xr.DataArray-like
# objects, create a dataset from the container, using a temporary
# name which will later be used to resolve it back to a DataArray
dset = darr.to_dataset(name='_tmp_dset_name')
# Now, need to expand the dimensions to accomodate the coordinates.
# Add the new dimensions in front. (Important for concatenation!)
dset = dset.expand_dims(dim=list(coords.keys()))
# NOTE While this creates a non-shallow copy of the data, there is
# no other way of doing this: a copy can only be avoided if
# the DataArray can re-use the existing variables – for the
# changes it needs to do to expand the dims, however, it will
# necessarily need to create a copy of the original data.
# Thus, we might as well let xarray take care of that instead
# of bothering with that ourselves ...
# ...and assign coordinates to them (shallow copy of existing dset)
dset = dset.assign_coords(**{k: [v] for k, v in coords.items()})
# Done. Store it in the object-array of datasets
dsets[names_iter.multi_index] = dset
# Now ready to combine them.
if combination_method == 'concat':
dset = self._combine_by_concatenation(dsets,
dims=cont_names.dims)
elif combination_method == 'merge':
dset = self._combine_by_merge(dsets)
elif combination_method == 'try_concat':
try:
dset = self._combine_by_concatenation(dsets,
dims=cont_names.dims)
except Exception as exc:
# NOTE The exception is now something other than a member
# missing, i.e. some numerical issue during concatenation
# Try again with merging ...
log.warning("Failed concatenation with %s: %s",
exc.__class__.__name__, exc)
dset = self._combine_by_merge(dsets)
else:
raise ValueError("Invalid combination_method argument: {}! "
"Available methods: try_concat, concat, merge."
"".format(combination_method))
# Combined into one dataset now, with '_tmp_dset_name' data variable...
# Convert back into a DataArray; can drop the temporary name now.
darr = dset['_tmp_dset_name']
darr.name = None
return darr
[docs] @classmethod
def _combine_by_merge(cls, dsets: np.ndarray) -> xr.Dataset:
"""Combine the given datasets by merging using `xarray.merge`.
Args:
dsets (np.ndarray): The object-dtype array of xr.Datasets that are
to be combined.
Returns:
xr.Dataset: All datasets, aligned and combined via `xarray.merge`
"""
log.debug("Combining %d datasets by merging ...", dsets.size)
dset = xr.merge(dsets.flat)
log.debug("Merge successful.")
return dset
[docs] @classmethod
def _combine_by_concatenation(cls, dsets: np.ndarray, *,
dims: TDims) -> xr.Dataset:
"""Combine the given datasets by concatenation using `xarray.concat`
and subsequent application along all dimensions specified in ``dims``.
Args:
dsets (np.ndarray): The object-dtype array of xr.Dataset objects
that are to be combined by concatenation.
dims (TDims): The dimension names corresponding to _all_ the
dimensions of the ``dsets`` array.
Returns:
xr.Dataset: The dataset resulting from the concatenation
"""
log.debug("Combining %d datasets by concatenation along %d "
"dimension%s ...", dsets.size, len(dsets.shape),
"s" if len(dsets.shape) != 1 else "")
# Go over all dimensions and concatenate
# This effectively reduces the dsets array by one dimension in each
# iteration by applying the xr.concat function along the axis
# NOTE np.apply_along_axis would be what is desired here, but that
# function unfortunately tries to cast objects to np.arrays
# which is not what we want here at all!
# Thus, there is one implemented in dantro.tools ...
for dim_idx, dim_name in reversed(list(enumerate(dims))):
log.debug("Concatenating along axis '%s' (axis # %d) ...",
dim_name, dim_idx)
dsets = apply_along_axis(xr.concat, axis=dim_idx, arr=dsets,
dim=dim_name)
log.debug("Concatenation successful.")
# The single item in the now scalar array is the combined xr.Dataset
return dsets.item()