dantro.data_ops package#

Implements the data operations database of dantro, which is used in the data transformation framework to apply transformations on data using TransformationDAG.

isort:skip_file

Submodules#

dantro.data_ops._base_ops module#

Implements operations that need to be importable from other modules or that are so basic that they apply to a wide range of applications.

BOOLEAN_OPERATORS = {'!=': <built-in function ne>, '<': <built-in function lt>, '<=': <built-in function le>, '==': <built-in function eq>, '>': <built-in function gt>, '>=': <built-in function ge>, '^': <built-in function xor>, 'contains': <built-in function contains>, 'eq': <built-in function eq>, 'ge': <built-in function ge>, 'gt': <built-in function gt>, 'in': <function <lambda>>, 'in interval': <function <lambda>>, 'le': <built-in function le>, 'lt': <built-in function lt>, 'ne': <built-in function ne>, 'not in': <function <lambda>>, 'not in interval': <function <lambda>>, 'xor': <built-in function xor>}#

Boolean binary operators

_make_passthrough(func: Callable) Callable[source]#

Wraps a callable such that it returns its first positional argument.

This is meant for functions that operate on an object (conventionally the first argument) and do not have a return value. By constructing a callable using this function, it can be made compatible with the data transformation framework.

f = setattr                 # f has no return value
g = _make_passtrough(f)     # g will return the first argument

dantro.data_ops.apply module#

Implements the application of operations on the given arguments and data

apply_operation(op_name: str, *op_args, _ops: Optional[dict] = None, _log_level: int = 5, **op_kwargs) Any[source]#

Apply an operation with the given arguments and then return its return value. This is used by the Data Transformation Framework and allows to invoke operations from the data operations database, see Data Processing.

Parameters
  • op_name (str) – The name of the operation to carry out; need to be part of the operations database _ops.

  • *op_args – The positional arguments to the operation

  • _ops (dict, optional) – The operations database object to use; if None, uses the dantro operations database.

  • _log_level (int, optional) – Log level of the log messages created by this function.

  • **op_kwargs – The keyword arguments to the operation

Returns

The result of the operation

Return type

Any

Raises

dantro.data_ops.arr_ops module#

Implements data operations that work on array-like data, e.g. from numpy or xarray.

apply_along_axis(func: Callable, axis: int, arr: ndarray, *args, **kwargs) ndarray[source]#

This is like numpy’s function of the same name, but does not try to cast the results of func to an numpy.ndarray but tries to keep them as dtype object. Thus, the return value of this function will always have one fewer dimension then the input array.

This goes along the equivalent formulation of numpy.apply_along_axis(), outlined in their documentation of the function.

Parameters
  • func (Callable) – The function to apply along the axis

  • axis (int) – Which axis to apply it to

  • arr (ndarray) – The array-like data to apply the function to

  • *args – Passed to func

  • **kwargs – Passed to func

Returns

with func applied along axis, reducing the array

dimensions by one.

Return type

ndarray

create_mask(data: DataArray, operator_name: str, rhs_value: float) DataArray[source]#

Given the data, returns a binary mask by applying the following comparison: data <operator> rhs value.

Parameters
  • data (DataArray) – The data to apply the comparison to. This is the left-hand-side of the comparison.

  • operator_name (str) – The name of the binary operator function as registered in the BOOLEAN_OPERATORS database.

  • rhs_value (float) – The right-hand-side value

Raises

KeyError – On invalid operator name

Returns

Boolean mask

Return type

DataArray

where(data: DataArray, operator_name: str, rhs_value: float, **kwargs) DataArray[source]#

Filter elements from the given data according to a condition. Only those elemens where the condition is fulfilled are not masked.

Note

This typically leads to a dtype change to numpy.float64.

Parameters
count_unique(data, dims: List[str] = None) DataArray[source]#

Applies numpy.unique() to the given data and constructs a xarray.DataArray for the results.

NaN values are filtered out.

Parameters
  • data – The data

  • dims (List[str], optional) – The dimensions along which to apply np.unique. The other dimensions will be available after the operation. If not provided it is applied along all dims.

populate_ndarray(objs: typing.Iterable, shape: typing.Optional[typing.Tuple[int]] = None, dtype: typing.Union[str, type, numpy.dtype] = <class 'float'>, order: str = 'C', out: typing.Optional[numpy.ndarray] = None, ufunc: typing.Optional[typing.Callable] = None) ndarray[source]#

