"""This module implements the DataManager class, the root of the data tree."""
import copy
import datetime
import functools
import glob
import logging
import multiprocessing as mp
import os
import pathlib
import platform
import re
import time
import warnings
from typing import Callable, Dict, List, Tuple, Union
from ._hash import _hash
from ._import_tools import LazyLoader
from .base import PATH_JOIN_CHAR, BaseDataContainer, BaseDataGroup
from .exceptions import *
from .groups import OrderedDataGroup
from .tools import (
PoolCallbackHandler,
PoolErrorCallbackHandler,
clear_line,
ensure_dict,
ensure_posix_path_on_windows,
fill_line,
format_bytesize,
)
from .tools import format_time as _format_time
from .tools import (
glob_paths,
load_yml,
print_line,
recursive_update,
total_bytesize,
)
log = logging.getLogger(__name__)
pkl = LazyLoader("dill")
DATA_TREE_DUMP_EXT = ".d3"
"""File extension for data cache file"""
_fmt_time = lambda seconds: _format_time(seconds, ms_precision=2)
"""Locally used time formatting function"""
# -----------------------------------------------------------------------------
def _load_file_wrapper(
filepath: str, *, dm: "DataManager", loader: str, **kwargs
) -> Tuple[BaseDataContainer, str]:
"""A wrapper around :py:meth:`~dantro.data_mngr.DataManager._load_file`
that is used for parallel loading via multiprocessing.Pool.
It takes care of resolving the loader function and invoking the
file-loading method.
This function needs to be at module scope so that it is picklable.
Loader resolution also takes place here, because pickling the resolved
load function itself may be problematic.
Args:
filepath (str): The path of the file to load data from
dm (DataManager): The DataManager instance to resolve the loader from
loader (str): The name of the loader
**kwargs: Any further loading arguments.
Returns:
Tuple[BaseDataContainer, str]: The return value of
:py:meth:`~dantro.data_mngr.DataManager._load_file`.
"""
load_func, TargetCls = dm._resolve_loader(loader)
return dm._load_file(
filepath,
loader=loader,
load_func=load_func,
TargetCls=TargetCls,
**kwargs,
)
def _parse_parallel_opts(
files: List[str],
*,
enabled: bool = True,
processes: int = None,
min_files: int = 2,
min_total_size: int = None,
cpu_count: int = os.cpu_count(),
) -> int:
"""Parser function for the parallel file loading options dict
Args:
files (List[str]): List of files that are to be loaded
enabled (bool, optional): Whether to use parallel loading. If True,
the threshold arguments will still need to be fulfilled.
processes (int, optional): The number of processes to use; if this is
a negative integer, will deduce from available CPU count.
min_files (int, optional): If there are fewer files to load than this
number, will *not* use parallel loading.
min_total_size (int, optional): If the total file size is smaller than
this file size (in bytes), will *not* use parallel loading.
cpu_count (int, optional): Number of CPUs to consider "available".
Defaults to :py:func:`os.cpu_count`, i.e. the number of actually
available CPUs.
Returns:
int: number of processes to use. Will return 1 if loading should *not*
happen in parallel. Additionally, this number will never be larger
than the number of files in order to prevent unnecessary processes.
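Example:
    A sketch of how these options resolve to a process count; file
    names and the CPU count are illustrative:

    .. code-block:: python

        files = ["a.h5", "b.h5", "c.h5", "d.h5"]

        # Defaults: one process per file, at most `cpu_count` many
        _parse_parallel_opts(files, cpu_count=8)                    # -> 4

        # Fewer files than `min_files`: no parallel loading
        _parse_parallel_opts(files[:1], cpu_count=8)                # -> 1

        # Negative process counts are relative to the CPU count
        _parse_parallel_opts(files * 3, processes=-2, cpu_count=8)  # -> 6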
"""
if not enabled:
return 1
# Minimum files threshold
if min_files and len(files) < min_files:
log.remark(
"Not loading in parallel: there are only %d < %d files to load.",
len(files),
min_files,
)
return 1
# Minimum total file size
if min_total_size:
min_total_size = int(min_total_size)
fs = total_bytesize(files)
if fs < min_total_size:
log.remark(
"Not loading in parallel: total file size was %s < %s.",
format_bytesize(fs),
format_bytesize(min_total_size),
)
return 1
# Number of processes to use
if processes:
processes = processes if processes >= 0 else (cpu_count + processes)
# Little warning if choosing too many processes
if processes > cpu_count:
log.caution(
"Loading with more processes (%d) than there are CPUs (%d) "
"will typically slow down file loading!",
processes,
cpu_count,
)
else:
processes = cpu_count
return min(processes, len(files))
# -----------------------------------------------------------------------------
class DataManager(OrderedDataGroup):
"""The DataManager is the root of a data tree, coupled to a specific data
directory.
It handles the loading of data and can be used for interactive work with
the data.
"""
# Use a base load configuration to start with
_BASE_LOAD_CFG = None
# Define some default groups (same syntax as the create_groups argument)
_DEFAULT_GROUPS = None
# Define as class variable what should be the default group type
_NEW_GROUP_CLS: type = OrderedDataGroup
# The default tree file cache path, parsed by _parse_file_path
_DEFAULT_TREE_CACHE_PATH = ".tree_cache.d3"
# .. Initialization .......................................................
def __init__(
self,
data_dir: str,
*,
name: str = None,
load_cfg: Union[dict, str] = None,
out_dir: Union[str, bool] = "_output/{timestamp:}",
out_dir_kwargs: dict = None,
create_groups: List[Union[str, dict]] = None,
condensed_tree_params: dict = None,
default_tree_cache_path: str = None,
):
"""Initializes a DataManager for the specified data directory.
Args:
data_dir (str): the directory the data can be found in. If this is
a relative path, it is considered relative to the current
working directory.
name (str, optional): which name to give to the DataManager. If no
name is given, the data directory's basename will be used
load_cfg (Union[dict, str], optional): The base configuration used
for loading data. If a string is given, assumes it to be the
path to a YAML file and loads it using the
:py:func:`~yayaml.io.load_yml` function. If None is given,
it can still be supplied to the
:py:meth:`~dantro.data_mngr.DataManager.load` method later on.
out_dir (Union[str, bool], optional): where output is written to.
If this is given as a relative path, it is considered relative
to the ``data_dir``. A formatting operation with the keys
``timestamp`` and ``name`` is performed on this, where the
latter is the name of the data manager. If set to False, no
output directory is created.
out_dir_kwargs (dict, optional): Additional arguments that affect
how the output directory is created.
create_groups (List[Union[str, dict]], optional): If given, these
groups will be created after initialization. If the list
entries are strings, the default group class will be used; if
they are dicts, the ``name`` key specifies the name of the group
and the ``Cls`` key specifies the type. If a string is given
instead of a type, the lookup happens from the
``_DATA_GROUP_CLASSES`` variable.
condensed_tree_params (dict, optional): If given, will set the
parameters used for the condensed tree representation.
Available options: ``max_level`` and ``condense_thresh``, where
the latter may be a callable.
See :py:meth:`dantro.base.BaseDataGroup._tree_repr` for more
information.
default_tree_cache_path (str, optional): The path to the default
tree cache file. If not given, uses the value from the class
variable ``_DEFAULT_TREE_CACHE_PATH``. Whichever value was
chosen is then prepared using the
:py:meth:`~dantro.data_mngr.DataManager._parse_file_path`
method, which regards relative paths as being relative to the
associated data directory.
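Example:
    A minimal sketch of a typical setup; the path and group names are
    illustrative:

    .. code-block:: python

        dm = DataManager(
            "path/to/data",
            name="my_data",
            create_groups=["simulations", "plots"],
        )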
"""
# Find a name if none was given
if not name:
basename = os.path.basename(os.path.abspath(data_dir))
name = "{}_Manager".format(basename.replace(" ", "_"))
log.info("Initializing %s '%s'...", self.classname, name)
# Initialize as a data group via parent class
super().__init__(name=name)
# Set condensed tree parameters
if condensed_tree_params:
self._set_condensed_tree_params(**condensed_tree_params)
# Initialize directories
self.dirs = self._init_dirs(
data_dir=data_dir,
out_dir=out_dir,
**(out_dir_kwargs if out_dir_kwargs else {}),
)
# Parse the default tree cache path
_tcp = (
default_tree_cache_path
if default_tree_cache_path
else self._DEFAULT_TREE_CACHE_PATH
)
self._tree_cache_path = self._parse_file_path(_tcp)
# Start out with the default load configuration or, if not given, with
# an empty one
self.load_cfg = (
    {} if not self._BASE_LOAD_CFG else copy.deepcopy(self._BASE_LOAD_CFG)
)
# Resolve string arguments
if isinstance(load_cfg, str):
# Assume this is the path to a configuration file and load it
log.debug(
"Loading the default load config from a path:\n %s", load_cfg
)
load_cfg = load_yml(load_cfg)
# If given, use it to recursively update the base
if load_cfg:
self.load_cfg = recursive_update(self.load_cfg, load_cfg)
# Create default groups, as specified in the _DEFAULT_GROUPS class
# variable and the create_groups argument
if self._DEFAULT_GROUPS or create_groups:
# Parse both into a new list to iterate over
specs = self._DEFAULT_GROUPS if self._DEFAULT_GROUPS else []
if create_groups:
specs += create_groups
log.debug(
"Creating %d empty groups from defaults and/or given "
"initialization arguments ...",
len(specs),
)
for spec in specs:
if isinstance(spec, dict):
# Got a more elaborate group specification
self.new_group(**spec)
else:
# Assume this is the group name; use the default class
self.new_group(spec)
# Done
log.debug("%s initialized.", self.logstr)
def _set_condensed_tree_params(self, **params):
"""Helper method to set the ``_COND_TREE_*`` class variables"""
available_keys = ("max_level", "condense_thresh")
for key, value in params.items():
if key.lower() not in available_keys:
raise KeyError(
"Invalid condensed tree parameter: '{}'! The "
"available keys are: {}."
"".format(key, ", ".join(available_keys))
)
setattr(self, "_COND_TREE_" + key.upper(), value)
def _init_dirs(
self,
*,
data_dir: str,
out_dir: Union[str, bool],
timestamp: float = None,
timefstr: str = "%y%m%d-%H%M%S",
exist_ok: bool = False,
) -> Dict[str, str]:
"""Initializes the directories managed by this DataManager and returns
a dictionary that stores the absolute paths to these directories.
If they do not exist, they will be created.
Args:
data_dir (str): the directory the data can be found in. If this is
a relative path, it is considered relative to the current
working directory.
out_dir (Union[str, bool]): where output is written to.
If this is given as a relative path, it is considered relative
to the **data directory**. A formatting operation with the
keys ``timestamp`` and ``name`` is performed on this, where
the latter is the name of the data manager. If set to False,
no output directory is created.
timestamp (float, optional): If given, use this time to generate
the ``timestamp`` format string key. If not given, uses the
current time.
timefstr (str, optional): Format string to use for generating the
string representation of the current timestamp
exist_ok (bool, optional): Whether the output directory may exist.
Note that it only makes sense to set this to True if you can
be sure that there will be no file conflicts! Otherwise the
errors will just occur at a later stage.
Returns:
Dict[str, str]: The directory paths registered under certain keys,
e.g. ``data`` and ``out``.
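Example:
    A sketch of how the output directory path is determined; the data
    directory, manager name, and timestamp are illustrative:

    .. code-block:: python

        # The out_dir format string is evaluated with the manager's
        # (lower-cased) name and the formatted timestamp ...
        "_output/{timestamp:}".format(name="my_data", timestamp="210203-040506")
        # ... yielding "_output/210203-040506", which (being relative)
        # is then joined onto the data directory, e.g. resulting in:
        #   /data/sim/_output/210203-040506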
"""
# Make the data directory absolute
log.debug("Received `data_dir` argument:\n %s", data_dir)
data_dir = os.path.abspath(data_dir)
# Save dictionary that will hold info on directories
dirs = dict(data=data_dir)
# See if an output directory should be created
if out_dir:
log.debug("Received `out_dir` argument:\n %s", out_dir)
# Make current date and time available for formatting operations
time = (
datetime.datetime.fromtimestamp(timestamp)
if timestamp
else datetime.datetime.now()
)
timestr = time.strftime(timefstr)
# Perform a format operation on the output directory
out_dir = out_dir.format(name=self.name.lower(), timestamp=timestr)
# If it is relative, assume it to be relative to the data directory
if not os.path.isabs(out_dir):
# By joining them together, out_dir is now relative
out_dir = os.path.join(data_dir, out_dir)
# Make path absolute and store in dict
dirs["out"] = os.path.abspath(out_dir)
# Create the directory
os.makedirs(dirs["out"], exist_ok=exist_ok)
else:
dirs["out"] = False
# Inform about the managed directories, then return
log.debug(
"Managed directories:\n%s",
"\n".join([f" {k:>8s} : {v}" for k, v in dirs.items()]),
)
return dirs
@property
def hashstr(self) -> str:
"""The hash of a DataManager is computed from its name and the coupled
data directory, which are regarded as the relevant parts. While other
parts of the DataManager are not invariant, it is characterized most by
the directory it is associated with.
On all operating systems the Posix representation of the path is used
for consistency purposes.
As this is a string-based hash, it is not implemented as the __hash__
magic method but as a separate property.
.. warning::

    Changing how the hash is computed for the DataManager will
    invalidate all TransformationDAG caches.
"""
if platform.system() == "Windows":
# Remove the C: from the path
_data_dir = ensure_posix_path_on_windows(self.dirs["data"])[2:]
else:
_data_dir = self.dirs["data"]
return _hash(f"<DataManager '{self.name}' @ {_data_dir}>")
def __hash__(self) -> int:
"""The hash of this DataManager, computed from the hashstr property"""
return hash(self.hashstr)
@property
def tree_cache_path(self) -> str:
"""Absolute path to the default tree cache file"""
return self._tree_cache_path
@property
def tree_cache_exists(self) -> bool:
"""Whether the tree cache file exists"""
return os.path.isfile(self._tree_cache_path)
# .. Loading data .........................................................
@property
def available_loaders(self) -> List[str]:
"""Returns a sorted list of available loader function names"""
def is_load_func(attr: str):
return attr.startswith("_load_") and hasattr(
getattr(self, attr), "TargetCls"
)
# Loaders can be available both as a method and as a lookup from the
# data loader registry
# TODO Once we can be sure that all method loaders are also registered,
# this should be simplified to return self._loader_registry.keys()
method_loaders = [s[6:] for s in dir(self) if is_load_func(s)]
additionally_registered_loaders = [
name
for name in self._loader_registry
if name not in method_loaders
]
return sorted(method_loaders + additionally_registered_loaders)
@property
def _loader_registry(self) -> "DataLoaderRegistry":
"""Retrieves the data loader registry"""
from .data_loaders._registry import DATA_LOADERS
return DATA_LOADERS
def load_from_cfg(
self,
*,
load_cfg: dict = None,
update_load_cfg: dict = None,
exists_action: str = "raise",
print_tree: Union[bool, str] = False,
) -> None:
"""Load multiple data entries using the specified load configuration.
Args:
load_cfg (dict, optional): The load configuration to use. If not
given, the one specified during initialization is used.
update_load_cfg (dict, optional): If given, it is used to update
the load configuration recursively
exists_action (str, optional): The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``. With the ``*_nowarn``
values, no warning is given if an entry already existed.
print_tree (Union[bool, str], optional): If True, the full tree
representation of the DataManager is printed after the data
was loaded. If ``'condensed'``, the condensed tree will be
printed.
Raises:
TypeError: Raised if a given configuration entry was of invalid
type, i.e. not a dict
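Example:
    A minimal sketch, assuming ``dm`` is an initialized DataManager;
    entry names, loader names, and glob patterns are illustrative:

    .. code-block:: python

        dm.load_from_cfg(
            load_cfg=dict(
                cfg=dict(loader="yaml", glob_str="config/*.yml"),
                data=dict(loader="hdf5", glob_str="out/*.h5"),
            ),
            print_tree="condensed",
        )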
"""
# Determine which load configuration to use
if not load_cfg:
log.debug(
"No new load configuration given; will use load "
"configuration given at initialization."
)
load_cfg = self.load_cfg
# Make sure to work on a copy, be it on the defaults or on the passed
load_cfg = copy.deepcopy(load_cfg)
if update_load_cfg:
# Recursively update with the given keywords
load_cfg = recursive_update(load_cfg, update_load_cfg)
log.debug("Updated the load configuration.")
log.hilight("Loading %d data entries ...", len(load_cfg))
# Loop over the data entries that were configured to be loaded
for entry_name, params in load_cfg.items():
# Check if this is of valid type
if not isinstance(params, dict):
raise TypeError(
"Got invalid load specifications for entry "
f"'{entry_name}'! Expected dict, got {type(params)} with "
f"value '{params}'. Check the correctness of the given "
"load configuration!"
)
# Use the public method to load this single entry
self.load(
entry_name,
exists_action=exists_action,
print_tree=False, # to not have prints during loading
**params,
)
# All done
log.success("Successfully loaded %d data entries.", len(load_cfg))
# Finally, print the tree
if print_tree:
if print_tree == "condensed":
print(self.tree_condensed)
else:
print(self.tree)
def load(
self,
entry_name: str,
*,
loader: str,
enabled: bool = True,
glob_str: Union[str, List[str]],
base_path: str = None,
target_group: str = None,
target_path: str = None,
print_tree: Union[bool, str] = False,
load_as_attr: bool = False,
parallel: Union[bool, dict] = False,
**load_params,
) -> None:
"""Performs a single load operation.
Args:
entry_name (str): Name of this entry; will also be the name of the
created group or container, unless ``target_path`` is given
loader (str): The name of the loader to use
enabled (bool, optional): Whether the load operation is enabled.
If not, simply returns without loading any data or performing
any further checks.
glob_str (Union[str, List[str]]): A glob string or a list of glob
strings by which to identify the files within ``data_dir`` that
are to be loaded using the given loader function
base_path (str, optional): The base directory to concatenate the
glob string to; if None, will use the DataManager's data
directory. With this option, it becomes possible to load data
from a path outside the associated data directory.
target_group (str, optional): If given, the files to be loaded will
be stored in this group. This may only be given if the argument
target_path is *not* given.
target_path (str, optional): The path to write the data to. This
can be a format string. It is evaluated for each file that has
been matched. If it is not given, the content is loaded to a
group with the name of this entry at the root level.
Available keys are: ``basename``, ``match`` (if ``path_regex``
is used, see ``**load_params``)
print_tree (Union[bool, str], optional): If True, the full tree
representation of the DataManager is printed after the data
was loaded. If ``'condensed'``, the condensed tree will be
printed.
load_as_attr (bool, optional): If True, the loaded entry will be
added not as a new DataContainer or DataGroup, but as an
attribute to an (already existing) object at ``target_path``.
The name of the attribute will be the ``entry_name``.
parallel (Union[bool, dict]): If True, data is loaded in parallel.
If a dict, can supply more options:
- ``enabled``: whether to use parallel loading
- ``processes``: how many processes to use; if None, will
use as many as are available. For negative integers, will
use ``os.cpu_count() + processes`` processes.
- ``min_files``: if given, will fall back to non-parallel
loading if fewer than the given number of files were
matched by ``glob_str``
- ``min_total_size``: if given, specifies the minimum *total*
size of all matched files (in bytes) below which to fall
back to non-parallel loading
Note that a single file will *never* be loaded in parallel and
there will never be more processes used than files that were
selected to be loaded.
Parallel loading incurs a constant overhead and is typically
only speeding up data loading if the task is CPU-bound. Also,
it requires the data tree to be fully serializable.
**load_params: Further loading parameters, all optional. These are
evaluated by :py:meth:`~dantro.data_mngr.DataManager._load`.
ignore (list):
The exact file names in this list will be ignored during
loading. Paths are seen as relative to the data directory
of the data manager.
required (bool):
If True, will raise an error if no files were found.
Default: False.
path_regex (str):
This pattern can be used to match a part of the file path
that is being loaded. The match result is available to the
format string under the ``match`` key.
See :py:meth:`._prepare_target_path` for more information.
exists_action (str):
The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``.
With ``*_nowarn`` values, no warning is given if an entry
already existed. Note that this is ignored when
the ``load_as_attr`` argument is given.
unpack_data (bool, optional):
If True, and ``load_as_attr`` is active, not the
DataContainer or DataGroup itself will be stored in the
attribute, but the content of its ``.data`` attribute.
progress_indicator (bool):
Whether to print a progress indicator or not. Default: True
any further kwargs:
passed on to the loader function
Returns:
None
Raises:
ValueError: Upon invalid combination of ``target_group`` and
``target_path`` arguments
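Example:
    A minimal sketch of a single load operation, assuming ``dm`` is an
    initialized DataManager; the loader name, glob pattern, and regex
    are illustrative:

    .. code-block:: python

        dm.load(
            "sim_data",
            loader="hdf5",
            glob_str="sim_*/data.h5",
            path_regex="sim_(.*)/data.h5",
            target_path="simulations/{match}/data",
            parallel=dict(enabled=True, processes=-1, min_files=4),
        )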
"""
def glob_match_single(glob_str: Union[str, List[str]]) -> bool:
"""Returns True if the given glob str matches at most one file."""
return bool(isinstance(glob_str, str) and glob_str.find("*") < 0)
# Initial checks
if not enabled:
log.progress("Skipping loading of data entry '%s' ...", entry_name)
return
log.progress("Loading data entry '%s' ...", entry_name)
t0 = time.time()
# Parse the arguments that result in the target path
if load_as_attr:
if not target_path:
raise ValueError(
"With `load_as_attr`, the `target_path` "
"argument needs to be given."
)
# The target path should not be adjusted, as it points to the
# object to store the loaded data as attribute in.
log.debug(
"Will load this entry as attribute to the target path "
"'%s' ...",
target_path,
)
# To communicate the attribute name, store it in the load_as_attr
# variable; otherwise it would require passing on two arguments ...
load_as_attr = entry_name
elif target_group:
if target_path:
raise ValueError(
"Received both arguments `target_group` and "
"`target_path`; make sure to only pass one "
"or none of them."
)
if glob_match_single(glob_str):
target_path = target_group + "/" + entry_name
else:
target_path = target_group + "/{basename:}"
elif not target_path:
if glob_match_single(glob_str):
target_path = entry_name
else:
target_path = entry_name + "/{basename:}"
# else: target_path was given
# Try loading the data and handle specific DataManagerErrors
try:
num_files, num_success = self._load(
target_path=target_path,
loader=loader,
glob_str=glob_str,
base_path=base_path,
load_as_attr=load_as_attr,
parallel=parallel,
**load_params,
)
except RequiredDataMissingError:
raise
except MissingDataError as err:
warnings.warn(
"No files were found to import!\n" + str(err),
MissingDataWarning,
)
return # Does not raise, but does not save anything either
except LoaderError:
raise
else:
# Everything loaded as desired
log.progress(
"Loaded all data for entry '%s' in %s.\n",
entry_name,
_fmt_time(time.time() - t0),
)
# Done with this entry. Print tree, if desired.
if print_tree == "condensed":
print(self.tree_condensed)
elif print_tree:
print(self.tree)
def _load(
self,
*,
target_path: str,
loader: str,
glob_str: Union[str, List[str]],
include_files: bool = True,
include_directories: bool = True,
load_as_attr: Union[str, None] = False,
base_path: str = None,
ignore: List[str] = None,
required: bool = False,
path_regex: str = None,
exists_action: str = "raise",
unpack_data: bool = False,
progress_indicator: bool = True,
parallel: Union[bool, dict] = False,
**loader_kwargs,
) -> Tuple[int, int]:
"""Helper function that loads a data entry to the specified path.
Args:
target_path (str): The path to load the result of the loader to.
This can be a format string; it is evaluated for each file.
Available keys are: basename, match (if ``path_regex`` is
given)
loader (str): The loader to use
glob_str (Union[str, List[str]]): A glob string or a list of glob
strings to match files in the data directory
include_files (bool, optional): If false, will exclude paths that
point to files.
include_directories (bool, optional): If false, will exclude paths
that point to directories.
load_as_attr (Union[str, None], optional): If a string, the entry
will be loaded into the object at ``target_path`` under a new
attribute with this name.
base_path (str, optional): The base directory to concatenate the
glob string to; if None, will use the DataManager's data
directory. With this option, it becomes possible to load data
from a path outside the associated data directory.
ignore (List[str], optional): The exact file names in this list
will be ignored during loading. Paths are seen as relative to
the data directory.
required (bool, optional): If True, will raise an error if no files
were found or if loading of a file failed.
path_regex (str, optional): The regex applied to the relative path
of the files that were found. It is used to generate the name
of the target container. If not given, the basename is used.
exists_action (str, optional): The behaviour upon existing data.
Can be: ``raise`` (default), ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``. With ``*_nowarn`` values,
no warning is given if an entry already existed.
Note that this is ignored if ``load_as_attr`` is given.
unpack_data (bool, optional): If True, and ``load_as_attr`` is
active, not the DataContainer or DataGroup itself will be
stored in the attribute, but the content of its ``.data``
attribute.
progress_indicator (bool, optional): Whether to print a progress
indicator or not
parallel (Union[bool, dict], optional): If True, data is loaded in parallel.
If a dict, can supply more options:
- ``enabled``: whether to use parallel loading
- ``processes``: how many processes to use; if None, will
use as many as are available. For negative integers, will
use ``os.cpu_count() + processes`` processes.
- ``min_files``: if given, will fall back to non-parallel
loading if fewer than the given number of files were
matched by ``glob_str``
- ``min_total_size``: if given, specifies the minimum *total*
size of all matched files (in bytes) below which to fall
back to non-parallel loading
Note that a single file will *never* be loaded in parallel and
there will never be more processes used than files that were
selected to be loaded.
Parallel loading incurs a constant overhead and is typically
only speeding up data loading if the task is CPU-bound. Also,
it requires the data tree to be fully serializable.
**loader_kwargs: passed on to the loader function
Returns:
Tuple[int, int]: Tuple of number of files that matched the glob
strings, *including* those that may have been skipped, and
number of successfully loaded and stored entries
"""
def _print_line(s: str):
if progress_indicator:
print_line(s)
# Create the list of file paths to load
files, _base_path = self._resolve_path_list(
glob_str=glob_str,
ignore=ignore,
required=required,
base_path=base_path,
include_files=include_files,
include_directories=include_directories,
sort=True,
)
# If a regex pattern was specified, compile it
path_sre = re.compile(path_regex) if path_regex else None
# Parse the parallel argument, assuming that it's a dict or boolean
if isinstance(parallel, bool):
parallel = dict(enabled=parallel)
elif isinstance(parallel, int):
parallel = dict(enabled=True, processes=parallel)
num_procs = _parse_parallel_opts(files, **parallel)
# Loading . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
num_success = 0
if num_procs <= 1:
# Get the loader function; it's the same for all files
load_func, TargetCls = self._resolve_loader(loader)
# Go over the files and load them
for n, filepath in enumerate(files):
self._progress_info_str = f" Loading {n+1}/{len(files)} ..."
_print_line(self._progress_info_str)
# Loading of the file and storing of the resulting object
_obj, _target_path = self._load_file(
filepath,
loader=loader,
load_func=load_func,
target_path=target_path,
path_sre=path_sre,
load_as_attr=load_as_attr,
TargetCls=TargetCls,
required=required,
_base_path=_base_path,
**loader_kwargs,
)
num_success += self._store_object(
_obj,
target_path=_target_path,
as_attr=load_as_attr,
unpack_data=unpack_data,
exists_action=exists_action,
)
else:
# Set up callback handlers, load function, and other pool arguments
cb = PoolCallbackHandler(
len(files), silent=(not progress_indicator)
)
ecb = PoolErrorCallbackHandler()
_load_file = functools.partial(
_load_file_wrapper,
dm=self,
loader=loader,
target_path=target_path,
path_sre=path_sre,
load_as_attr=load_as_attr,
required=True, # errors are handled via ErrorCallbackHandler
_base_path=_base_path,
**loader_kwargs,
)
# Inform the user about what's going to happen
# Important to set the progress info string here, which loaders
# may use to inform about the progress themselves (by concurrently
# writing to the same line in terminal...)
log.remark(
"Loading %d files using %d processes ...",
len(files),
num_procs,
)
self._progress_info_str = " Loading in parallel ..."
_print_line(" Loading ...")
# Create pool and populate with tasks, storing the AsyncResults
with mp.Pool(num_procs) as pool:
results = [
pool.apply_async(
_load_file, (_f,), callback=cb, error_callback=ecb
)
for _f in files
]
pool.close()
pool.join()
# Merge the results ...
_print_line(" Merging results ...")
for result in results:
try:
_obj, _target_path = result.get()
except Exception as exc:
ecb.track_error(exc)
continue
num_success += self._store_object(
_obj,
target_path=_target_path,
as_attr=load_as_attr,
unpack_data=unpack_data,
exists_action=exists_action,
)
if ecb.errors:
_errors = "\n\n".join(
[f" - {e.__class__.__name__}: {e}" for e in ecb.errors]
)
err_msg = (
f"There were {len(ecb.errors)} errors during parallel "
f"loading of data from {len(files)} files!\n"
"The errors were:\n\n"
f"{_errors}\n\n"
"Check if the errors may be caused by loading in "
"parallel, e.g. because of unpickleable objects in the "
"loaded data tree."
)
if required:
raise DataLoadingError(
err_msg + "\nUnset `required` to ignore this error."
)
log.error(err_msg + "\nSet `required` to raise an error.")
# Clear the line to get rid of the progress indicator in case there
# were no errors (otherwise the last message might get chopped off)
if progress_indicator and num_success == len(files):
clear_line()
log.debug("Finished loading data from %d file(s).", len(files))
return len(files), num_success
def _load_file(
self,
filepath: str,
*,
loader: str,
load_func: Callable,
target_path: str,
path_sre: Union[re.Pattern, None],
load_as_attr: str,
TargetCls: type,
required: bool,
_base_path: str,
target_path_kwargs: dict = None,
**loader_kwargs,
) -> Tuple[Union[None, BaseDataContainer], List[str]]:
"""Loads the data of a single file into a dantro object and returns
the loaded object (or None) and the parsed target path key sequence.
"""
# Prepare the target path (will be a key sequence, ie. list of keys)
_target_path = self._prepare_target_path(
target_path,
filepath=filepath,
path_sre=path_sre,
base_path=_base_path,
**ensure_dict(target_path_kwargs),
)
# Distinguish regular loading and loading as attribute, and prepare the
# target class correspondingly to assure that the name is already
# correct. An object of the target class will then be filled by the
# load function.
if load_as_attr:
_name = load_as_attr
else:
_name = _target_path[-1]
_TargetCls = lambda **kws: TargetCls(name=_name, **kws)
# Let the load function retrieve the data
try:
_data = load_func(filepath, TargetCls=_TargetCls, **loader_kwargs)
except Exception as exc:
err_msg = (
f"Failed loading file {filepath} with '{loader}' "
f"loader!\nGot {exc.__class__.__name__}: {exc}"
)
if required:
raise DataLoadingError(
f"{err_msg}\nUnset `required` to ignore this error."
) from exc
log.error("%s\nSet `required` to raise an error.", err_msg)
return None, _target_path
log.debug(
"Successfully loaded file '%s' into %s.", filepath, _data.logstr
)
return _data, _target_path
def _resolve_loader(self, loader: str) -> Tuple[Callable, type]:
"""Resolves the loader function and returns a 2-tuple containing the
load function and the declared dantro target type to load data to.
"""
def resolve_from_registry(name: str) -> Callable:
load_func = self._loader_registry[name]
log.debug(
"Got load func '%s' from registry, wrapping it ...", name
)
# Need to supply self as first positional argument to comply with the
# signature of the method-based invocation.
# This in turn requires to carry over some attributes that the
# decorator sets and that functools.partial does not carry over.
wrapped = functools.partial(load_func, self)
wrapped.__doc__ = load_func.__doc__
wrapped._orig = load_func
for attr_name in dir(load_func):
if attr_name.startswith("__"):
continue
setattr(wrapped, attr_name, getattr(load_func, attr_name))
return wrapped
# Find the method name
load_func_name = f"_load_{loader.lower()}"
try:
load_func = getattr(self, load_func_name)
except AttributeError as err:
try:
load_func = resolve_from_registry(loader)
except KeyError:
raise LoaderError(
f"Loader '{loader}' was not available to {self.logstr}! "
"Make sure to use a mixin class that supplies "
f"the '{load_func_name}' loader method or that the loader "
"is properly registered via the @add_loader decorator.\n"
f"Available loaders: {', '.join(self.available_loaders)}"
) from err
else:
log.debug("Resolved '%s' loader function.", loader)
try:
TargetCls = getattr(load_func, "TargetCls")
except AttributeError as err:
raise LoaderError(
f"Load method '{load_func}' misses required attribute "
"'TargetCls'. Make sure the loading method in your mixin uses "
"the @add_loader decorator!"
) from err
return load_func, TargetCls
def _resolve_path_list(
self,
*,
glob_str: Union[str, List[str]],
ignore: Union[str, List[str]] = None,
base_path: str = None,
required: bool = False,
**glob_kwargs,
) -> Tuple[List[str], str]:
"""Create the list of file or directory paths to load.
Internally, this uses a set, thus ensuring that the paths are
unique. The set is converted to a list before returning.
.. note::
Paths may refer to file *and* directory paths.
Args:
glob_str (Union[str, List[str]]): The glob pattern or a list of
glob patterns to use for searching for files. Relative paths
will be seen as relative to ``base_path``.
ignore (List[str]): A list of paths to ignore. Relative paths will
be seen as relative to ``base_path``. Supports glob patterns.
base_path (str, optional): The base path for the glob pattern. If
not given, will use the ``data`` directory.
required (bool, optional): If True, will raise an error if no
matching paths were found.
**glob_kwargs: Passed on to :py:func:`dantro.tools.glob_paths`.
See there for more available parameters.
Returns:
Tuple[List[str], str]:
The (file or directory) paths to load and the base path they
were resolved against.
Raises:
MissingDataError:
If no files could be matched.
RequiredDataMissingError:
If no files could be matched but were required.
"""
if base_path is None:
base_path = self.dirs["data"]
log.debug("Using data directory as base path.")
paths = glob_paths(
glob_str,
base_path=base_path,
ignore=ignore,
**glob_kwargs,
)
log.note(
"Found %d path%s with a total size of %s.",
len(paths),
"s" if len(paths) != 1 else "",
format_bytesize(total_bytesize(paths)),
)
log.debug("\n %s", "\n ".join(paths))
if not paths:
# No paths found; exit here, one way or another
if not required:
raise MissingDataError(
"No paths found matching "
f"`glob_str` {glob_str} (and ignoring {ignore})."
)
raise RequiredDataMissingError(
f"No paths found matching `glob_str` {glob_str} "
f"(and ignoring {ignore}) were found, but were "
"marked as required!"
)
return paths, base_path
def _prepare_target_path(
self,
target_path: str,
*,
filepath: str,
base_path: str,
path_sre: re.Pattern = None,
join_char_replacement: str = "__",
**fstr_params,
) -> List[str]:
r"""Prepare the target path within the data tree where the loader's
output is to be placed.
The ``target_path`` argument can be a format string.
The following keys are available:
- ``dirname``: the directory path *relative* to the selected base
directory (typically the data directory).
- ``basename``: the lower-case base name of the file, without extension
- ``ext``: the lower-case extension of the file, without leading dot
- ``relpath``: The full (relative) path (without extension)
- ``dirname_cleaned`` and ``relpath_cleaned``: like above but with the
path join character (``/``) replaced by ``join_char_replacement``.
If ``path_sre`` is given, will additionally have the following keys
available as result of calling :py:meth:`re.Pattern.search` on the
given ``filepath``:
- ``match``: the first matched group, named or unnamed.
This is equivalent to ``groups[0]``.
If no match is made, will warn and fall back to the ``basename``.
- ``groups``: the sequence of matched groups; individual groups can be
accessed via the expanded formatting syntax, where ``{groups[1]:}``
will access the second match.
*Not available if there was no match.*
- ``named``: contains the matches for *named* groups; individual groups
can be accessed via ``{named[foo]:}``, where ``foo`` is the name of
the group.
*Not available if there was no match.*
For more information on how to define named groups, refer to the
`Python docs <https://docs.python.org/3/howto/regex.html#non-capturing-and-named-groups>`_.
.. hint::

    For more complex target path format strings, use the ``named``
    matches for higher robustness.

Examples (using ``path_regex`` instead of ``path_sre``):

.. code-block:: yaml

    # Without pattern matching
    filepath: data/some_file.ext
    target_path: target/{ext}/{basename}  # -> target/ext/some_file

    # With simple pattern matching
    path_regex: data/uni(\d+)/data.h5
    filepath: data/uni01234/data.h5       # matches 01234
    target_path: multiverse/{match}/data  # -> multiverse/01234/data

    # With pattern matching that uses named groups
    path_regex: data/no(?P<num>\d+)/data.h5
    filepath: data/no123/data.h5          # matches 123
    target_path: target/{named[num]}      # -> target/123
Args:
target_path (str): The target path :py:func:`format` string, which
may contain placeholders that are replaced in this method. For
instance, these placeholders may be those from the path regex
pattern specified in ``path_sre``, see above.
filepath (str): The actual path of the *file*, used as input to the
regex pattern.
base_path (str): The base path used when determining the
``filepath`` and from which a relative path can be computed.
Available as format keys ``relpath`` and ``relpath_cleaned``.
path_sre (re.Pattern, optional): The regex pattern that is used to
generate additional arguments that are useable in the format
string.
join_char_replacement (str, optional): The string to use to replace
the ``PATH_JOIN_CHAR`` (``/``) in the relative paths
**fstr_params: Made available to the formatting operation
Returns:
List[str]:
Path sequence that represents the target path within the
data tree where the loaded data is to be placed.
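Example:
    A sketch of the returned key sequence; the values are illustrative
    and correspond to the second YAML example above:

    .. code-block:: python

        # target_path="multiverse/{match}/data" with a regex matching
        # "01234" in the file path results in the key sequence:
        ["multiverse", "01234", "data"]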
"""
clean_path = lambda p: p.replace(PATH_JOIN_CHAR, join_char_replacement)
# On Windows platform, ensure the path is given in posix format
if platform.system() == "Windows":
filepath = ensure_posix_path_on_windows(filepath)
# The dict to be filled with formatting parameters
fps = dict(**fstr_params)
# Extract the file path and name information
basename, ext = os.path.splitext(os.path.basename(filepath))
fps["basename"] = basename.lower()
fps["ext"] = ext[min(1, len(ext)) :].lower()
# Relative file path information
dirname = os.path.relpath(os.path.dirname(filepath), start=base_path)
fps["dirname"] = dirname
fps["dirname_cleaned"] = clean_path(dirname)
relpath, _ = os.path.splitext(
os.path.relpath(filepath, start=base_path)
)
# On Windows platform, ensure the relpath is given in posix format
if platform.system() == "Windows":
relpath = ensure_posix_path_on_windows(relpath)
fps["relpath"] = relpath
fps["relpath_cleaned"] = clean_path(relpath)
# Use the specified regex pattern to extract a match
if path_sre:
if (match := path_sre.search(filepath)) is None:
# nothing could be found
warnings.warn(
"Could not extract a name using the regex pattern "
f"'{path_sre}' on the file path:\n {filepath}\n"
"Using the path's basename instead.",
NoMatchWarning,
)
fps["match"] = fps["basename"]
else:
log.debug("Matched %s in file path '%s'.", match, filepath)
fps["groups"] = match.groups()
fps["match"] = fps["groups"][0]
fps["named"] = match.groupdict()
# Parse the format string to generate the file path
log.debug(
"Parsing format string '%s' to generate target path ...",
target_path,
)
log.debug(" kwargs: %s", fps)
try:
target_path = target_path.format(**fps)
except (KeyError, IndexError) as err:
_fps = "\n".join(f" {k}: {v}" for k, v in fps.items())
raise ValueError(
"Failed evaluating target path format string!\n"
f"Got {type(err).__name__}: {err}\n\n"
"Check the error message, the given regex pattern and the "
"possible file names that may occur.\n"
f" target_path: {target_path}\n"
f" filepath: {filepath}\n"
f" path_regex: {path_sre.pattern if path_sre else 'None'}\n"
f" available format string keys:\n{_fps}"
) from err
log.debug("Generated target path: %s", target_path)
return target_path.split(PATH_JOIN_CHAR)
def _skip_path(self, path: str, *, exists_action: str) -> bool:
"""Check whether a given path exists and — depending on the
``exists_action`` – decides whether to skip this path or not.
Args:
path (str): The path to check for existence.
exists_action (str): The behaviour upon existing data. Can be:
``raise``, ``skip``, ``skip_nowarn``,
``overwrite``, ``overwrite_nowarn``.
The ``*_nowarn`` arguments suppress the warning.
Returns:
bool: Whether to skip this path
Raises:
ExistingDataError: Raised when ``exists_action == 'raise'``
ValueError: Raised for an invalid ``exists_action`` value
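Example:
    A sketch of the resulting behaviour, assuming the illustrative path
    ``foo/bar`` already exists in the data tree:

    .. code-block:: python

        dm._skip_path("foo/bar", exists_action="skip")         # True, warns
        dm._skip_path("foo/bar", exists_action="skip_nowarn")  # True
        dm._skip_path("foo/bar", exists_action="overwrite")    # False, warns
        dm._skip_path("foo/bar", exists_action="raise")        # raises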
"""
if path not in self:
# Does not exist yet -> no need to skip
return False
# else: path exists already
# NOTE that it is not known whether the path points to a group
# or to a container
_msg = f"Path '{PATH_JOIN_CHAR.join(path)}' already exists."
# Distinguish different actions
if exists_action == "raise":
raise ExistingDataError(
f"{_msg} Adjust argument `exists_action` to allow skipping "
"or overwriting of existing entries."
)
if exists_action in ("skip", "skip_nowarn"):
if exists_action == "skip":
warnings.warn(
f"{_msg} Loading of this entry will be skipped.",
ExistingDataWarning,
)
return True # will lead to the data not being loaded
elif exists_action in ("overwrite", "overwrite_nowarn"):
if exists_action == "overwrite":
warnings.warn(
f"{_msg} It will be overwritten!", ExistingDataWarning
)
return False # will lead to the data being loaded
else:
raise ValueError(
"Invalid value for `exists_action` argument "
f"'{exists_action}'! Can be: raise, skip, "
"skip_nowarn, overwrite, overwrite_nowarn."
)
def _store_object(
self,
obj: Union[BaseDataGroup, BaseDataContainer],
*,
target_path: List[str],
as_attr: Union[str, None],
unpack_data: bool,
exists_action: str,
) -> bool:
"""Store the given ``obj`` at the supplied ``target_path``.
Note that this will automatically overwrite, assuming that all
checks have been made prior to the call to this function.
Args:
obj (Union[BaseDataGroup, BaseDataContainer]): Object to store
target_path (List[str]): The path to store the object at
as_attr (Union[str, None]): If a string, store the object in
the attributes of the container or group at target_path
unpack_data (bool): If True and storing as an attribute, store the
content of the object's ``.data`` attribute instead of the
object itself
exists_action (str): The behaviour upon existing data at the target
path; see :py:meth:`._skip_path`
Returns:
bool: Whether storing was successful. May be False in case the
target path already existed and ``exists_action`` specifies
that it is to be skipped, or if the object was None.
Raises:
ExistingDataError: If non-group-like data already existed at
that path
RequiredDataMissingError: If storing as attribute was selected
but there was no object at the given target_path
"""
if obj is None:
log.debug("Object was None, not storing.")
return False
# Now handle the (easy) case where the object is to be stored
# as the attribute at the target_path
if as_attr:
# Try to load the object at the target path
try:
target = self[target_path]
except KeyError as err:
raise RequiredDataMissingError(
f"In order to store the object {obj.logstr} at the "
f"target path '{target_path}', a group or container "
"already needs to exist at that location within "
f"{self.logstr}, but there is no such object at that path!"
) from err
# Check whether an attribute with that name already exists
if as_attr in target.attrs:
raise ExistingDataError(
f"An attribute with the name '{as_attr}' "
f"already exists in {target.logstr}!"
)
# All checks passed. Can store it now, either directly or with
# unpacking of its data ...
if not unpack_data:
target.attrs[as_attr] = obj
else:
target.attrs[as_attr] = obj.data
log.debug(
"Stored %s as attribute '%s' of %s.",
obj.classname,
as_attr,
target.logstr,
)
return True
# Check if it is to be skipped
if self._skip_path(target_path, exists_action=exists_action):
log.debug("Skipping storing of %s.", obj.logstr)
return False
# Extract a target group path and a base name from path list
group_path = target_path[:-1]
basename = target_path[-1]
# Find out the target group, creating it if necessary.
if not group_path:
group = self
else:
# Need to retrieve or create the group.
# The difficulty is that the path can also point to a container.
# Need to assure here, that the group path points to a group.
if group_path not in self:
# Needs to be created
self.new_group(group_path)
elif not isinstance(self[group_path], BaseDataGroup):
# Already exists, but is no group. Cannot continue
group_path = PATH_JOIN_CHAR.join(group_path)
target_path = PATH_JOIN_CHAR.join(target_path)
raise ExistingDataError(
f"The object at '{group_path}' in {self.logstr} is "
f"not a group but a {type(self[group_path])}. Cannot "
f"store {obj.logstr} there because the target path "
f"'{target_path}' requires it to be a group."
)
# Now the group path will definitely point to a group
group = self[group_path]
# Check if any container already exists in that group, overwrite if so.
# Depending on `exists_action`, this method might have returned above
# already if overwriting is not intended.
if basename in group:
del group[basename]
# All good, can store the object now.
group.add(obj)
log.debug("Successfully stored %s at '%s'.", obj.logstr, obj.path)
return True
# .. Dumping and restoring the DataManager ................................
def _parse_file_path(self, path: str, *, default_ext=None) -> str:
"""Parses a file path: if it is a relative path, makes it relative to
the associated data directory. If a default extension is specified and
the path does not contain one, that extension is added.
This helper method is used as part of dumping and storing the data
tree, i.e. in the :py:meth:`~dantro.data_mngr.DataManager.dump` and
:py:meth:`~dantro.data_mngr.DataManager.restore` methods.
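Example:
    A sketch of the path handling, assuming an illustrative data
    directory of ``/data/sim``:

    .. code-block:: python

        dm._parse_file_path("snapshot", default_ext=".d3")
        # -> "/data/sim/snapshot.d3"

        dm._parse_file_path("/tmp/tree.d3")
        # -> "/tmp/tree.d3"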
"""
path = os.path.expanduser(path)
if not os.path.isabs(path):
path = os.path.join(self.dirs["data"], path)
# Handle file extension, adding a default extension if none was given
if default_ext:
path, ext = os.path.splitext(path)
path += ext if ext else default_ext
return path
def dump(self, *, path: str = None, **dump_kwargs) -> str:
"""Dumps the data tree to a new file at the given path, creating any
necessary intermediate data directories.
For restoring, use :py:meth:`~dantro.data_mngr.DataManager.restore`.
Args:
path (str, optional): The path to store this file at. If this is
not given, use the default tree cache path that was set up
during initialization.
If it is given and a relative path, it is assumed relative to
the data directory.
If the path does not end with an extension, the ``.d3`` (read:
"data tree") extension is automatically added.
**dump_kwargs: Passed on to ``pkl.dump``
Returns:
str: The path that was used for dumping the tree file
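Example:
    A minimal usage sketch, assuming ``dm`` is an initialized
    DataManager; the custom path is illustrative:

    .. code-block:: python

        # Dump to the default tree cache path ...
        dm.dump()

        # ... or to a custom path, relative to the data directory; the
        # default extension is added automatically
        dm.dump(path="snapshots/tree_v1")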
"""
if path:
path = self._parse_file_path(path, default_ext=DATA_TREE_DUMP_EXT)
else:
path = self.tree_cache_path
os.makedirs(os.path.dirname(path), exist_ok=True)
log.progress("Dumping data tree of %s ...", self.logstr)
with open(path, mode="x+b") as file:
pkl.dump(self, file, **dump_kwargs)
log.progress("Successfully stored data tree in cache file.")
log.note(" Path: %s", path)
log.note(" File size: %s", format_bytesize(os.path.getsize(path)))
return path
def restore(
self, *, from_path: str = None, merge: bool = False, **load_kwargs
):
"""Restores the data tree from a dump.
For dumping, use :py:meth:`~dantro.data_mngr.DataManager.dump`.
Args:
from_path (str, optional): The path to restore this DataManager
from. If it is not given, uses the default tree cache path
that was set up at initialization.
If it is a relative path, it is assumed relative to the data
directory. Take care to add the corresponding file extension.
merge (bool, optional): If True, uses a recursive update to merge
the current tree with the restored tree.
If False, uses :py:meth:`~dantro.data_mngr.DataManager.clear`
to clear the current tree and then re-populates it with the
restored tree.
**load_kwargs: Passed on to ``pkl.load``
Raises:
FileNotFoundError: If no file is found at the (expanded) path.
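Example:
    A minimal usage sketch, assuming ``dm`` is an initialized
    DataManager and the dump file exists; the path is illustrative:

    .. code-block:: python

        # Restore from the default tree cache file, replacing the tree
        dm.restore()

        # Restore from a custom dump, merging it into the current tree
        dm.restore(from_path="snapshots/tree_v1.d3", merge=True)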
"""
if from_path:
path = self._parse_file_path(from_path)
else:
path = self.tree_cache_path
if not os.path.isfile(path):
raise FileNotFoundError(
f"Could not restore {self.logstr} as there was no tree "
f"cache file at '{path}'!"
)
log.progress("Restoring %s from data tree file ...", self.logstr)
log.note(" Path: %s", path)
log.note(" File size: %s", format_bytesize(os.path.getsize(path)))
with open(path, mode="rb") as file:
dm = pkl.load(file, **load_kwargs)
if not merge:
log.note(" Mode: clear and load")
self.clear()
else:
log.note(" Mode: load and merge")
self.recursive_update(dm)
log.progress("Successfully restored the data tree.")