Source code for dantro.dag

"""This is an implementation of a DAG for transformations on dantro objects"""

import os
import sys
import glob
import copy
import time
import logging
import pickle as pkl
from itertools import chain

from typing import TypeVar, Dict, Tuple, Sequence, Any, Union, List, Set

import numpy as np
import xarray as xr

from paramspace.tools import recursive_replace, recursive_collect

from .abc import AbstractDataContainer, PATH_JOIN_CHAR
from .base import BaseDataGroup
from .utils import KeyOrderedDict, apply_operation, register_operation
from .tools import recursive_update
from .data_loaders import LOADER_BY_FILE_EXT
from .containers import ObjectContainer, NumpyDataContainer, XrDataContainer
from ._dag_utils import (DAGObjects, DAGReference, DAGTag, DAGNode,
                         parse_dag_syntax as _parse_dag_syntax,
                         parse_dag_minimal_syntax as _parse_dag_minimal_syntax)
from ._hash import _hash, SHORT_HASH_LENGTH, FULL_HASH_LENGTH


# Local constants .............................................................
log = logging.getLogger(__name__)

# The path within the DAG's associated DataManager to which caches are loaded
DAG_CACHE_DM_PATH = 'cache/dag'

# Functions that can store the DAG computation result objects, distinguishing
# by their type.
DAG_CACHE_RESULT_SAVE_FUNCS = {
    # Saving functions of specific dantro objects
    (NumpyDataContainer,):
        lambda obj, p, **kws: obj.save(p+".npy", **kws),
    (XrDataContainer,):
        lambda obj, p, **kws: obj.save(p+".xrdc", **kws),

    # Saving functions of external packages
    (np.ndarray,):
        lambda obj, p, **kws: np.save(p+".npy", obj, **kws),
    (xr.DataArray,):
        lambda obj, p, **kws: obj.to_netcdf(p+".nc_da", **kws),
    (xr.Dataset,):
        lambda obj, p, **kws: obj.to_netcdf(p+".nc_ds", **kws),
}


# -----------------------------------------------------------------------------