Populates an empty numpy.ndarray of the given dtype with the given objects by zipping over a new array of the given shape and the sequence of objects.

Parameters
  • objs (Iterable) – The objects to add to the numpy.ndarray. These objects are added in the order they are given here. Note that their final position inside the resulting array is furthermore determined by the order argument.

  • shape (Tuple[int], optional) – The shape of the new array. Required if no out array is given.

  • dtype (Union[str, type, dtype], optional) – dtype of the new array. Ignored if out is given.

  • order (str, optional) – Order of the new array, determines iteration order. Ignored if out is given.

  • out (ndarray, optional) – If given, populates this array rather than an empty array.

  • ufunc (Callable, optional) – If given, applies this unary function to each element before storing it in the to-be-returned ndarray.

Returns

The populated out array or the newly created one (if

out was not given)

Return type

ndarray

Raises
  • TypeError – On missing shape argument if out is not given

  • ValueError – If the number of given objects did not match the array size

build_object_array(objs: Union[Dict, Sequence], *, dims: Tuple[str] = ('label',), fillna: Any = None) DataArray[source]#

Creates a simple labelled multidimensional object array.

It accepts simple iterable types like dictionaries or lists and unpacks them into the array, using the key or index (respectively) as coordinate for the entry. For dict-like entries, multi-dimensional coordinates can be specified by using tuples for keys. Subsequently, list-like iterable types (list, tuple etc.) will result in one-dimensional output array.

Warning

This data operation is built for flexibility, not for speed. It will call the merge() operation for every element in the objs iterable, thus being slow and potentially creating an array with many empty elements. To efficiently populate an n-dimensional object array, use the populate_ndarray() operation instead and build a labelled array from that output.

Parameters
  • objs (Union[Dict, Sequence]) – The objects to populate the object array with. If dict-like, keys are assumed to encode coordinates, which can be of the form coord0 or (coord0, coord1, …), where the tuple-form requires as many coordinates as there are entries in the dims argument. If list- or tuple-like (more exactly: if missing the items attribute) trivial indexing is used and dims needs to be 1D.

  • dims (Tuple[str], optional) – The names of the dimensions of the labelled array.

  • fillna (Any, optional) – The fill value for entries that are not covered by the dimensions specified by objs. Note that this will replace all null values, which includes NaN but also None. This operation is only called if fillna is not None.

Raises

ValueError – If coordinates and/or dims argument for individual entries did not match.

multi_concat(arrs: ndarray, *, dims: Sequence[str]) DataArray[source]#

Concatenates xarray.Dataset or xarray.DataArray objects using xarray.concat(). This function expects the xarray objects to be pre-aligned inside the numpy object array arrs, with the number of dimensions matching the number of concatenation operations desired. The position inside the array carries information on where the objects that are to be concatenated are placed inside the higher dimensional coordinate system.

Through multiple concatenation, the dimensionality of the contained objects is increased by dims, while their dtype can be maintained.

For the sequential application of xarray.concat() along the outer dimensions, the custom apply_along_axis() function is used.

Parameters
  • arrs (ndarray) – The array containing xarray objects which are to be concatenated. Each array dimension should correspond to one of the given dims. For each of the dimensions, the xarray.concat() operation is applied along the axis, effectively reducing the dimensionality of arrs to a scalar and increasing the dimensionality of the contained xarray objects until they additionally contain the dimensions specified in the dims argument.

  • dims (Sequence[str]) – A sequence of dimension names that is assumed to match the dimension names of the array. During each concatenation operation, the name is passed along to xarray.concat() where it is used to select the dimension of the content of arrs along which concatenation should occur.

Raises

ValueError – If number of dimension names does not match the number of data dimensions.

merge(arrs: Union[Sequence[Union[DataArray, Dataset]], ndarray], *, reduce_to_array: bool = False, **merge_kwargs) Union[Dataset, DataArray][source]#

Merges the given sequence of xarray objects into an xarray.Dataset.

As a convenience, this also allows passing a numpy.ndarray of dtype object containing the xarray objects. Furthermore, if the resulting xarray.Dataset contains only a single data variable, that variable can be extracted as a xarray.DataArray by setting the reduce_to_array flag, making that array the return value of this operation.

expand_dims(d: Union[ndarray, DataArray], *, dim: dict = None, **kwargs) DataArray[source]#

Expands the dimensions of the given object.

If the object does not support a expand_dims method call, it will be attempted to convert it to an xarray.DataArray first.

