dantro.dag module

This module implements a DAG for transformations on dantro objects.

dantro.dag.fmt_time(seconds)
dantro.dag._deepcopy(obj: Any) → Any[source]

A pickle-based deep-copy overload that uses copy.deepcopy only as a fallback if serialization is not possible.

Because pickling is backed by a C implementation, this approach can easily be many times faster than the pure-Python copy.deepcopy.
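
A minimal sketch of the idea behind such an overload; the function name and the exact set of caught exceptions are illustrative and may differ from the actual dantro implementation:

    import copy
    import pickle
    from typing import Any

    def pickle_deepcopy(obj: Any) -> Any:
        """Deep-copies obj via a pickle round trip, falling back to
        copy.deepcopy if the object cannot be serialized."""
        try:
            return pickle.loads(pickle.dumps(obj))
        except (pickle.PicklingError, TypeError, AttributeError):
            # Not everything is picklable (e.g. lambdas or open file
            # handles); fall back to the slower, pure-Python deep copy.
            return copy.deepcopy(obj)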

class dantro.dag.Transformation(*, operation: str, args: Sequence[Union[dantro._dag_utils.DAGReference, Any]], kwargs: Dict[str, Union[dantro._dag_utils.DAGReference, Any]], dag: Optional[dantro.dag.TransformationDAG] = None, salt: int = None, allow_failure: Union[bool, str] = None, fallback: Any = None, file_cache: dict = None)[source]

Bases: object

A transformation is the collection of an N-ary operation and its inputs.

Transformation objects store the name of the operation that is to be carried out and the arguments that are to be fed to that operation. After a Transformation is defined, the only interaction with it is via the compute method.

For computation, the arguments are recursively inspected for DAGReference-derived objects; these need to be resolved first, meaning they are looked up in the DAG’s object database and – if they are Transformation objects themselves – their results are computed. This can lead to a traversal along the DAG.

Warning

Objects of this class should under no circumstances be changed after they were created! For performance reasons, the hashstr property is cached; changing attributes that enter the hash computation will therefore not lead to a new hash, silently creating wrong behaviour.

All relevant attributes (operation, args, kwargs, salt) are thus set read-only. This should be respected!
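
As an illustration, a stand-alone Transformation without any DAG references among its arguments might be created and computed as follows. This is only a sketch: it assumes that the named operation, add, is registered in dantro's operations database.

    from dantro.dag import Transformation

    # No DAG references among the arguments, hence no associated DAG needed
    trf = Transformation(operation="add", args=[1, 2], kwargs={})

    print(trf.hashstr)    # deterministic, platform-independent hash string
    print(trf.compute())  # -> 3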

__init__(*, operation: str, args: Sequence[Union[dantro._dag_utils.DAGReference, Any]], kwargs: Dict[str, Union[dantro._dag_utils.DAGReference, Any]], dag: Optional[dantro.dag.TransformationDAG] = None, salt: int = None, allow_failure: Union[bool, str] = None, fallback: Any = None, file_cache: dict = None)[source]

Initialize a Transformation object.

Parameters
  • operation (str) – The operation that is to be carried out.

  • args (Sequence[Union[DAGReference, Any]]) – Positional arguments for the operation.

  • kwargs (Dict[str, Union[DAGReference, Any]]) – Keyword arguments for the operation. These are internally stored as a KeyOrderedDict.

  • dag (TransformationDAG, optional) – An associated DAG that is needed for object lookup. Without an associated DAG, args or kwargs may NOT contain any object references.

  • salt (int, optional) – A hashing salt that can be used to let this specific Transformation object have a different hash than other objects, thus leading to cache misses.

  • allow_failure (Union[bool, str], optional) – Whether the computation of this operation or its arguments may fail. In case of failure, the fallback value is used. If True or 'log', will emit a log message upon failure. If 'warn', will issue a warning. If 'silent', will use the fallback without any notification of failure. Note that the failure may occur not only during computation of this transformation’s operation, but also during the recursive computation of the referenced arguments. In other words, if the computation of an upstream dependency failed, the fallback will be used as well.

  • fallback (Any, optional) – If allow_failure was set, specifies the alternative value to use for this operation. This may in turn be a reference to another DAG node.

  • file_cache (dict, optional) –

    File cache options. Expected keys are write (boolean or dict) and read (boolean or dict). Note that the options given here are NOT reflected in the hash of the object! For an illustrative sketch of such an options dict, see below this parameter list.

    The following arguments are possible under the read key:

    enabled (bool, optional): Whether it should be attempted to read from the file cache.

    load_options (dict, optional): Passed on to the method that loads the cache, load().

    Under the write key, the following arguments are possible. They are evaluated in the order that they are listed here. See _cache_result() for more information.

    enabled (bool, optional): Whether writing is enabled at all.

    always (bool, optional): If given, will always write.

    allow_overwrite (bool, optional): If False, will not write a cache file if one already exists. If True, a cache file might be written, although one already exists. This is still conditional on the evaluation of the other arguments.

    min_size (int, optional): The minimum size of the result object that allows writing the cache.

    max_size (int, optional): The maximum size of the result object that allows writing the cache.

    min_compute_time (float, optional): The minimal individual computation time of this node that is needed in order for the file cache to be written. Note that this value can be lower if the node result is not computed but looked up from the cache.

    min_cumulative_compute_time (float, optional): The minimal cumulative computation time of this node and all its dependencies that is needed in order for the file cache to be written. Note that this value can be lower if the node result is not computed but looked up from the cache.

    storage_options (dict, optional): Passed on to the cache storage method, dantro.dag.TransformationDAG._write_to_cache_file(). The following arguments are available:

      ignore_groups (bool, optional): Whether to ignore (i.e. not store) group objects. Groups are not stored by default.

      attempt_pickling (bool, optional): Whether to attempt pickling results that could not be stored via a dedicated storage function. Enabled by default.

      raise_on_error (bool, optional): Whether to raise if storing a result fails. Disabled by default; it is useful to enable this when debugging.

      pkl_kwargs (dict, optional): Arguments passed on to the pickle.dump function.

      further keyword arguments: Passed on to the chosen storage method.
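
For orientation, a file_cache options dict using the keys described above might look like the following sketch; the threshold values are purely illustrative:

    file_cache = dict(
        read=dict(
            enabled=True,
            load_options=dict(),    # passed on to the cache-loading method
        ),
        write=dict(
            enabled=True,
            allow_overwrite=False,  # do not overwrite existing cache files
            min_compute_time=10.0,  # only cache results that took >= 10 s
            storage_options=dict(
                attempt_pickling=True,
                raise_on_error=False,
            ),
        ),
    )

Such a dict can be passed as the file_cache argument of a single Transformation or, via the file_cache_defaults and per-node file_cache arguments of TransformationDAG, applied to many nodes at once.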

__str__() → str[source]

A human-readable string characterizing this Transformation

__repr__() → str[source]

A deterministic string representation of this transformation.

Note

This is also used for hash creation, thus it does not include the attributes that are set via the initialization arguments dag and file_cache.

Warning

Changing this method will lead to cache invalidations!

property hashstr

Computes the hash of this Transformation by creating a deterministic representation of this Transformation using __repr__ and then applying a checksum hash function to it.

Note that this does NOT rely on the built-in hash function but on the custom dantro _hash function which produces a platform-independent and deterministic hash. As this is a string-based (rather than an integer-based) hash, it is not implemented as the __hash__ magic method but as this separate property.

Returns

The hash string for this transformation

Return type

str
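
A minimal sketch of how such a string-based, platform-independent hash can be derived from the deterministic representation; the actual dantro _hash implementation may differ:

    import hashlib

    def _hash(s: str) -> str:
        """A deterministic, platform-independent string hash, here the hex
        digest of the MD5 checksum of the given string."""
        return hashlib.md5(s.encode("utf8")).hexdigest()

    # hashstr is then essentially equivalent to:  _hash(repr(transformation))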

__hash__() → int[source]

Computes the python-compatible integer hash of this object from the string-based hash of this Transformation.

property operation

The operation this transformation performs

property dag

The associated TransformationDAG; used for object lookup

property dependencies

Recursively collects the references that are found in the positional and keyword arguments of this Transformation as well as in the fallback value.

property resolved_dependencies

Transformation objects that this Transformation depends on

property profile

The profiling data for this transformation

yaml_tag = '!dag_trf'
classmethod from_yaml(constructor, node)[source]
classmethod to_yaml(representer, node)[source]

A YAML representation of this Transformation, including all its arguments (which must again be YAML-representable). In essence, this returns a YAML mapping that has the !dag_trf YAML tag prefixed, such that reading it in will lead to the from_yaml method being invoked.

Note

The YAML representation does not include the file_cache parameters.

compute() → Any[source]

Computes the result of this transformation by recursively resolving objects and carrying out operations.

This method can also be called if the result is already computed; this will lead only to a cache-lookup, not a re-computation.

Returns

The result of the operation

Return type

Any
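
In other words, repeated calls are cheap. Continuing the Transformation sketch from further above:

    result = trf.compute()  # computes the result (or looks it up in a cache)
    again = trf.compute()   # only a cache lookup, no re-computation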

_perform_operation(*, args: list, kwargs: dict) → Any[source]

Perform the operation, updating the profiling info on the side

Parameters
  • args (list) – The positional arguments to the operation

  • kwargs (dict) – The keyword arguments to the operation

Returns

The result of the operation

Return type

Any

Raises

_resolve_refs(cont: Sequence) → Sequence[source]

Resolves DAG references within a deepcopy of the given container by iterating over it and computing the referenced nodes.

Parameters

cont (Sequence) – The container containing the references to resolve

_handle_error_and_fallback(err: Exception, *, context: str) → Any[source]

Handles an error that occurred during application of the operation or during resolving of arguments (and the recursively invoked computations on dependent nodes).

Without error handling enabled, this will directly re-raise the active exception. Otherwise, it will generate a log message and will resolve the fallback value.

_update_profile(*, cumulative_compute: float = None, **times) → None[source]

Given some new profiling times, updates the profiling information.

Parameters
  • cumulative_compute (float, optional) – The cumulative computation time; if given, additionally computes the computation time for this individual node.

  • **times – Valid profiling data.

_lookup_result() → Tuple[bool, Any][source]

Look up the transformation result to spare re-computation

_cache_result(result: Any) → None[source]

Stores a computed result in the cache

class dantro.dag.TransformationDAG(*, dm: DataManager, define: Dict[str, Union[List[dict], Any]] = None, select: dict = None, transform: Sequence[dict] = None, cache_dir: str = '.cache', file_cache_defaults: dict = None, base_transform: Sequence[dantro.dag.Transformation] = None, select_base: Union[dantro._dag_utils.DAGReference, str] = None, select_path_prefix: str = None, meta_operations: Dict[str, Union[list, dict]] = None, exclude_from_all: List[str] = None, verbosity: int = 1)[source]

Bases: object

This class collects transformation operations that are (already by their own structure) connected into a directed acyclic graph. The aim of this class is to maintain base objects, manage references, and allow operations on the DAG, the most central of which is computing the result of a node.

Furthermore, this class also implements caching of transformations, such that operations that take very long can be stored (in memory or on disk) to speed up future operations.

Objects of this class are initialized with dict-like arguments which specify the transformation operations. There are some shorthands that allow a simple definition syntax, for example the select syntax, which takes care of selecting a basic set of data from the associated DataManager.
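
As a minimal sketch of how such a DAG might be set up and evaluated; the data path, tags, and operation name are illustrative, and it is assumed that an add operation is registered in dantro's operations database and that DAGTag from dantro._dag_utils can be used from Python analogously to the !dag_tag YAML tag:

    from dantro import DataManager
    from dantro.dag import TransformationDAG
    from dantro._dag_utils import DAGTag

    dm = DataManager("path/to/data")   # hypothetical data directory

    dag = TransformationDAG(
        dm=dm,
        select={
            # select syntax: tag -> path within the selection base (here: dm)
            "temps": "measurements/temperature",
        },
        transform=[
            # a regular transformation, referring to the selected tag
            dict(operation="add", args=[DAGTag("temps"), 273.15],
                 tag="temps_in_K"),
        ],
    )
    results = dag.compute()   # dict mapping tags to fully computed results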

SPECIAL_TAGS = ('dag', 'dm', 'select_base')
__init__(*, dm: DataManager, define: Dict[str, Union[List[dict], Any]] = None, select: dict = None, transform: Sequence[dict] = None, cache_dir: str = '.cache', file_cache_defaults: dict = None, base_transform: Sequence[dantro.dag.Transformation] = None, select_base: Union[dantro._dag_utils.DAGReference, str] = None, select_path_prefix: str = None, meta_operations: Dict[str, Union[list, dict]] = None, exclude_from_all: List[str] = None, verbosity: int = 1)[source]

Initialize a DAG which is associated with a DataManager and load the specified transformations configuration into it.

Parameters
  • dm (DataManager) – The associated data manager

  • define (Dict[str, Union[List[dict], Any]], optional) – Definitions of tags. This can happen in two ways: If the given entries contain a list or tuple, they are interpreted as sequences of transformations which are subsequently added to the DAG, the tag being attached to the last transformation of each sequence. If the entries contain objects of any other type, including dict (!), they will be added to the DAG via a single node that uses the define operation. This argument can be helpful to define inputs or variables which may then be used in the transformations added via the select or transform arguments. See The define interface for more information and examples.

  • select (dict, optional) – Selection specifications, which are translated into regular transformations based on getitem operations. The base_transform and select_base arguments can be used to define from which object to select. By default, selection happens from the associated DataManager.

  • transform (Sequence[dict], optional) – Transform specifications.

  • cache_dir (str, optional) – The name of the cache directory to create if file caching is enabled. If this is a relative path, it is interpreted relative to the associated data manager’s data directory. If it is absolute, the absolute path is used. The directory is only created if it is needed.

  • file_cache_defaults (dict, optional) – Default arguments for file caching behaviour. This is recursively updated with the arguments given in each individual select or transform specification.

  • base_transform (Sequence[Transformation], optional) – A sequence of transform specifications that are added to the DAG prior to those added via define, select and transform. These can be used to create some other object from the data manager which should be used as the basis of select operations. These transformations should be kept as simple as possible and ideally be only used to traverse through the data tree.

  • select_base (Union[DAGReference, str], optional) – Which tag to base the select operations on. If None, will use the (always-registered) tag for the data manager, dm. This attribute can also be set via the select_base property.

  • select_path_prefix (str, optional) – If given, this path is prefixed to all path specifications made within the select argument. Note that unlike setting the select_base this merely joins the given prefix to the given paths, thus leading to repeated path resolution. For that reason, using the select_base argument is generally preferred and the select_path_prefix should only be used if select_base is already in use. If this path ends with a /, it is directly prepended. If not, the / is added before adjoining it to the other path.

  • meta_operations (dict, optional) – Meta-operations are basically function definitions using the language of the transformation framework; for information on how to define and use them, see Meta-Operations.

  • exclude_from_all (List[str], optional) – Tag names that should not be defined as compute() targets if compute_only: all is set there. Note that, alternatively, tags can be named starting with . or _ to exclude them from that list.

  • verbosity (int, optional) –

    Logging verbosity during computation. This mostly pertains to the extent of statistics being emitted through the logger.

    • 0: No statistics

    • 1: Per-node statistics (mean, std, min, max)

    • 2: Total effective time for the 5 slowest operations

    • 3: Same as 2 but for all operations

__str__() → str[source]

A human-readable string characterizing this TransformationDAG

property dm

The associated DataManager

property hashstr

Returns the hash of this DAG, which depends solely on the hash of the associated DataManager.

property objects

The object database

property tags

A mapping from tags to objects’ hashes; the hashes can be looked up in the object database to get to the objects.

property nodes

The nodes of the DAG

property ref_stacks

Named reference stacks, e.g. for resolving tags that were defined inside meta-operations.

property meta_operations

The names of all registered meta-operations.

To register new meta-operations, use the dedicated registration method, register_meta_operation().

property cache_dir

The path to the cache directory that is associated with the DataManager that is coupled to this DAG. Note that the directory might not exist yet!

property cache_files

Scans the cache directory for cache files and returns a dict that has as keys the hash strings and as values a tuple of full path and file extension.

property select_base

The reference to the object that is used for select operations

property profile

Returns the profiling information for the DAG.

property profile_extended

Builds an extended profile that includes the profiles from all transformations and some aggregated information.

This is calculated anew upon each invocation; the result is not cached.

The extended profile contains the following information:

  • tags: profiles for each tag, stored under the tag

  • aggregated: aggregated statistics of all nodes with profile information on compute time, cache lookup, cache writing

  • sorted: individual profiling times, with NaN values set to 0

register_meta_operation(name: str, *, select: dict = None, transform: Sequence[dict] = None) → None[source]

Registers a new meta-operation, i.e. a transformation sequence with placeholders for the required positional and keyword arguments.
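
A minimal sketch of registering and then using a trivial, argument-free meta-operation, continuing the earlier sketch where dag is a TransformationDAG instance. Real meta-operations would typically declare their required arguments via the placeholder tags described in Meta-Operations; it is assumed here that an argument-free meta-operation is permissible.

    # Register a meta-operation consisting of a single 'define' node ...
    dag.register_meta_operation(
        "the_answer",
        transform=[dict(operation="define", args=[42])],
    )

    # ... which can subsequently be used like a regular operation:
    dag.add_node(operation="the_answer", tag="answer")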

add_node(*, operation: str, args: list = None, kwargs: dict = None, tag: str = None, file_cache: dict = None, fallback: Any = None, **trf_kwargs) → dantro._dag_utils.DAGReference[source]

Add a new node by creating a new Transformation object and adding it to the node list.

In case of operation being a meta-operation, this method will add multiple Transformation objects to the node list. The tag and the file_cache arguments then refer to the result node of the meta-operation, while the **trf_kwargs are passed to all these nodes. For more information, see Meta-Operations.

Parameters
  • operation (str) – The name of the operation or meta-operation.

  • args (list, optional) – Positional arguments to the operation

  • kwargs (dict, optional) – Keyword arguments to the operation

  • tag (str, optional) – The tag the transformation should be made available as.

  • file_cache (dict, optional) – File cache options for this node. If defaults were given during initialization, those defaults will be updated with the given dict.

  • **trf_kwargs – Passed on to Transformation.__init__

Raises

ValueError – If the tag already exists

Returns

The reference to the created node. In case of the operation being a meta-operation, the return value is a reference to the result node of the meta-operation.

Return type

DAGReference
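
For example, a sketch that uses the define operation mentioned above to attach a constant value to the DAG under a tag:

    ref = dag.add_node(operation="define", args=[3.14], tag="pi")
    print(ref)                               # a DAGReference to the new node
    print(dag.compute(compute_only=["pi"]))  # -> {'pi': 3.14}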

add_nodes(*, define: Dict[str, Union[List[dict], Any]] = None, select: dict = None, transform: Sequence[dict] = None)[source]

Adds multiple nodes by parsing the specification given via the define, select, and transform arguments (in that order).

Note

The current select_base property value is used as basis for all getitem operations.

Parameters
  • define (Dict[str, Union[List[dict], Any]], optional) – Definitions of tags. This can happen in two ways: If the given entries contain a list or tuple, they are interpreted as sequences of transformations which are subsequently added to the DAG, the tag being attached to the last transformation of each sequence. If the entries contain objects of any other type, including dict (!), they will be added to the DAG via a single node that uses the define operation. This argument can be helpful to define inputs or variables which may then be used in the transformations added via the select or transform arguments. See The define interface for more information and examples.

  • select (dict, optional) – Selection specifications, which are translated into regular transformations based on getitem operations. The base_transform and select_base arguments can be used to define from which object to select. By default, selection happens from the associated DataManager.

  • transform (Sequence[dict], optional) – Transform specifications.
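
Continuing the sketch from the class description above, further nodes can be added after initialization using the same specification format (DAGTag usage again being an assumption, analogous to the !dag_tag YAML tag):

    dag.add_nodes(
        define={
            "offset": 273.15,   # becomes a single 'define' node tagged 'offset'
        },
        transform=[
            dict(operation="add", args=[DAGTag("temps"), DAGTag("offset")],
                 tag="shifted_temps"),
        ],
    )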

compute(*, compute_only: Sequence[str] = None, verbosity: int = None) → Dict[str, Any][source]

Computes all specified tags and returns a result dict.

Depending on the verbosity attribute, a varying level of profiling statistics will be emitted via the logger.

Parameters

compute_only (Sequence[str], optional) – The tags to compute. If None, will compute all non-private tags: all tags not starting with . or _ that are not included in the TransformationDAG.exclude_from_all list.

Returns

A mapping from tags to fully computed results.

Return type

Dict[str, Any]

_parse_trfs(*, select: dict, transform: Sequence[dict], define: dict = None) → Sequence[dict][source]

Parse the given arguments to bring them into a uniform format: a sequence of parameters for transformation operations. The arguments are parsed starting with the define tags, followed by the select and the transform argument.

Parameters
  • select (dict) – The shorthand to select certain objects from the DataManager. These may also include transformations.

  • transform (Sequence[dict]) – Actual transformation operations, carried out afterwards.

  • define (dict, optional) – If an entry is a list or tuple, it is interpreted as a transformation sequence, with the key used as the tag and attached to the last transformation of that sequence. For entries of any other type, a single transformation is added directly with the content of the entry.

Returns

A sequence of transformation parameters that was brought into a uniform structure.

Return type

Sequence[dict]

Raises
  • TypeError – On invalid type within entry of select

  • ValueError – When file_cache is given for selection from base

_add_meta_operation_nodes(operation: str, *, args: list = None, kwargs: dict = None, tag: str = None, file_cache: dict = None, allow_failure: Union[bool, str] = None, fallback: Any = None, **trf_kwargs) → dantro._dag_utils.DAGReference[source]

Adds Transformation nodes for meta-operations

This method resolves the placeholder references in the specified meta-operation such that they point to the args and kwargs. It then calls add_node() repeatedly to add the actual nodes.

Note

The last node added by this method is considered the “result” of the selected meta-operation. Subsequently, the arguments tag, file_cache, allow_failure and fallback are only applied to this last node.

The trf_kwargs (which include the salt) on the other hand are passed to all transformations of the meta-operation.

Parameters
  • operation (str) – The meta-operation to add nodes for

  • args (list, optional) – Positional arguments to the meta-operation

  • kwargs (dict, optional) – Keyword arguments to the meta-operation

  • tag (str, optional) – The tag that is to be attached to the result of this meta-operation.

  • file_cache (dict, optional) – File caching options for the result.

  • allow_failure (Union[bool, str], optional) – Specifies the error handling for the result node of this meta-operation.

  • fallback (Any, optional) – Specifies the fallback for the result node of this meta-operation.

  • **trf_kwargs – Transformation keyword arguments, passed on to all transformations that are to be added.

_update_profile(**times)[source]

Updates profiling information by adding the given time to the matching key.

_parse_compute_only(compute_only: Union[str, List[str]]) → List[str][source]

Prepares the compute_only argument for use in compute().

_retrieve_from_cache_file(trf_hash: str, **load_kwargs) → Tuple[bool, Any][source]

Retrieves a transformation’s result from a cache file.

_write_to_cache_file(trf_hash: str, *, result: Any, ignore_groups: bool = True, attempt_pickling: bool = True, raise_on_error: bool = False, pkl_kwargs: dict = None, **save_kwargs) → bool[source]

Writes the given result object to a hash file, overwriting existing ones.

Parameters
  • trf_hash (str) – The hash; will be used for the file name

  • result (Any) – The result object to write as a cache file

  • ignore_groups (bool, optional) – Whether to ignore (i.e. not store) group objects. Groups are not stored by default.

  • attempt_pickling (bool, optional) – Whether to attempt pickling results that could not be stored via a dedicated storage function. Enabled by default.

  • raise_on_error (bool, optional) – Whether to raise if storing a result fails. Disabled by default; it is useful to enable this when debugging.

  • pkl_kwargs (dict, optional) – Arguments passed on to the pickle.dump function.

  • **save_kwargs – Passed on to the chosen storage method.

Returns

Whether a cache file was saved

Return type

bool

Raises
  • NotImplementedError – When attempting to store instances of BaseDataGroup or a derived class

  • RuntimeError – When raise_on_error was given and there was an error during saving.