"""This module implements the DataManager class, the root of the data tree."""
import os
import copy
import datetime
import re
import glob
import logging
import warnings
from typing import Union, Callable, List, Tuple, Dict
from .base import PATH_JOIN_CHAR, BaseDataContainer, BaseDataGroup
from .groups import OrderedDataGroup
from .tools import fill_line, clear_line, recursive_update, load_yml
from ._hash import _hash
# Local constants
log = logging.getLogger(__name__)
# Exception classes ...........................................................
[docs]class DataManagerError(Exception):
"""All DataManager exceptions derive from this one"""
pass
[docs]class RequiredDataMissingError(DataManagerError):
"""Raised if required data was missing."""
pass
[docs]class MissingDataError(DataManagerError):
"""Raised if data was missing, but is not required."""
pass
[docs]class ExistingDataError(DataManagerError):
"""Raised if data already existed."""
pass
[docs]class ExistingGroupError(DataManagerError):
"""Raised if a group already existed."""
pass
[docs]class LoaderError(DataManagerError):
"""Raised if a data loader was not available"""
pass
[docs]class MissingDataWarning(UserWarning):
"""Used as warning instead of MissingDataError"""
pass
[docs]class ExistingDataWarning(UserWarning):
"""If there was data already existing ..."""
pass
[docs]class NoMatchWarning(UserWarning):
"""If there was no regex match"""
pass
# -----------------------------------------------------------------------------
[docs]class DataManager(OrderedDataGroup):
"""The DataManager is the root of a data tree, coupled to a specific data
directory.
It handles the loading of data and can be used for interactive work with
the data.
"""
# Use a base load configuration to start with
_BASE_LOAD_CFG = None
# Define some default groups (same syntax create_groups argument)
_DEFAULT_GROUPS = None
# Define as class variable what should be the default group type
_DATA_GROUP_DEFAULT_CLS = OrderedDataGroup
# For simple lookups, store class names in a dict; not set by default
_DATA_GROUP_CLASSES = None
# .........................................................................
# Initialization
[docs] def __init__(self, data_dir: str, *, name: str=None,
load_cfg: Union[dict, str]=None,
out_dir: Union[str, bool]="_output/{timestamp:}",
out_dir_kwargs: dict=None,
create_groups: List[Union[str, dict]]=None,
condensed_tree_params: dict=None):
"""Initializes a DataManager for the specified data directory.
Args:
data_dir (str): the directory the data can be found in. If this is
a relative path, it is considered relative to the current
working directory.
name (str, optional): which name to give to the DataManager. If no
name is given, the data directories basename will be used
load_cfg (Union[dict, str], optional): The base configuration used
for loading data. If a string is given, assumes it to be the
path to a YAML file and loads it using the
:py:func:`~dantro._yaml.load_yml` function. If None is given,
it can still be supplied to the
:py:meth:`~dantro.data_mngr.DataManager.load` method later on.
out_dir (Union[str, bool], optional): where output is written to.
If this is given as a relative path, it is considered relative
to the ``data_dir``. A formatting operation with the keys
``timestamp`` and ``name`` is performed on this, where the
latter is the name of the data manager. If set to False, no
output directory is created.
out_dir_kwargs (dict, optional): Additional arguments that affect
how the output directory is created.
create_groups (List[Union[str, dict]], optional): If given, these
groups will be created after initialization. If the list
entries are strings, the default group class will be used; if
they are dicts, the `name` key specifies the name of the group
and the `Cls` key specifies the type. If a string is given
instead of a type, the lookup happens from the
``_DATA_GROUP_CLASSES`` variable.
condensed_tree_params (dict, optional): If given, will set the
parameters used for the condensed tree representation.
Available options: ``max_level`` and ``condense_thresh``, where
the latter may be a callable.
See :py:meth:`dantro.base.BaseDataGroup._tree_repr` for more
information.
"""
# Find a name if none was given
if not name:
basename = os.path.basename(os.path.abspath(data_dir))
name = "{}_Manager".format(basename.replace(" ", "_"))
log.info("Initializing %s '%s'...", self.classname, name)
# Initialize as a data group via parent class
super().__init__(name=name)
# Set condensed tree parameters
if condensed_tree_params:
self._set_condensed_tree_params(**condensed_tree_params)
# Initialize directories
self.dirs = self._init_dirs(data_dir=data_dir, out_dir=out_dir,
**(out_dir_kwargs if out_dir_kwargs
else {}))
# Start out with the default load configuration or, if not given, with
# an empty one
self.load_cfg = {} if not self._BASE_LOAD_CFG else self._BASE_LOAD_CFG
# Resolve string arguments
if isinstance(load_cfg, str):
# Assume this is the path to a configuration file and load it
log.debug("Loading the default load config from a path:\n %s",
load_cfg)
load_cfg = load_yml(load_cfg)
# If given, use it to recursively update the base
if load_cfg:
self.load_cfg = recursive_update(self.load_cfg, load_cfg)
# Create default groups, as specified in the _DEFAULT_GROUPS class
# variable and the create_groups argument
if self._DEFAULT_GROUPS or create_groups:
# Parse both into a new list to iterate over
specs = self._DEFAULT_GROUPS if self._DEFAULT_GROUPS else []
if create_groups:
specs += create_groups
log.debug("Creating %d empty groups from defaults and/or given "
"initialization arguments ...", len(specs))
for spec in specs:
if isinstance(spec, dict):
# Got a more elaborate group specification
self.new_group(**spec)
else:
# Assume this is the group name; use the default class
self.new_group(spec)
# Done
log.debug("%s initialized.", self.logstr)
[docs] def _set_condensed_tree_params(self, **params):
"""Helper method to set the ``_COND_TREE_*`` class variables"""
available_keys = ('max_level', 'condense_thresh')
for key, value in params.items():
if key.lower() not in available_keys:
raise KeyError("Invalid condensed tree parameter: '{}'! The "
"available keys are: {}."
"".format(key, ", ".join(available_keys)))
setattr(self, '_COND_TREE_'+key.upper(), value)
[docs] def _init_dirs(self, *, data_dir: str, out_dir: Union[str, bool],
timestamp: float=None, timefstr: str="%y%m%d-%H%M%S",
exist_ok: bool=False) -> Dict[str, str]:
"""Initializes the directories managed by this DataManager and returns
a dictionary that stores the absolute paths to these directories.
If they do not exist, they will be created.
Args:
data_dir (str): the directory the data can be found in. If this is
a relative path, it is considered relative to the current
working directory.
out_dir (Union[str, bool]): where output is written to.
If this is given as a relative path, it is considered relative
to the **data directory**. A formatting operation with the
keys ``timestamp`` and ``name`` is performed on this, where
the latter is the name of the data manager. If set to False,
no output directory is created.
timestamp (float, optional): If given, use this time to generate
the `date` format string key. If not, uses the current time.
timefstr (str, optional): Format string to use for generating the
string representation of the current timestamp
exist_ok (bool, optional): Whether the output directory may exist.
Note that it only makes sense to set this to True if you can
be sure that there will be no file conflicts! Otherwise the
errors will just occur at a later stage.
Returns:
Dict[str, str]: The directory paths registered under certain keys,
e.g. ``data`` and ``out``.
"""
# Make the data directory absolute
log.debug("Received `data_dir` argument:\n %s", data_dir)
data_dir = os.path.abspath(data_dir)
# Save dictionary that will hold info on directories
dirs = dict(data=data_dir)
# See if an output directory should be created
if out_dir:
log.debug("Received `out_dir` argument:\n %s", out_dir)
# Make current date and time available for formatting operations
time = (datetime.datetime.fromtimestamp(timestamp) if timestamp
else datetime.datetime.now())
timestr = time.strftime(timefstr)
# Perform a format operation on the output directory
out_dir = out_dir.format(name=self.name.lower(), timestamp=timestr)
# If it is relative, assume it to be relative to the data directory
if not os.path.isabs(out_dir):
# By joining them together, out_dir is now relative
out_dir = os.path.join(data_dir, out_dir)
# Make path absolute and store in dict
dirs['out'] = os.path.abspath(out_dir)
# Create the directory
os.makedirs(dirs['out'], exist_ok=exist_ok)
else:
dirs['out'] = False
# Inform about the managed directories, then return
log.debug("Managed directories:\n%s",
"\n".join([" {:>8s} : {}".format(k, v)
for k, v in dirs.items()]))
return dirs
@property
def hashstr(self) -> str:
"""The hash of a DataManager is computed from its name and the coupled
data directory, which are regarded as the relevant parts. While other
parts of the DataManager are not invariant, it is characterized most by
the directory it is associated with.
As this is a string-based hash, it is not implemented as the __hash__
magic method but as a separate property.
WARNING Changing how the hash is computed for the DataManager will
invalidate all TransformationDAG caches.
"""
return _hash("<DataManager '{}' @ {}>".format(self.name,
self.dirs['data']))
[docs] def __hash__(self) -> int:
"""The hash of this DataManager, computed from the hashstr property"""
return hash(self.hashstr)
# .........................................................................
# Loading data
[docs] def load_from_cfg(self, *, load_cfg: dict=None,
update_load_cfg: dict=None,
exists_action: str='raise',
print_tree: Union[bool, str]=False) -> None:
"""Load multiple data entries using the specified load configuration.
Args:
load_cfg (dict, optional): The load configuration to use. If not
given, the one specified during initialization is used.
update_load_cfg (dict, optional): If given, it is used to update
the load configuration recursively
exists_action (str, optional): The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``. With the ``*_nowarn``
values, no warning is given if an entry already existed.
print_tree (Union[bool, str], optional): If True, the full tree
representation of the DataManager is printed after the data
was loaded. If ``'condensed'``, the condensed tree will be
printed.
Raises:
TypeError: Raised if a given configuration entry was of invalid
type, i.e. not a dict
"""
# Determine which load configuration to use
if not load_cfg:
log.debug("No new load configuration given; will use load "
"configuration given at initialization.")
load_cfg = self.load_cfg
# Make sure to work on a copy, be it on the defaults or on the passed
load_cfg = copy.deepcopy(load_cfg)
if update_load_cfg:
# Recursively update with the given keywords
load_cfg = recursive_update(load_cfg, update_load_cfg)
log.debug("Updated the load configuration.")
log.hilight("Loading %d data entries ...", len(load_cfg))
# Loop over the data entries that were configured to be loaded
for entry_name, params in load_cfg.items():
# Check if this is of valid type
if not isinstance(params, dict):
raise TypeError("Got invalid load specifications for entry "
"'{}'! Expected dict, got {} with value '{}'. "
"Check the correctness of the given load "
"configuration!".format(entry_name,
type(params), params))
# Use the public method to load this single entry
self.load(entry_name, exists_action=exists_action,
print_tree=False, # to not have prints during loading
**params)
# All done
log.success("Successfully loaded %d data entries.", len(load_cfg))
# Finally, print the tree
if print_tree:
if print_tree == 'condensed':
print(self.tree_condensed)
else:
print(self.tree)
[docs] def load(self, entry_name: str, *, loader: str, enabled: bool=True,
glob_str: Union[str, List[str]], base_path: str=None,
target_group: str=None, target_path: str=None,
print_tree: Union[bool, str]=False,
load_as_attr: bool=False, **load_params) -> None:
"""Performs a single load operation.
Args:
entry_name (str): Name of this entry; will also be the name of the
created group or container, unless ``target_basename`` is given
loader (str): The name of the loader to use
enabled (bool, optional): Whether the load operation is enabled.
If not, simply returns without loading any data or performing
any further checks.
glob_str (Union[str, List[str]]): A glob string or a list of glob
strings by which to identify the files within ``data_dir`` that
are to be loaded using the given loader function
base_path (str, optional): The base directory to concatenate the
glob string to; if None, will use the DataManager's data
directory. With this option, it becomes possible to load data
from a path outside the associated data directory.
target_group (str, optional): If given, the files to be loaded will
be stored in this group. This may only be given if the argument
target_path is *not* given.
target_path (str, optional): The path to write the data to. This
can be a format string. It is evaluated for each file that has
been matched. If it is not given, the content is loaded to a
group with the name of this entry at the root level.
Available keys are: ``basename``, ``match`` (if ``path_regex``
is used, see ``**load_params``)
print_tree (Union[bool, str], optional): If True, the full tree
representation of the DataManager is printed after the data
was loaded. If ``'condensed'``, the condensed tree will be
printed.
load_as_attr (bool, optional): If True, the loaded entry will be
added not as a new DataContainer or DataGroup, but as an
attribute to an (already existing) object at ``target_path``.
The name of the attribute will be the ``entry_name``.
**load_params: Further loading parameters, all optional. These are
evaluated by :py:meth:`~dantro.data_mngr.DataManager._load`.
ignore (list):
The exact file names in this list will be ignored during
loading. Paths are seen as elative to the data directory
of the data manager.
required (bool):
If True, will raise an error if no files were found.
Default: False.
path_regex (str):
This pattern can be used to match the path of the file
that is being loaded. The match result is available to the
format string under the ``match`` key.
exists_action (str):
The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``.
With ``*_nowarn`` values, no warning is given if an entry
already existed. Note that this is ignored when
the ``load_as_attr`` argument is given.
unpack_data (bool, optional):
If True, and ``load_as_attr`` is active, not the
DataContainer or DataGroup itself will be stored in the
attribute, but the content of its ``.data`` attribute.
progress_indicator (bool):
Whether to print a progress indicator or not. Default: True
parallel (bool):
If True, data is loaded in parallel. This feature is not
implemented yet!
any further kwargs:
passed on to the loader function
Returns:
None
Raises:
ValueError: Upon invalid combination of ``target_group`` and
``target_path`` arguments
"""
def glob_match_single(glob_str: Union[str, List[str]]) -> bool:
"""Returns True if the given glob str matches at most one file."""
return bool(isinstance(glob_str, str) and glob_str.find('*') < 0)
def check_target_path(target_path: str):
"""Check that the target path evaluates correctly."""
log.debug("Checking target path '%s' ...", target_path)
try:
_target_path = target_path.format(basename="basename",
match="match")
except (IndexError, KeyError) as err:
raise ValueError("Invalid argument `target_path`. Will not be "
"able to properly evaluate '{}' later due to "
"a {}: {}".format(target_path,
type(err), err)) from err
else:
log.debug("Target path will be: %s", _target_path)
if not enabled:
log.progress("Skipping loading of data entry '%s' ...", entry_name)
return
log.progress("Loading data entry '%s' ...", entry_name)
# Parse the arguments that result in the target path
if load_as_attr:
if not target_path:
raise ValueError("With `load_as_attr`, the `target_path` "
"argument needs to be given.")
# The target path should not be adjusted, as it points to the
# object to store the loaded data as attribute in.
log.debug("Will load this entry as attribute to the target path "
"'%s' ...", target_path)
# To communicate the attribute name, store it in the load_as_attr
# variable; otherwise it would require passing two arguments to
# _load
load_as_attr = entry_name
elif target_group:
if target_path:
raise ValueError("Received both arguments `target_group` and "
"`target_path`; make sure to only pass one "
"or none of them.")
if glob_match_single(glob_str):
target_path = target_group + "/" + entry_name
else:
target_path = target_group + "/{basename:}"
elif not target_path:
if glob_match_single(glob_str):
target_path = entry_name
else:
target_path = entry_name + "/{basename:}"
# else: target_path was given
# ...and check that it is working.
check_target_path(target_path)
# Try loading the data and handle specific DataManagerErrors
try:
num_files = self._load(target_path=target_path, loader=loader,
glob_str=glob_str, base_path=base_path,
load_as_attr=load_as_attr, **load_params)
except RequiredDataMissingError:
raise
except MissingDataError as err:
warnings.warn("No files were found to import!\n"+str(err),
MissingDataWarning)
return # Does not raise, but does not save anything either
except LoaderError:
raise
else:
# Everything loaded as desired
log.progress("Loaded all data for entry '%s'.\n", entry_name)
# Done with this entry. Print tree, if desired.
if print_tree:
if print_tree == 'condensed':
print(self.tree_condensed)
else:
print(self.tree)
[docs] def _load(self, *, target_path: str, loader: str,
glob_str: Union[str, List[str]], load_as_attr: Union[str, None],
base_path: str=None, ignore: List[str]=None,
required: bool=False, path_regex: str=None,
exists_action: str='raise', unpack_data: bool=False,
progress_indicator: bool=True, parallel: bool=False,
**loader_kwargs) -> int:
"""Helper function that loads a data entry to the specified path.
Args:
target_path (str): The path to load the result of the loader to.
This can be a format string; it is evaluated for each file.
Available keys are: basename, match (if ``path_regex`` is
given)
loader (str): The loader to use
glob_str (Union[str, List[str]]): A glob string or a list of glob
strings to match files in the data directory
load_as_attr (Union[str, None]): If a string, the entry will be
loaded into the object at ``target_path`` under a new attribute
with this name.
base_path (str, optional): The base directory to concatenate the
glob string to; if None, will use the DataManager's data
directory. With this option, it becomes possible to load data
from a path outside the associated data directory.
ignore (List[str], optional): The exact file names in this list
will be ignored during loading. Paths are seen as relative to
the data directory.
required (bool, optional): If True, will raise an error if no files
were found.
path_regex (str, optional): The regex applied to the relative path
of the files that were found. It is used to generate the name
of the target container. If not given, the basename is used.
exists_action (str, optional): The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``. With ``*_nowarn`` values,
no warning is given if an entry already existed.
Note that this is ignored if ``load_as_attr`` is given.
unpack_data (bool, optional): If True, and ``load_as_attr`` is
active, not the DataContainer or DataGroup itself will be
stored in the attribute, but the content of its ``.data``
attribute.
progress_indicator (bool, optional): Whether to print a progress
indicator or not
parallel (bool, optional): If True, data is loaded in parallel -
not implemented yet!
**loader_kwargs: passed on to the loader function
Raises:
NotImplementedError: For ``parallel == True``
ValueError: Bad ``path_regex``
Returns:
int: Number of files that data was loaded from
"""
def resolve_loader(loader: str) -> Tuple[Callable, str, Callable]:
"""Resolves the loader function"""
load_func_name = '_load_' + loader.lower()
try:
load_func = getattr(self, load_func_name)
except AttributeError as err:
raise LoaderError("Loader '{}' was not available to {}! Make "
"sure to use a mixin class that supplies "
"the '{}' loader method."
"".format(loader, self.logstr,
load_func_name)) from err
else:
log.debug("Resolved '%s' loader function.", loader)
try:
TargetCls = getattr(load_func, 'TargetCls')
except AttributeError as err:
raise LoaderError("Load function {} misses required attribute "
"'TargetCls'. Check your mixin!"
"".format(load_func)) from err
return load_func, load_func_name, TargetCls
def create_files_list(*, glob_str: Union[str, List[str]],
ignore: List[str], base_path: str=None,
required: bool=False, sort: bool=False) -> list:
"""Create the list of file paths to load from.
Internally, this uses a set, thus ensuring that the paths are
unique. The set is converted to a list before returning.
Args:
glob_str (Union[str, List[str]]): The glob pattern or a list of
glob patterns
ignore (List[str]): The list of files to ignore
base_path (str, optional): The base path for the glob pattern;
use data directory, if not given.
required (bool, optional): Will lead to an error being raised
if no files could be matched
sort (bool, optional): If true, sorts the list before returning
Returns:
list: the file paths to load
Raises:
MissingDataError: If no files could be matched
RequiredDataMissingError: If no files could be matched but were
required.
"""
# Create a set to assure that all files are unique
files = set()
# Assure it is a list of strings
if isinstance(glob_str, str):
# Is a single glob string
# Put it into a list to handle the same as the given arg
glob_str = [glob_str]
# Assuming glob_str to be lists of strings now
log.debug("Got %d glob string(s) to create set of matching file "
"paths from.", len(glob_str))
# Handle base path, defaulting to the data directory
if base_path is None:
base_path = self.dirs['data']
log.debug("Using data directory as base path.")
else:
if not os.path.isabs(base_path):
raise ValueError("Given base_path argument needs be an "
"absolute path, was not: {}"
"".format(base_path))
# Go over the given glob strings and add to the files set
for gs in glob_str:
# Make the glob string absolute
gs = os.path.join(base_path, gs)
log.debug("Adding files that match glob string:\n %s", gs)
# Add to the set of files; this assures uniqueness of the paths
files.update(list(glob.glob(gs, recursive=True)))
# See if some files should be ignored
if ignore:
log.debug("Got list of files to ignore:\n %s", ignore)
# Make absolute and generate list of files to exclude
ignore = [os.path.join(self.dirs['data'], path)
for path in ignore]
log.debug("Removing them one by one now ...")
# Remove the elements one by one
while ignore:
rmf = ignore.pop()
try:
files.remove(rmf)
except KeyError:
log.debug("%s was not found in set of files.", rmf)
else:
log.debug("%s removed from set of files.", rmf)
# Now the file list is final
log.note("Found %d file%s to load.",
len(files), "s" if len(files) != 1 else "")
log.debug("\n %s", "\n ".join(files))
if not files:
# No files found; exit here, one way or another
if not required:
raise MissingDataError("No files found matching "
"`glob_str` {} (and ignoring {})."
"".format(glob_str, ignore))
raise RequiredDataMissingError("No files found matching "
"`glob_str` {} (and ignoring "
"{}) were found, but were "
"marked as required!"
"".format(glob_str, ignore))
# Convert to list
files = list(files)
# Sort, if asked to do so
if sort:
files.sort()
return files
def prepare_target_path(target_path: str, *, filepath: str,
path_sre=None) -> List[str]:
"""Prepare the target path"""
# The dict to be filled with formatting parameters
fps = dict()
# Extract the file basename (without extension)
fps['basename'] = os.path.splitext(os.path.basename(filepath))[0]
fps['basename'] = fps['basename'].lower()
# Use the specified regex pattern to extract a match
if path_sre:
try:
_match = path_sre.findall(filepath)[0]
except IndexError:
# nothing could be found
warnings.warn("Could not extract a name using the "
"regex pattern '{}' on the file path:\n"
"{}\nUsing the path's basename instead."
"".format(path_sre, filepath),
NoMatchWarning)
_match = fps['basename']
else:
log.debug("Matched '%s' in file path '%s'.",
_match, filepath)
fps['match'] = _match
# Parse the format string to generate the file path
log.debug("Parsing format string '%s' to generate target path ...",
target_path)
log.debug(" kwargs: %s", fps)
target_path = target_path.format(**fps)
log.debug("Generated target path: %s", target_path)
return target_path.split(PATH_JOIN_CHAR)
def skip_path(path: str, *, exists_action: str) -> bool:
"""Check whether a given path exists and — depending on the
`exists_action` – decides whether to skip this path or now.
Args:
path (str): The path to check for existence.
exists_action (str): The behaviour upon existing data. Can be:
raise, skip, skip_nowarn, overwrite, overwrite_nowarn.
The *_nowarn arguments suppress the warning
Returns:
bool: Whether to skip this path
Raises:
ExistingDataError: Raised when `exists_action == 'raise'`
ValueError: Raised for invalid `exists_action` value
"""
if path not in self:
# Does not exist yet -> no need to skip
return False
# else: path exists already
# NOTE that it is not known whether the path points to a group
# or to a container
_msg = ("Path '{}' already exists."
"".format(PATH_JOIN_CHAR.join(path)))
# Distinguish different actions
if exists_action == 'raise':
raise ExistingDataError(_msg + " Adjust argument "
"`exists_action` to allow skipping "
"or overwriting of existing entries.")
if exists_action in ['skip', 'skip_nowarn']:
if exists_action == 'skip':
warnings.warn(_msg
+ " Loading of this entry will be skipped.",
ExistingDataWarning)
return True # will lead to the data not being loaded
elif exists_action in ['overwrite', 'overwrite_nowarn']:
if exists_action == 'overwrite':
warnings.warn(_msg + " It will be overwritten!",
ExistingDataWarning)
return False # will lead to the data being loaded
else:
raise ValueError("Invalid value for `exists_action` "
"argument '{}'! Can be: raise, skip, "
"skip_nowarn, overwrite, overwrite_nowarn."
"".format(exists_action))
def store(obj: Union[BaseDataGroup, BaseDataContainer], *,
target_path: List[str], as_attr: Union[str, None],
unpack_data: bool) -> None:
"""Store the given `obj` at the supplied `path`.
Note that this will automatically overwrite, assuming that all
checks have been made prior to the call to this function.
Args:
obj (Union[BaseDataGroup, BaseDataContainer]): Object to store
target_path (List[str]): The path to store the object at
as_attr (Union[str, None]): If a string, store the object in
the attributes of the container or group at target_path
Raises:
ExistingDataError: If non-group-like data already existed at
that path
RequiredDataMissingError: If storing as attribute was selected
but there was no object at the given target_path
"""
# First, handle the (easy) case where the object is to be stored
# as the attribute at the target_path
if as_attr:
# Try to load the object at the target path
try:
target = self[target_path]
except KeyError as err:
raise RequiredDataMissingError("In order to store the "
"object {} at the target "
"path '{}', a group or "
"container already needs "
"to exist at that location "
"within {}."
"".format(obj.logstr,
target_path,
self.logstr)
) from err
# Check whether an attribute with that name already exists
if as_attr in target.attrs:
raise ExistingDataError("An attribute with the name '{}' "
"already exists in {}!"
"".format(as_attr, target.logstr))
# All checks passed. Can store it now, either directly or with
# unpacking of its data ...
if not unpack_data:
target.attrs[as_attr] = obj
else:
target.attrs[as_attr] = obj.data
log.debug("Stored %s as attribute '%s' of %s.",
obj.classname, as_attr, target.logstr)
# Done here. Return.
return
# Extract a target group path and a base name from path list
group_path = target_path[:-1]
basename = target_path[-1]
# Resolve the target group object; create it if necessary
# Need to check whether it is given at all. If not, write into the
# data manager directly
if not group_path:
# Write directly into data manager root
group = self
else:
# Need to retrieve or create the group
# The difficulty is that the path can also point to a container
# Need to assure here, that the group path points to a group
if group_path not in self:
# Needs to be created
self._create_groups(group_path)
elif not isinstance(self[group_path], BaseDataGroup):
# Already exists, but is no group. Cannot continue
group_path = PATH_JOIN_CHAR.join(group_path)
target_path = PATH_JOIN_CHAR.join(target_path)
raise ExistingDataError("The object at '{}' in {} is not "
"a group but a {}. Cannot store "
"{} there because the target path "
"'{}' requires it to be a group."
"".format(group_path, self.logstr,
type(self[group_path]),
obj.logstr, target_path))
# Now the group path will point to a group
group = self[group_path]
# Store data, if possible
if basename in group:
# Already exists. Delete the old one, then store the new one
del group[basename]
# Can add now
group.add(obj)
# Done
log.debug("Successfully stored %s at '%s'.",
_data.logstr, PATH_JOIN_CHAR.join(target_path))
# End of helper functions . . . . . . . . . . . . . . . . . . . . . . .
# Get the loader function
load_func, load_func_name, TargetCls = resolve_loader(loader)
# Create the list of file paths to load
files = create_files_list(glob_str=glob_str, ignore=ignore,
required=required, base_path=base_path,
sort=True)
# If a regex pattern was specified, compile it
path_sre = re.compile(path_regex) if path_regex else None
# Check if the `match` key is being used in the target_path
if path_sre is not None and target_path.find("{match:") < 0:
raise ValueError("Received the `path_regex` argument to match the "
"file path, but the `target_path` argument did "
"not contain the corresponding `{{match:}}` "
"placeholder. `target_path` value: '{}'."
"".format(target_path))
if parallel:
# TODO could be implemented by parallelising the below for loop
raise NotImplementedError("Cannot load in parallel yet.")
# Ready for loading files now . . . . . . . . . . . . . . . . . . . . .
# Go over the files and load them
for n, file in enumerate(files):
if progress_indicator:
line = " Loading {}/{} ...".format(n+1, len(files))
print(fill_line(line), end="\r")
# Prepare the target path (a list of strings)
_target_path = prepare_target_path(target_path, filepath=file,
path_sre=path_sre)
# Distinguish regular loading and loading as attribute
if not load_as_attr:
# Check if it is to be skipped
if skip_path(_target_path, exists_action=exists_action):
log.debug("Skipping file '%s' ...", file)
continue
# Prepare the target class, which will be filled by the load
# function; this assures that the name is already correct
_TargetCls = lambda **kws: TargetCls(name=_target_path[-1],
**kws)
else:
# For loading as attribute, the exists_action is not valid;
# that check is thus not needed. Also, the target class name
# does not come from the target path but from that argument
_TargetCls = lambda **kws: TargetCls(name=load_as_attr, **kws)
# Get the data
_data = load_func(file, TargetCls=_TargetCls, **loader_kwargs)
log.debug("Successfully loaded file '%s' into %s.",
file, _data.logstr)
# If this succeeded, store the data
store(_data, target_path=_target_path,
as_attr=load_as_attr, unpack_data=unpack_data)
# Done with this file. Go to next iteration
# Clear the line to get rid of the load indicator, if there was one
if progress_indicator:
clear_line()
# Done
log.debug("Finished loading data from %d file(s).", len(files))
return len(files)
[docs] def _contains_group(self, path: Union[str, List[str]], *,
base_group: BaseDataGroup=None) -> bool:
"""Recursively checks if the given path is available _and_ a group.
Args:
path (Union[str, List[str]]): The path to check.
base_group (BaseDataGroup): The group to start from. If not
given, will use self.
Returns:
bool: Whether the path points to a group
"""
def check(path: str, base_group: BaseDataGroup) -> bool:
"""Returns True if the object at path within base_group is
a group. False otherwise.
"""
return (path in base_group
and isinstance(base_group[path], BaseDataGroup))
if not isinstance(path, list):
path = path.split(PATH_JOIN_CHAR)
if not base_group:
base_group = self
if len(path) > 1:
# Need to continue recursively
if check(path[0], base_group):
return self._contains_group(path[1:],
base_group=base_group[path[0]])
return False
# End of recursion
return check(path[0], base_group)
[docs] def _create_groups(self, path: Union[str, List[str]], *,
base_group: BaseDataGroup=None,
GroupCls: Union[type, str]=None, exist_ok: bool=True):
"""Recursively create groups for the given path. Unlike new_group, this
also creates the groups at the intermediate paths.
Args:
path (Union[str, List[str]]): The path to create groups along
base_group (BaseDataGroup, optional): The group to start from. If
not given, uses self.
GroupCls (Union[type, str], optional): The class to use for
creating the groups or None if the _DATA_GROUP_DEFAULT_CLS is
to be used. If a string is given, lookup happens from the
_DATA_GROUPS_CLASSES variable.
exist_ok (bool, optional): Whether it is ok that groups along the
path already exist. These might also be of different type.
Default: True
Raises:
ExistingDataError: If not `exist_ok`
ExistingGroupError: If not `exist_ok` and a group already exists
"""
# Parse arguments
if isinstance(path, str):
path = path.split(PATH_JOIN_CHAR)
if base_group is None:
base_group = self
GroupCls = self._determine_group_class(GroupCls)
# Catch the disallowed case as early as possible
if path[0] in base_group:
# Check if it is a group that exists there
if isinstance(base_group[path[0]], BaseDataGroup):
if not exist_ok:
raise ExistingGroupError(path[0])
else:
# There is data (that is not a group) existing at the path.
# Cannot continue
raise ExistingDataError("Tried to create a group '{}' in {}, "
"but a container was already stored "
"at that path."
"".format(path[0], base_group.logstr))
# Create the group, if it does not yet exist
if path[0] not in base_group:
log.debug("Creating group '%s' in %s ...",
path[0], base_group.logstr)
base_group.new_group(path[0])
# path[0] is now created
# Check whether to continue recursion
if len(path) > 1:
# Continue recursion
self._create_groups(path[1:], base_group=base_group[path[0]],
GroupCls=GroupCls)
[docs] def _determine_group_class(self, Cls: Union[type, str]) -> type:
"""Helper function to determine the type of a group from an argument.
Args:
Cls (Union[type, str]): If None, uses the _DATA_GROUP_DEFAULT_CLS.
If a string, tries to extract it from the _DATA_GROUP_CLASSES
class variable. Otherwise, assumes this is already a type.
Returns:
type: The group class to use
Raises:
KeyError: If the string class name was not registered
ValueError: If no _DATA_GROUP_CLASSES variable was populated
"""
if Cls is None:
return self._DATA_GROUP_DEFAULT_CLS
if isinstance(Cls, str):
cls_name = Cls
if not self._DATA_GROUP_CLASSES:
raise ValueError("The class variable _DATA_GROUP_CLASSES is "
"empty; cannot look up class type by the "
"given name '{}'.".format(cls_name))
elif cls_name not in self._DATA_GROUP_CLASSES:
raise KeyError("The given class name '{}' was not registered "
"with this {}! Available classes: {}"
"".format(cls_name, self.classname,
self._DATA_GROUP_CLASSES))
# everything ok, retrieve the class type
return self._DATA_GROUP_CLASSES[cls_name]
# else: assume it is already a type and just return the given argument
return Cls
# .........................................................................
# Working with the data in the tree
[docs] def new_group(self, path: str, *, Cls: Union[type, str]=None, **kwargs):
"""Creates a new group at the given path.
This is a slightly advanced version of the new_group method of the
BaseDataGroup. It not only adjusts the default type, but also allows
more ways how to specify the type of the group to create.
Args:
path (str): Where to create the group. Note that the intermediates
of this path need to already exist.
Cls (Union[type, str], optional): If given, use this type to
create the group. If a string is given, resolves the type from
the _DATA_GROUP_CLASSES class variable. If None, uses the
default data group type of the data manager.
**kwargs: Passed on to Cls.__init__
Returns:
Cls: the created group
"""
# Use helper function to parse the group class correctly
Cls = self._determine_group_class(Cls)
return super().new_group(path, Cls=Cls, **kwargs)