Parameters
  • d (Union[ndarray, DataArray]) – The object to expand the dimensions of

  • dim (dict, optional) – Keys specify the dimensions to expand, values can either be an integer specifying the length of the dimension, or a sequence of coordinates.

  • **kwargs – Passed on to the expand_dims method call. For an xarray objects that would be xarray.DataArray.expand_dims().

Returns

The input data with expanded dimensions.

Return type

DataArray

expand_object_array(d: DataArray, *, shape: Sequence[int] = None, astype: Union[str, type, dtype] = None, dims: Sequence[str] = None, coords: Union[dict, str] = 'trivial', combination_method: str = 'concat', allow_reshaping_failure: bool = False, **combination_kwargs) DataArray[source]#

Expands a labelled object-array that contains array-like objects into a higher-dimensional labelled array.

d is expected to be an array of arrays, i.e. each element of the outer array is an object that itself is an numpy.ndarray-like object. The shape is the expected shape of each of these inner arrays. Importantly, all these arrays need to have the exact same shape!

Typically, e.g. when loading data from HDF5 files, the inner array will not be labelled but will consist of simple numpy.ndarray objects. The arguments dims and coords are used to label the inner arrays.

This uses multi_concat() for concatenating or merge() for merging the object arrays into a higher-dimensional array, where the latter option allows for missing values.

Todo

Make reshaping and labelling optional if the inner array already is a labelled array. In such cases, the coordinate assignment is already done and all information for combination is already available.

Parameters
  • d (DataArray) – The labelled object-array containing further arrays as elements (which are assumed to be unlabelled).

  • shape (Sequence[int], optional) – Shape of the inner arrays. If not given, the first element is used to determine the shape.

  • astype (Union[str, type, dtype], optional) – All inner arrays need to have the same dtype. If this argument is given, the arrays will be coerced to this dtype. For numeric data, float is typically a good fallback. Note that with combination_method == "merge", the choice here might not be respected.

  • dims (Sequence[str], optional) – Dimension names for labelling the inner arrays. This is necessary for proper alignment. The number of dimensions need to match the shape. If not given, will use inner_dim_0 and so on.

  • coords (Union[dict, str], optional) – Coordinates of the inner arrays. These are necessary to align the inner arrays with each other. With coords = "trivial", trivial coordinates will be assigned to all dimensions. If specifying a dict and giving "trivial" as value, that dimension will be assigned trivial coordinates.

  • combination_method (str, optional) – The combination method to use to combine the object array. For concat, will use dantro’s multi_concat(), which preserves dtype but does not allow missing values. For merge, will use merge(), which allows missing values (masked using np.nan) but leads to the dtype decaying to float.

  • allow_reshaping_failure (bool, optional) – If true, the expansion is not stopped if reshaping to shape fails for an element. This will lead to missing values at the respective coordinates and the combination_method will automatically be changed to merge.

  • **combination_kwargs – Passed on to the selected combination function, multi_concat() or merge().

Returns

A new, higher-dimensional labelled array.

Return type

DataArray

Raises
  • TypeError – If no shape can be extracted from the first element in the input data d

  • ValueError – On bad argument values for dims, shape, coords or combination_method.

transform_coords(d: DataArray, dim: Union[str, Sequence[str]], func: Callable, *, func_kwargs: dict = None) DataArray[source]#

Assigns new, transformed coordinates to a data array by applying a function on the existing coordinates.

Uses xarray.DataArray.assign_coords() to set the new coordinates, which returns a shallow copy of the given object.

Parameters
  • d (DataArray) – The array to transform the dim coordinates of

  • dim (Union[str, Sequence[str]]) – The name or names of the coordinate dimension(s) to apply func to.

  • func (Callable) – The callable to apply to d.coords[dim]

  • func_kwargs (dict, optional) – Passed to the function invocation like func(d.coords[dim], **func_kwargs)

dantro.data_ops.ctrl_ops module#

Implements operations that control the behaviour of the transformation or pipeline in general, including functions that can be used for debugging.

raise_SkipPlot(cond: bool = True, *, reason: str = '', passthrough: Optional[Any] = None)[source]#

Raises SkipPlot to trigger that a plot is skipped without error, see Skipping Plots.

If cond is False, this will do nothing but return the passthrough.

Parameters
  • cond (bool, optional) – Whether to actually raise the exception

  • reason (str, optional) – The reason for skipping, optional

  • passthrough (Any, optional) – A passthrough value which is returned if cond did not evaluate to True.