[docs]class Transformation: """A transformation is the collection of an N-ary operation and its inputs. Transformation objects store the name of the operation that is to be carried out and the arguments that are to be fed to that operation. After a Transformation is defined, the only interaction with them is via the ``compute`` method. For computation, the arguments are recursively inspected for whether there are any DAGReference-derived objects; these need to be resolved first, meaning they are looked up in the DAG's object database and -- if they are another Transformation object -- their result is computed. This can lead to a traversal along the DAG. .. warning:: Objects of this class should under *no* circumstances be changed after they were created! For performance reasons, the :py:attr:`~dantro.dag.Transformation.hashstr` property is cached; thus, changing attributes that are included into the hash computation will not lead to a new hash, thus silently creating wrong behaviour. All relevant attributes (operation, args, kwargs, salt) are thus set read-only. This should be respected! """
[docs] def __init__(self, *, operation: str, args: Sequence[Union[DAGReference, Any]], kwargs: Dict[str, Union[DAGReference, Any]], dag: 'TransformationDAG'=None, salt: int=None, file_cache: dict=None): """Initialize a Transformation object. Args: operation (str): The operation that is to be carried out. args (Sequence[Union[DAGReference, Any]]): Positional arguments for the operation. kwargs (Dict[str, Union[DAGReference, Any]]): Keyword arguments for the operation. These are internally stored as a :py:class:`~dantro.utils.ordereddict.KeyOrderedDict`. dag (TransformationDAG, optional): An associated DAG that is needed for object lookup. Without an associated DAG, args or kwargs may NOT contain any object references. salt (int, optional): A hashing salt that can be used to let this specific Transformation object have a different hash than other objects, thus leading to cache misses. file_cache (dict, optional): File cache options. Expected keys are ``write`` (boolean or dict) and ``read`` (boolean or dict). Note that the options given here are NOT reflected in the hash of the object! The following arguments are possible under the ``read`` key: enabled (bool, optional): Whether it should be attempted to read from the file cache. load_options (dict, optional): Passed on to the method that loads the cache, :py:meth:`~dantro.data_mngr.DataManager.load`. Under the ``write`` key, the following arguments are possible. They are evaluated in the order that they are listed here. See :py:meth:`~dantro.dag.Transformation._cache_result` for more information. enabled (bool, optional): Whether writing is enabled at all always (bool, optional): If given, will always write. allow_overwrite (bool, optional): If False, will not write a cache file if one already exists. If True, a cache file *might* be written, although one already exists. This is still conditional on the evaluation of the other arguments. min_size (int, optional): The *minimum* size of the result object that allows writing the cache. max_size (int, optional): The *maximum* size of the result object that allows writing the cache. min_compute_time (float, optional): The minimal individual computation time of this node that is needed in order for the file cache to be written. *Note* that this value can be lower if the node result is not computed but looked up from the cache. min_cumulative_compute_time (float, optional): The minimal cumulative computation time of this node and all its dependencies that is needed in order for the file cache to be written. *Note* that this value can be lower if the node result is not computed but looked up from the cache. storage_options (dict, optional): Passed on to the cache storage method, :py:meth:`dantro.dag.TransformationDAG._write_to_cache_file`. The following arguments are available: ignore_groups (bool, optional): Whether to store groups. Disabled by default. attempt_pickling (bool, optional): Whether it should be attempted to store results that could not be stored via a dedicated storage function by pickling them. Enabled by default. raise_on_error (bool, optional): Whether to raise on error to store a result. Disabled by default; it is useful to enable this when debugging. pkl_kwargs (dict, optional): Arguments passed on to the pickle.dump function. further keyword arguments: Passed on to the chosen storage method. """ # Storage attributes self._operation = operation self._args = args self._kwargs = KeyOrderedDict(**kwargs) self._dag = dag self._salt = salt self._hashstr = None self._profile = dict(compute=0., cumulative_compute=0., hashstr=0., cache_lookup=0., cache_writing=0.) # Parse file cache options, making sure it's a dict with default values self._fc_opts = file_cache if file_cache is not None else {} if isinstance(self._fc_opts.get('write', {}), bool): self._fc_opts['write'] = dict(enabled=self._fc_opts['write']) elif 'write' not in self._fc_opts: self._fc_opts['write'] = dict(enabled=False) if isinstance(self._fc_opts.get('read', {}), bool): self._fc_opts['read'] = dict(enabled=self._fc_opts['read']) elif 'read' not in self._fc_opts: self._fc_opts['read'] = dict(enabled=False) # Cache dict, containing the result and whether the cache is in memory self._cache = dict(result=None, filled=False)
# ......................................................................... # String representation and hashing
[docs] def __str__(self) -> str: """A human-readable string characterizing this Transformation""" return ("<{t:}, operation: {op:}, {Na:d} args, {Nkw:d} kwargs>\n" " args: {args:}\n" " kwargs: {kwargs:}\n" "".format(t=type(self).__name__, op=self._operation, Na=len(self._args), Nkw=len(self._kwargs), args=self._args, kwargs=self._kwargs))
[docs] def __repr__(self) -> str: """A deterministic string representation of this transformation. .. note:: This is also used for hash creation, thus it does not include the attributes that are set via the initialization arguments ``dag`` and ``file_cache``. .. warning:: Changing this method will lead to cache invalidations! """ return ("<{mod:}.{t:}, operation={op:}, args={args:}, " "kwargs={kwargs:}, salt={salt:}>" "".format(mod=type(self).__module__, t=type(self).__name__, op=repr(self._operation), args=repr(self._args), kwargs=repr(dict(self._kwargs)),# TODO Check sorting! salt=repr(self._salt)))
@property def hashstr(self) -> str: """Computes the hash of this Transformation by creating a deterministic representation of this Transformation using ``__repr__`` and then applying a checksum hash function to it. Note that this does NOT rely on the built-in hash function but on the custom dantro ``_hash`` function which produces a platform-independent and deterministic hash. As this is a *string*-based (rather than an integer-based) hash, it is not implemented as the ``__hash__`` magic method but as this separate property. Returns: str: The hash string for this transformation """ if self._hashstr is None: t0 = time.time() self._hashstr = _hash(repr(self)) self._update_profile(hashstr=time.time() - t0) return self._hashstr
[docs] def __hash__(self) -> int: """Computes the python-compatible integer hash of this object from the string-based hash of this Transformation. """ return hash(self.hashstr)
# ......................................................................... # Properties @property def dag(self) -> 'TransformationDAG': """The associated TransformationDAG; used for object lookup""" return self._dag @property def dependencies(self) -> Set[DAGReference]: """Recursively collects the references that are found in the positional and keyword arguments of this Transformation. """ return set(recursive_collect(chain(self._args, self._kwargs.values()), select_func=(lambda o: isinstance(o, DAGReference)))) @property def resolved_dependencies(self) -> Set['Transformation']: """Transformation objects that this Transformation depends on""" return set([ref.resolve_object(dag=self.dag) for ref in self.dependencies]) @property def profile(self) -> Dict[str, float]: """The profiling data for this transformation""" return self._profile # YAML representation ..................................................... yaml_tag = u'!dag_trf'
[docs] @classmethod def from_yaml(cls, constructor, node): return cls(**constructor.construct_mapping(node, deep=True))
[docs] @classmethod def to_yaml(cls, representer, node): """A YAML representation of this Transformation, including all its arguments (which must again be YAML-representable). In essence, this returns a YAML mapping that has the ``!dag_trf`` YAML tag prefixed, such that *reading* it in will lead to the ``from_yaml`` method being invoked. .. note:: The YAML representation does *not* include the ``file_cache`` parameters. .. warning:: The YAML representation is used in computing the hashstr that identifies this transformation. Changing the argument order here or adding further keys to the dict will lead to hash changes and thus to cache misses. """ # Collect the attributes that are relevant for the transformation. d = dict(operation=node._operation, args=node._args, kwargs=dict(node._kwargs)) # If a specific salt was given, add that to the dict as well if node._salt is not None: d['salt'] = node._salt # Let YAML represent this as a mapping with an additional tag return representer.represent_mapping(cls.yaml_tag, d)
# ......................................................................... # Compute interface
[docs] def compute(self) -> Any: """Computes the result of this transformation by recursively resolving objects and carrying out operations. This method can also be called if the result is already computed; this will lead only to a cache-lookup, not a re-computation. Returns: Any: The result of the operation """ def is_DAGReference(obj: Any) -> bool: return isinstance(obj, DAGReference) def resolve_and_compute(ref: DAGReference): """Resolve references to their objects, if necessary computing the results of referenced Transformation objects recursively. Makes use of arguments from outer scope. """ if self.dag is None: raise ValueError("Cannot resolve Transformation arguments " "that contain DAG references, because no DAG " "was associated with this Transformation!") # Let the reference resolve the corresponding object from the DAG obj = ref.resolve_object(dag=self.dag) # Check if this refers to an object that is NOT a transformation. # This could be the DataManager or the DAG itself, but also other # objects in the DAG's object database. Performing computation on # those is either not possible or would lead to infinite loops. if not isinstance(obj, Transformation): return obj # else: It is another Transformation object. Compute it, which # leads to a traversal up the DAG tree. return obj.compute() # Try to look up an already computed result from memory or file cache success, res = self._lookup_result() if not success: # Did not find a result in memory or file cache -> Compute it. # First, compute the result of the references in the arguments. args = recursive_replace(copy.deepcopy(self._args), select_func=is_DAGReference, replace_func=resolve_and_compute) kwargs = recursive_replace(copy.deepcopy(self._kwargs), select_func=is_DAGReference, replace_func=resolve_and_compute) # NOTE Important to deepcopy here, because otherwise the recursive # replacement and the mutability of both args and kwargs will # lead to DAGReference objects being replaced with the actual # objects, which would break the .hashstr property of this # object. The deepcopy is always possible, because even if # the args and kwargs are nested, they contain only trivial # objects. # Carry out the operation res = self._perform_operation(args=args, kwargs=kwargs) # Allow caching the result, even if it comes from the cache self._cache_result(res) return res
[docs] def _perform_operation(self, *, args, kwargs) -> Any: """Perform the operation, updating the profiling info on the side""" # Initialize a dict for profiling info prof = dict() # Set up profiling t0 = time.time() # Actually perform the operation res = apply_operation(self._operation, *args, **kwargs) # TODO Add error handling with node information # Prase profiling info and return the result self._update_profile(cumulative_compute=(time.time() - t0)) return res
[docs] def _update_profile(self, *, cumulative_compute: float=None, **times) -> None: """Given some new profiling times, updates the profiling information. Args: cumulative_compute (float, optional): The cumulative computation time; if given, additionally computes the computation time for this individual node. **times: Valid profiling data. """ # If cumulative computation time was given, calculate individual time if cumulative_compute is not None: self._profile['cumulative_compute'] = cumulative_compute # Aggregate the dependencies' cumulative computation times deps_cctime = sum([dep.profile['cumulative_compute'] for dep in self.resolved_dependencies if isinstance(dep, Transformation)]) # NOTE The dependencies might not have this value set because there # might have been a cache lookup self._profile['compute'] = max(0., cumulative_compute-deps_cctime) # Store the remaining entries self._profile.update(times)
# Cache handling . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
[docs] def _lookup_result(self) -> Tuple[bool, Any]: """Look up the transformation result to spare re-computation""" success, res = False, None # Retrieve cache parameters read_opts = self._fc_opts.get('read', {}) load_opts = read_opts.get('load_options', {}) # Check if the cache is already filled. If not, see if the file cache # can be read and is configured to be read. if self._cache['filled']: success = True res = self._cache['result'] elif self.dag is not None and read_opts.get('enabled', False): # Setup profiling t0 = time.time() # Let the DAG check if there is a file cache, i.e. if a file with # this Transformation's hash exists in the DAG's cache directory. success, res = self.dag._retrieve_from_cache_file(self.hashstr, **load_opts) # Store the result if success: self._cache['result'] = res self._cache['filled'] = True self._update_profile(cache_lookup=(time.time() - t0)) return success, res
[docs] def _cache_result(self, result: Any) -> None: """Stores a computed result in the cache""" def should_write(*, enabled: bool, always: bool=False, allow_overwrite: bool=False, min_size: int=None, max_size: int=None, min_compute_time: float=None, min_cumulative_compute_time: float=None, storage_options: dict=None ) -> bool: """A helper function to evaluate _whether_ the file cache is to be written or not. Args: enabled (bool): Whether writing is enabled at all always (bool, optional): If given, will always write. allow_overwrite (bool, optional): If False, will not write a cache file if one already exists. If True, a cache file _might_ be written, although one already exists. This is still conditional on the evaluation of the other arguments. min_size (int, optional): The minimum size of the result object that allows writing the cache. max_size (int, optional): The maximum size of the result object that allows writing the cache. min_compute_time (float, optional): The minimal individual computation time of this node that is needed in order for the file cache to be written. Note that this value can be lower if the node result is not computed but looked up from the cache. min_cumulative_compute_time (float, optional): The minimal cumulative computation time of this node and all its dependencies that is needed in order for the file cache to be written. Note that this value can be lower if the node result is not computed but looked up from the cache. storage_options (dict, optional): (ignored here) Returns: bool: Whether to write the file cache or not. """ if not enabled: # ... nothing else to check return False # With always: always write, don't look at other arguments. if always: return True # All checks below are formulated such that they return False. # If overwriting is _disabled_ and a cache file already exists, it # is already clear that a new one should _not_ be written if not allow_overwrite and self.hashstr in self.dag.cache_files: return False # Evaluate profiling information if min_compute_time is not None: if self.profile['compute'] < min_compute_time: return False if min_cumulative_compute_time is not None: if ( self.profile['cumulative_compute'] < min_cumulative_compute_time): return False # Evaluate object size if min_size is not None or max_size is not None: size_itvl = [min_size if min_size is not None else 0, max_size if max_size is not None else np.inf] obj_size = sys.getsizeof(result) # from outer scope if not (size_itvl[0] < obj_size < size_itvl[1]): return False # If this point is reached, the cache file should be written. return True # Store a reference to the result and mark the cache as being in use self._cache['result'] = result self._cache['filled'] = True # NOTE If instead of a proper computation, the passed result object was # previously looked up from the cache, this will not have an # effect. # Get file cache writing parameters; don't write if not write_opts = self._fc_opts['write'] # Determine whether to write to a file if self.dag is not None and should_write(**write_opts): # Setup profiling t0 = time.time() # Write the result to a file inside the DAG's cache directory. This # is handled by the DAG itself, because the Transformation does not # know (and should not care) aboute the cache directory ... storage_opts = write_opts.get('storage_options', {}) self.dag._write_to_cache_file(self.hashstr, result=result, **storage_opts) self._update_profile(cache_writing=(time.time() - t0))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
[docs]class TransformationDAG: """This class collects transformation operations that are (already by their own structure) connected into a directed acyclic graph. The aim of this class is to maintain base objects, manage references, and allow operations on the DAG, the most central of which is computing the result of a node. Furthermore, this class also implements caching of transformations, such that operations that take very long can be stored (in memory or on disk) to speed up future operations. Objects of this class are initialized with dict-like arguments which specify the transformation operations. There are some shorthands that allow a simple definition syntax, for example the ``select`` syntax, which takes care of selecting a basic set of data from the associated DataManager. """
[docs] def __init__(self, *, dm: 'DataManager', select: dict=None, transform: Sequence[dict]=None, cache_dir: str='.cache', file_cache_defaults: dict=None, base_transform: Sequence[Transformation]=None, select_base: Union[DAGReference, str]=None, select_path_prefix: str=None): """Initialize a DAG which is associated with a DataManager and load the specified transformations configuration into it. Args: dm (DataManager): The associated data manager select (dict, optional): Selection specifications, which are translated into regular transformations based on ``getitem`` operations. The ``base_transform`` and ``select_base`` arguments can be used to define from which object to select. By default, selection happens from the associated DataManager. transform (Sequence[dict], optional): Transform specifications. cache_dir (str, optional): The name of the cache directory to create if file caching is enabled. If this is a relative path, it is interpreted relative to the associated data manager's data directory. If it is absolute, the absolute path is used. The directory is only created if it is needed. file_cache_defaults (dict, optional): Default arguments for file caching behaviour. This is recursively updated with the arguments given in each individual select or transform specification. base_transform (Sequence[Transformation], optional): A sequence of transform specifications that are added to the DAG prior to those added via ``select`` and ``transform``. These can be used to create some other object from the data manager which should be used as the basis of ``select`` operations. select_base (Union[DAGReference, str], optional): Which tag to base the ``select`` operations on. If None, will use the (always-registered) tag for the data manager, ``dm``. This attribute can also be set via the ``select_base`` property. select_path_prefix (str, optional): If given, this path is prefixed to all ``path`` specifications made within the ``select`` argument. Note that unlike setting the ``select_base`` this merely joins the given prefix to the given paths, thus leading to repeated path resolution. For that reason, using the ``select_base`` argument is generally preferred and the ``select_path_prefix`` should only be used if ``select_base`` is already in use. If this path ends with a ``/``, it is directly prepended. If not, the ``/`` is added before adjoining it to the other path. """ self._dm = dm self._objects = DAGObjects() self._tags = dict() self._nodes = list() self._fc_opts = file_cache_defaults if file_cache_defaults else {} self._select_base = None self._profile = dict(add_node=0., compute=0.) self._select_path_prefix = select_path_prefix # Determine cache directory path; relative path interpreted as relative # to the DataManager's data directory if os.path.isabs(cache_dir): self._cache_dir = cache_dir else: self._cache_dir = os.path.join(self.dm.dirs['data'], cache_dir) # Add the DAG itself and the DataManager as objects with default tags self.tags['dag'] = self.objects.add_object(self) self.tags['dm'] = self.objects.add_object(self.dm) # NOTE The data manager is NOT a node of the DAG, but more like an # external data source, thus being accessible only as a tag # Add base transformations that do not rely on select operations self.add_nodes(transform=base_transform) # Set the selection base tag; the property setter checks availability self.select_base = select_base # Now add nodes via the main arguments; these can now make use of the # select interface, because a select base tag is set and base transform # operations were already added. self.add_nodes(select=select, transform=transform)
# .........................................................................
[docs] def __str__(self) -> str: """A human-readable string characterizing this TransformationDAG""" return ("<TransformationDAG, " "{:d} node(s), {:d} tag(s), {:d} object(s)>" "".format(len(self.nodes), len(self.tags), len(self.objects)))
# ......................................................................... @property def dm(self) -> 'DataManager': """The associated DataManager""" return self._dm @property def hashstr(self) -> str: """Returns the hash of this DAG, which depends solely on the hash of the associated DataManager. """ return _hash("<TransformationDAG, coupled to DataManager with ref {}>" "".format(self.dm.hashstr)) @property def objects(self) -> DAGObjects: """The object database""" return self._objects @property def tags(self) -> Dict[str, str]: """A mapping from tags to objects' hashes; the hashes can be looked up in the object database to get to the objects. """ return self._tags @property def nodes(self) -> List[str]: """The nodes of the DAG""" return self._nodes @property def cache_dir(self) -> str: """The path to the cache directory that is associated with the DataManager that is coupled to this DAG. Note that the directory might not exist yet! """ return self._cache_dir @property def cache_files(self) -> Dict[str, Tuple[str, str]]: """Scans the cache directory for cache files and returns a dict that has as keys the hash strings and as values a tuple of full path and file extension. """ info = dict() # Go over all files in the cache dir that have an extension for path in glob.glob(os.path.join(self.cache_dir, '*.*')): if not os.path.isfile(path): continue # Get filename and extension, then check if it is a hash fname, ext = os.path.splitext(os.path.basename(path)) if len(fname) != FULL_HASH_LENGTH: continue # else: filename is assumed to be the hash. if fname in info: raise ValueError("Encountered a duplicate cache file for the " "transformation with hash {}! Delete all but " "one of those files from the cache directory " "{}.".format(fname, self.cache_dir)) # All good, store info. info[fname] = dict(full_path=path, ext=ext) return info @property def select_base(self) -> DAGReference: """The reference to the object that is used for select operations""" return self._select_base @select_base.setter def select_base(self, new_base: Union[DAGReference, str]): """Set the reference that is to be used as the base of select operations. It can either be a reference object or a string, which is then interpreted as a tag. """ # Distinguish by type. If it's not a DAGReference, assume it's a tag. if new_base is None: new_base = DAGTag('dm').convert_to_ref(dag=self) elif isinstance(new_base, DAGReference): # Make sure it is a proper DAGReference object (hash-based) and not # an object of a derived class. new_base = new_base.convert_to_ref(dag=self) elif new_base not in self.tags: raise KeyError("The tag '{}' cannot be the basis of future select " "operations because it is not available! Make sure " "that a node with that tag is added prior to the " "attempt of setting it. Available tags: {}. " "Alternatively, pass a DAGReference object." "".format(new_base, ", ".join(self.tags))) else: # Tag is available. Create a DAGReference via DAGTag conversion log.debug("Setting select_base to tag '%s' ...", new_base) new_base = DAGTag(new_base).convert_to_ref(dag=self) # Have a DAGReference now. Store it. self._select_base = new_base @property def profile(self) -> Dict[str, float]: """Returns the profiling information for the DAG.""" return self._profile @property def profile_extended(self) -> Dict[str, Union[float, Dict[str, float]]]: """Builds an extended profile that includes the profiles from all transformations and some aggregated information. This is calculated anew upon each invocation; the result is not cached. """ prof = copy.deepcopy(self.profile) # Add tag-specific information prof['tags'] = dict() for tag, obj_hash in self.tags.items(): obj = self.objects[obj_hash] if not isinstance(obj, Transformation): continue tprof = copy.deepcopy(obj.profile) prof['tags'][tag] = tprof # Aggregate the profiled times from all transformations (by item) to_aggregate = ('compute', 'hashstr', 'cache_lookup', 'cache_writing') stat_funcs = dict(mean=lambda d: np.mean(d), std=lambda d: np.std(d), min=lambda d: np.min(d), max=lambda d: np.max(d), q25=lambda d: np.quantile(d, .25), q50=lambda d: np.quantile(d, .50), q75=lambda d: np.quantile(d, .75)) tprofs = {item: list() for item in to_aggregate} for obj_hash, obj in self.objects.items(): if not isinstance(obj, Transformation): continue tprof = copy.deepcopy(obj.profile) for item in to_aggregate: tprofs[item].append(tprof[item]) # Compute some statistics for the aggregated elements prof['aggregated'] = dict() for item in to_aggregate: prof['aggregated'][item] = {k: (f(tprofs[item]) if tprofs[item] else np.nan) for k, f in stat_funcs.items()} return prof # .........................................................................
[docs] def add_node(self, *, operation: str, args: list=None, kwargs: dict=None, tag: str=None, file_cache: dict=None, **trf_kwargs) -> DAGReference: """Add a new node by creating a new Transformation object and adding it to the node list. Args: operation (str): The name of the operation args (list, optional): Positional arguments to the operation kwargs (dict, optional): Keyword arguments to the operation tag (str, optional): The tag the transformation should be made available as. file_cache (dict, optional): File cache options for this node. If defaults were given during initialization, those defaults will be updated with the given dict. **trf_kwargs: Passed on to Transformation.__init__ Raises: ValueError: If the tag already exists Returns: DAGReference: The reference to the created node """ t0 = time.time() # Some helper methods for the recursive replacement def not_proper_ref(obj: Any) -> bool: return ( isinstance(obj, DAGReference) and type(obj) is not DAGReference) def convert_to_ref(obj: DAGReference) -> DAGReference: return obj.convert_to_ref(dag=self) # . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . # Handle default values of arguments args = copy.deepcopy(args) if args else [] kwargs = copy.deepcopy(kwargs) if kwargs else {} # NOTE Deep copy is important here, because the mutability of nested # args or kwargs may lead to side effects. The deep copy should # always be possible, because args and kwargs should only contain # trivial objects. # Recursively replace any derived references to proper DAGReferences, # which work hash-based. This is to not have multiple options of how # another TransformationDAG object is referenced. args = recursive_replace(args, select_func=not_proper_ref, replace_func=convert_to_ref) kwargs = recursive_replace(kwargs, select_func=not_proper_ref, replace_func=convert_to_ref) # Parse file cache parameters fc_opts = copy.deepcopy(self._fc_opts) # Always a dict if file_cache is not None: fc_opts = recursive_update(fc_opts, file_cache) # From these arguments, create the Transformation object and add it to # the objects database. trf = Transformation(operation=operation, args=args, kwargs=kwargs, dag=self, file_cache=fc_opts, **trf_kwargs) trf_hash = self.objects.add_object(trf) # NOTE From this point on, the object itself has no relevance. # Transformation objects should only be handled via their hash in # order to reduce duplicate calculations and make efficient # caching possible. # Store the hash in the node list self.nodes.append(trf_hash) # If a tag was specified, create a tag if tag: if tag in self.tags.keys(): raise ValueError("Tag '{}' already exists! Choose a different " "one. Already in use: {}" "".format(tag, ", ".join(self.tags.keys()))) self.tags[tag] = trf_hash # Update the profile self._update_profile(add_node=time.time() - t0) # Return a reference to the newly created node return DAGReference(trf_hash)
[docs] def add_nodes(self, *, select: dict=None, transform: Sequence[dict]=None): """Adds multiple nodes by parsing the specification given via the ``select`` and ``transform`` arguments. Args: select (dict, optional): Selection specifications, which are translated into regular transformations based on ``getitem`` operations. The ``base_transform`` and ``select_base`` arguments can be used to define from which object to select. By default, selection happens from the associated DataManager. transform (Sequence[dict], optional): Transform specifications. """ # Parse the arguments and add multiple nodes from those specs specs = self._parse_trfs(select=select, transform=transform) if not specs: return for spec in specs: self.add_node(**spec)
[docs] def compute(self, *, compute_only: Sequence[str]=None) -> Dict[str, Any]: """Computes all specified tags and returns a result dict. Args: compute_only (Sequence[str], optional): The tags to compute. If not given, will compute all associated tags. cache_options (dict, optional): Cache options. These will update the default cache options given at initialization of the DAG. Returns: Dict[str, Any]: A mapping from tags to fully computed results. """ def postprocess_result(res, *, tag: str): """Performs some postprocessing operations on the results of individual tag computations. """ # If the object is a detached dantro tree object, use the short # transformation hash for its name if isinstance(res, (AbstractDataContainer)) and res.parent is None: res.name = trf.hashstr[:SHORT_HASH_LENGTH] # Unwrap ObjectContainer; these are only meant for usage within the # data tree and it makes little sense to keep them in that form. if isinstance(res, ObjectContainer): res = res.data return res # Initiate start time for profiling t0 = time.time() # Determine which tags to compute compute_only = compute_only if compute_only is not None else 'all' if compute_only == 'all': compute_only = [t for t in self.tags.keys() if t not in ['dm', 'dag']] log.info("Computation invoked on DAG with %d nodes.", len(self.nodes)) # The results dict results = dict() if not compute_only: log.remark("No tags were selected to be computed. " "Available tags:\n %s", ", ".join(self.tags)) return results # Compute and collect the results for tag in compute_only: log.remark("Computing tag '%s' ...", tag) # Resolve the transformation, then compute the result trf = self.objects[self.tags[tag]] res = trf.compute() # Postprocess it and store the result under its tag results[tag] = postprocess_result(res, tag=tag) # Update profiling information t1 = time.time() self._update_profile(compute=t1-t0) # Provide some information to the user log.note("Computed %d tag%s in %.2gs: %s", len(compute_only), "s" if len(compute_only) != 1 else "", t1-t0, ", ".join(results.keys())) prof_extd = self.profile_extended fstr = ("{name:>25s} {p[mean]:<7s} ± {p[std]:<7s} " "({p[min]:<7s} | {p[max]:<7s})") log.remark("Profiling results per node: " "mean ± std (min|max) [s]\n%s", "\n".join([fstr.format(name=k, p={_k: "{:.2g}".format(_v) for _k, _v in v.items()}) for k, v in prof_extd['aggregated'].items() if k not in ('hashstr',)])) # TODO In the future, make this prettier; with proper time formatting! return results
# ......................................................................... # Helpers: Parsing transformation specifications
[docs] def _parse_trfs(self, *, select: dict, transform: Sequence[dict]) -> Sequence[dict]: """Parse the given arguments to bring them into a uniform format: a sequence of parameters for transformation operations. Args: select (dict): The shorthand to select certain objects from the DataManager. These may also include transformations. transform (Sequence[dict]): Actual transformation operations, carried out afterwards. Returns: Sequence[dict]: A sequence of transformation parameters that was brought into a uniform structure. Raises: TypeError: On invalid type within entry of ``select`` ValueError: When ``file_cache`` is given for selection from base """ # The to-be-populated list of transformations trfs = list() # Prepare arguments: make sure they are dicts and deep copies. select = copy.deepcopy(select) if select else {} transform = copy.deepcopy(transform) if transform else [] # First, parse the ``select`` argument. This contains a basic operation # to select data from the selection base (e.g. the DataManager) and # also allows to perform some operations on it. for tag, params in sorted(select.items()): if isinstance(params, str): path = params with_previous_result = False more_trfs = None salt = None omit_tag = False elif isinstance(params, dict): path = params['path'] with_previous_result = params.get('with_previous_result',False) more_trfs = params.get('transform') salt = params.get('salt') omit_tag = params.get('omit_tag', False) if 'file_cache' in params: raise ValueError("For selection from the selection base, " "the file cache is always disabled! " "The `file_cache` argument is thus not " "allowed; remove it from the selection " "for tag '{}'.".format(tag)) else: raise TypeError("Invalid type for '{}' entry within `select` " "argument! Got {} but expected string or dict." "".format(tag, type(params))) # If given, process the path by prepending the prefix if self._select_path_prefix: if self._select_path_prefix[-1] == PATH_JOIN_CHAR: path = self._select_path_prefix + path else: path = PATH_JOIN_CHAR.join([self._select_path_prefix,path]) # Construct parameters to select from the selection base. # Only assign a tag if there are no further transformations; # otherwise, the last additional transformation should set the tag. sel_trf = dict(operation='getitem', tag=None if (more_trfs or omit_tag) else tag, args=[self.select_base, path], kwargs=dict(), file_cache=dict(read=False, write=False)) # Carry additional parameters only if given if salt is not None: sel_trf['salt'] = salt # Now finished with the formulation of the select operation. trfs.append(sel_trf) if not more_trfs: # Done with this select operation. continue # else: there are additional transformations to be parsed and added for i, trf_params in enumerate(more_trfs): trf_params = _parse_dag_minimal_syntax(trf_params) # Might have to use the previous result ... if 'with_previous_result' not in trf_params: trf_params['with_previous_result'] = with_previous_result # Can now parse the regular syntax trf_params = _parse_dag_syntax(**trf_params) # If the tag is not to be omitted, the last transformation for # the selected tag needs to set the tag. if i+1 == len(more_trfs) and not omit_tag: if trf_params.get('tag'): raise ValueError("The tag of the last transform " "operation within a select routine " "cannot be set manually. Check the " "parameters for selection of tag " "'{}'.".format(tag)) # TODO Could actually allow multiple tags here ... # Add the tag to the parameters trf_params['tag'] = tag # Can append it now trfs.append(trf_params) # Now, parse the normal `transform` argument. The operations defined # here are added after the instructions from the `select` section. for trf_params in transform: trf_params = _parse_dag_minimal_syntax(trf_params) trfs.append(_parse_dag_syntax(**trf_params)) # Done parsing, yay. return trfs
# ......................................................................... # Helpers: Profiling
[docs] def _update_profile(self, **times): """Updates profiling information by adding the given time to the matching key. """ for key, t in times.items(): self._profile[key] = self._profile.get(key, 0.) + t
# ......................................................................... # Cache writing and reading # NOTE This is done here rather than in Transformation because this is the # more central entity and it is a bit easier ...
[docs] def _retrieve_from_cache_file(self, trf_hash: str, **load_kwargs) -> Tuple[bool, Any]: """Retrieves a transformation's result from a cache file.""" success, res = False, None cache_files = self.cache_files if trf_hash not in cache_files.keys(): # Bad luck, no cache file return success, res # Parse load options if 'exists_action' not in load_kwargs: load_kwargs['exists_action'] = 'skip_nowarn' # else: There was a file. Let the DataManager load it. file_ext = cache_files[trf_hash]['ext'] self.dm.load('dag_cache', loader=LOADER_BY_FILE_EXT[file_ext[1:]], base_path=self.cache_dir, glob_str=trf_hash + file_ext, target_path=DAG_CACHE_DM_PATH + "/{basename:}", required=True, **load_kwargs) # NOTE If a file was already loaded from the cache, it will not be # loaded again. Thus, the DataManager acts as a persistent # storage for loaded cache files. Consequently, these are shared # among all TransformationDAG objects. # Retrieve from the DataManager res = self.dm[DAG_CACHE_DM_PATH][trf_hash] # Done. success = True return success, res
[docs] def _write_to_cache_file(self, trf_hash: str, *, result: Any, ignore_groups: bool=True, attempt_pickling: bool=True, raise_on_error: bool=False, pkl_kwargs: dict=None, **save_kwargs) -> bool: """Writes the given result object to a hash file, overwriting existing ones. Args: trf_hash (str): The hash; will be used for the file name result (Any): The result object to write as a cache file ignore_groups (bool, optional): Whether to store groups. Disabled by default. attempt_pickling (bool, optional): Whether it should be attempted to store results that could not be stored via a dedicated storage function by pickling them. Enabled by default. raise_on_error (bool, optional): Whether to raise on error to store a result. Disabled by default; it is useful to enable this when debugging. pkl_kwargs (dict, optional): Arguments passed on to the pickle.dump function. **save_kwargs: Passed on to the chosen storage method. Returns: bool: Whether a cache file was saved Raises: NotImplementedError: When attempting to store instances of :py:class:`~dantro.base.BaseDataGroup` or a derived class RuntimeError: When ``raise_on_error`` was given and there was an error during saving. """ # Cannot store groups if isinstance(result, BaseDataGroup): if not ignore_groups: raise NotImplementedError("Cannot currently write dantro " "groups to a cache file. Sorry. Adjust the ignore_groups " "argument in the file cache write options for the " "transformation resulting in {}.".format(result)) return False # Make sure the directory exists os.makedirs(self.cache_dir, exist_ok=True) # Prepare the file path (still lacks an extension) fpath = os.path.join(self.cache_dir, trf_hash) # Go over the saving functions and see if the type agrees. If so, use # that function to write the data. for types, sfunc in DAG_CACHE_RESULT_SAVE_FUNCS.items(): if not isinstance(result, types): continue # else: type matched, invoke saving function try: sfunc(result, fpath, **save_kwargs) except Exception as exc: msg = ("Failed saving transformation cache file for result of " "type {} using storage function for type(s) {}. Value " "of result: {}. Additional keyword arguments: {}" "".format(type(result), types, result, save_kwargs)) if raise_on_error: raise RuntimeError(msg) from exc log.warning("%s. %s: %s", msg, exc.__class__.__name__, exc) # ... will attempt pickling below else: # Success return True # Reached the end of the loop without returning -> not saved yet if not attempt_pickling: return False # Try to pickle it try: with open(fpath + ".pkl", mode='wb') as pkl_file: pkl.dump(result, pkl_file, **(pkl_kwargs if pkl_kwargs else {})) except Exception as exc: msg = ("Failed saving transformation cache file. Cannot pickle " "result object of type {} and with value {}. Consider " "deactivating file caching or pickling for this " "transformation." "".format(type(result), result)) if raise_on_error: raise RuntimeError(msg) from exc log.warning("%s. %s: %s", msg, exc.__class__.__name__, exc) return False # else: Success return True