print_data(data: Any, *, end: str = '\n', fstr: Optional[str] = None, **fstr_kwargs) Any[source]#

Prints and passes on the data using print().

The print operation distinguishes between dantro types (in which case some more information is shown) and non-dantro types. If a custom format string is given, will always use that one.

Note

This is a passthrough-function: data is always returned without any changes. However, the print operation may lead to resolution of proxy objects.

Parameters
  • data (Any) – The data to print

  • end (str, optional) – The end argument to the print call

  • fstr (str, optional) – If given, will use this to format the data for printing. The data will be the passed as first positional argument to the format string, thus addressable by {0:} or data (e.g. to access attributes via format-string syntax). If the format string is not None, will always use the format string and not use the custom formatting for dantro objects.

  • **fstr_kwargs – Keyword arguments passed to the format() call.

Returns

the given data

Return type

Any

dantro.data_ops.db module#

This module holds the data operations database

dantro.data_ops.db_tools module#

Tools that help to monitor and manipulate the operations database

register_operation(func: Callable, name: Optional[str] = None, *, skip_existing: bool = False, overwrite_existing: bool = False, _ops: Optional[dict] = None) None[source]#

Adds an entry to the shared operations registry.

Parameters
  • func (Callable) – The callable that is to be registered as operation.

  • name (str, optional) – The name of the operation. If not given (and the callable not being a lambda), will use the function name instead.

  • skip_existing (bool, optional) – Whether to skip registration if the operation name is already registered. This suppresses the ValueError raised on existing operation name.

  • overwrite_existing (bool, optional) – Whether to overwrite a potentially already existing operation of the same name. If given, this takes precedence over skip_existing.

  • _ops (dict, optional) – The operations database object to use; if None, uses the dantro operations database.

Raises
  • TypeError – On invalid name or non-callable for the func argument

  • ValueError – On already existing operation name and no skipping or overwriting enabled. Also if no name was given but the given callable is a lambda (which only has <lambda> as name).

is_operation(arg: Optional[Union[str, Callable]] = None, /, *, _ops: Optional[dict] = None, _reg_func: Optional[Callable] = None, **kws)[source]#

Decorator for registering functions with the operations database.

As an alternative to register_operation(), this decorator can be used to register a function with the operations database right where its defined:

from dantro.data_ops import is_operation

# Operation name deduced from function name
@is_operation
def my_operation(data, *args):
    # ... do stuff here ...
    return data

# Custom operation name
@is_operation("do_something")
def my_operation_with_a_custom_name(foo, bar):
    pass

# Overwriting an operation of the same name
@is_operation("do_something", overwrite_existing=True)
def actually_do_something(spam, fish):
    pass

See Registering operations for general information. For instructions on how to overwrite this decorator with a custom one, see Customizing database tools.

Parameters
  • arg (Union[str, Callable], optional) – The name that should be used in the operation registry. If not given, will use the name of the decorated function instead. If a callable, this refers to the @is_operation call syntax and will use that as a function.

  • _ops (dict, optional) – The operations database to use. If not given, uses the dantro operations database.

  • _reg_func (Callable, optional) – If given, uses that callable for registration, which should have the same signature as register_operation(). If None, uses dantro’s registration function, register_operation().

  • **kws – Passed to register_operation() or a potentially given custom _reg_func.

available_operations(*, match: Optional[str] = None, n: int = 5, _ops: Optional[dict] = None) Sequence[str][source]#

Returns all available operation names or a fuzzy-matched subset of them.

Also see Available operations for an overview.

Parameters
  • match (str, optional) – If given, fuzzy-matches the names and only returns close matches to this name.

  • n (int, optional) – Number of close matches to return. Passed on to difflib.get_close_matches()

  • _ops (dict, optional) – The operations database object to use; if None, uses the dantro operations database.

Returns

All available operation names or the matched subset.

The sequence is sorted alphabetically.

Return type

Sequence[str]

get_operation(op_name: str, *, _ops: Optional[Dict[str, Callable]] = None) Callable[source]#

Retrieve the operation’s callable

Parameters
  • op_name (str) – Name of the operation

  • _ops (Dict[str, Callable], optional) – The operations database object to use; if None, uses the dantro operations database.

Raises

BadOperationName – Upon invalid operation name

dantro.data_ops.expr_ops module#

Implements data operations that work with expressions, e.g. lambda function definitions or symbolic math

expression(expr: str, *, symbols: typing.Optional[dict] = None, evaluate: bool = True, transformations: typing.Optional[typing.Tuple[typing.Callable]] = None, astype: typing.Union[type, str] = <class 'float'>)[source]#

Parses and evaluates a symbolic math expression using SymPy.

For parsing, uses sympy’s sympy.parsing.sympy_parser.parse_expr(). The symbols are provided as local_dict; the global_dict is not explicitly set and subsequently uses the sympy default value, containing all basic sympy symbols and notations.

Note

The expression given here is not Python code, but symbolic math. You cannot call arbitrary functions, but only those that are imported by from sympy import *.

Hint

When using this expression as part of the Data Transformation Framework, it is attached to a so-called syntax hook that makes it easier to specify the symbols parameter. See here for more information.

Warning

While the expression is symbolic math, be aware that smypy by default interprets the ^ operator as XOR, not an exponentiation! For exponentiation, use the ** operator or adjust the transformations argument as specified in the sympy documentation.

Warning

The return object of this operation will only contain symbolic sympy objects if astype is None. Otherwise, the type cast will evaluate all symbolic objects to the numerical equivalent specified by the given astype.

Parameters
  • expr (str) – The expression to evaluate

  • symbols (dict, optional) – The symbols to use

  • evaluate (bool, optional) – Controls whether sympy evaluates expr. This may lead to a fully evaluated result, but does not guarantee that no sympy objects are contained in the result. For ensuring a fully numerical result, see the astype argument.

  • transformations (Tuple[Callable], optional) – The transformations argument for sympy’s sympy.parsing.sympy_parser.parse_expr(). By default, the sympy standard transformations are performed.

  • astype (Union[type, str], optional) – If given, performs a cast to this data type, fully evaluating all symbolic expressions. Default: Python float.

Raises
  • TypeError – Upon failing astype cast, e.g. due to free symbols remaining in the evaluated expression.

  • ValueError – When parsing of expr failed.

Returns

The result of the evaluated expression.

generate_lambda(expr: str) Callable[source]#

Generates a lambda from a string. This is useful when working with callables in other operations.

The expr argument needs to be a valid Python lambda expression.

Inside the lambda body, the following names are available for use:

  • A large part of the builtins module

  • Every name from the Python math module, e.g. sin, cos, …

  • These modules (and their long form): np, xr, scipy

Internally, this uses eval but imposes the following restrictions:

  • The following strings may not appear in expr: ;, __.

  • There can be no nested lambda, i.e. the only allowed lambda string is that in the beginning of expr.

  • The dangerous parts from the builtins module are not available.

Parameters

expr (str) – The expression string to evaluate into a lambda.

Returns

The generated Callable.

Return type

Callable

Raises

SyntaxError – Upon failed evaluation of the given expression, invalid expression pattern, or disallowed strings in the lambda body.

dantro.data_ops.hooks module#

Implements operation hooks for the DAG parser implemented in _dag_utils.

DAG_PARSER_OPERATION_HOOKS = {'expression': <function op_hook_expression>}#

Contains hooks that are invoked when a certain operation is parsed. The values should be callables that receive operation, *args, **kwargs and return a 3-tuple of the manipulated operation, args, kwargs. The return values will be those that the Transformation object is created from.

See the DAG Syntax Operation Hooks page for more information on integration and available hooks.

Example of defining a hook and registering it:

# Define hook function
def _op_hook_my_operation(
    operation, *args, **kwargs
) -> Tuple[str, list, dict]:
    # ... do stuff here ...
    return operation, args, kwargs

# Register with hooks registry
from dantro.data_ops import DAG_PARSER_OPERATION_HOOKS

DAG_PARSER_OPERATION_HOOKS["my_operation"] = _op_hook_my_operation

Todo

Implement a decorator to automatically register operation hooks

op_hook_expression(operation, *args, **kwargs) Tuple[str, list, dict][source]#

An operation hook for the expression operation, attempting to auto-detect which symbols are specified in the given expression. From those, DAGTag objects are created, making it more convenient to specify an expression that is based on other DAG tags.

The detected symbols are added to the kwargs.symbols, if no symbol of the same name is already explicitly defined there.

This hook accepts as positional arguments both the (expr,) form and the (prev_node, expr) form, making it more robust when the with_previous_result flag was set.

If the expression contains the prev or previous_result symbols, the corresponding DAGNode will be added to the symbols additionally.

For more information on operation hooks, see DAG Syntax Operation Hooks.