Integration Example#

dantro works best if it’s tightly integrated into your project. The aim of this guide is to give step-by-step instructions on how dantro can be used to build a data processing pipeline for your project.

Note

The examples given here focus on conveying the necessary steps for the pipeline integration, not so much on the ideal structure of the implementation. Thus, we recommend going through this guide at least once before starting with the actual implementation of the data processing pipeline for your project. 🤓


Overview#

Let’s assume you are in the following situation:

  • You are working on a project that generates some form of structured data. The data itself can have very different properties. It …

    • … may be hierarchically organized, e.g. in HDF5 files

    • … may contain data of very different kinds (numerical array-like, meta-data, plain text data, configuration files…), i.e. semantically heterogeneous data

    • … may contain data that requires processing before becoming meaningful

  • You want to be able to work with this data in a uniform way:

    • Load the generated data

    • Access and explore it

    • Transform it

    • Visualize it

  • You will be going through this process more than once, such that putting in the effort to automate the above steps will pay off.

The result of this integration guide will be a data processing pipeline: an automated set of procedures that can be carried out on the generated data in order to handle, transform, and visualize it. These procedures will be referred to as the three stages of the processing pipeline.

Setting up a tightly integrated pipeline will require more than a few lines of code, as you will see in this guide. However, once implemented, the pipeline will be highly flexible, such that you can quickly configure it to your needs. Overall, we think that the up-front time investment of setting up the pipeline will be paid off by the everyday gains of using this framework and the automation it provides.

Hint

If you encounter any questions or issues with the integration, please raise an issue on the project page. We are happy to assist and smooth out the pipeline integration process.

Note

Some remarks regarding the code examples that follow below:

  • The sequence of examples below is tested and ensured to work with the version of dantro this documentation corresponds to.

  • The my_project string is used to refer to the Python project in which the processing pipeline is being defined.

  • In general, every name of the form MyFoobar indicates that you can (and should) choose your own name for these data structures and variables.

Important: For illustration purposes, the code shown here is not modularized into separate files but presented in a linear fashion. If you are looking for a minimal pipeline implementation, you can follow this approach. However, if you are building a processing pipeline that should be expandable and grow alongside your project, splitting these code chunks into multiple modules is highly recommended; see Suggested module structure.

While all relevant parts of the code examples are already included on this page, you can also inspect the full source code used for these examples; see the Further Information section below.

Note that these files are part of the automated tests and thus include a small number of additional definitions, e.g. to write output to a temporary directory instead of a local path.

Data Generation#

For this guide, we define a simple data generator that will feed the example data processing pipeline.

This is of course only a placeholder for your already-existing project, e.g. a numerical simulation, an agent-based model, or some data collection routine. The routine shown here is meant to illustrate which kinds of data structures can be worked with and in which manner.

Storyline#

The storyline for our example data generator will be a numerical simulation. Given a set of parameters, each simulation will create output data that includes:

  • An HDF5 file with a bunch of (hierarchically nested) datasets

    • A random_walk time series

    • A simple agent-based model, which stores all its data to an abm group. It will write the state of each agent as well as some macroscopic observables into that group.

  • The set of parameters that were used to generate the simulation output

  • The accompanying logger output as a text file

Furthermore, the simulation will make use of the paramspace package to generate simulation data not for a single set of parameters but for a whole multi-dimensional parameter space.

Preparations#

Disregarding the details of the numerical simulation for a moment, let’s look at how it will be embedded, configured, and invoked.

First, we need some basic imports and definitions. For this example, let’s assume that you have base_out_path defined as the directory where simulation output should be stored in, and sim_cfg_path as the path to a YAML file that defines the parameters for the simulation. (The project_cfg_path and plots_cfg_path will be used later on.)

# -- Step 0: Some basic imports and definitions
import os

import paramspace as psp

import dantro as dtr
from dantro.tools import load_yml, write_yml

sim_cfg_path = "~/path/to/sim_cfg.yml"             # simulation parameters
base_out_path = "~/my_output_directory"            # where to store output
project_cfg_path = "~/my_project/project_cfg.yml"  # project configuration
plots_cfg_path = "~/path/to/plot_cfg.yml"          # plot configurations
# NOTE: In practice, you might want to define these paths not in this
#       absolute fashion but via the `pkg_resources.resource_filename` function

Now, let’s load the configuration and extract the simulation parameters. Here, they come in the form of a ParamSpace object, which makes it easy to define a whole set of parameters to sweep over. (For the actual values of the parameter space, see below.)

# -- Step 1: Load a configuration file
sim_cfg = load_yml(sim_cfg_path)

# ... and extract the simulation parameters (an iterable ParamSpace object)
pspace = sim_cfg["parameter_space"]
assert isinstance(pspace, psp.ParamSpace)

Also, we need to prepare an output directory path, here sim_out_dir, where all the output for this specific simulation run will be stored. The directory itself need not be created at this point.

# -- Step 2: Prepare the output directory path for *this* simulation
sim_name = "my_simulation"  # ... probably want a time stamp here
sim_out_dir = os.path.join(base_out_path, sim_name)

# Store the parameter space there, for reproducibility
write_yml(pspace, path=os.path.join(sim_out_dir, "pspace.yml"))

Generating data and storing it#

Having extracted the relevant parameters, we now iterate over the parameter space. For each set of parameters resulting from this iteration, we create the data using those parameters and store it inside the output directory:

# -- Step 3: Use the parameter space to generate and store the output data
for params, state_str in pspace.iterator(with_info="state_no_str"):
    # `params` is now a dict that contains the set of parameters for this
    # specific instantiation.
    # `state_str` is a string identifying this point of the parameter space

    # Create the path to the output directory for _this_ simulation
    this_out_dir = os.path.join(sim_out_dir, "sim_" + state_str)

    # Generate the data, using all those parameters
    print(f"Running simulation for parameters '{state_str}' … ", end="\t")
    generate_and_store_data(this_out_dir, **params)
    print("Done.")

print(f"All data generated and stored at {sim_out_dir} .")

In this example, the generate_and_store_data function takes care of all these tasks. In your project, this might be handled in any other way.

Note

As the exact procedure of how the data is generated is not important, the corresponding source code of the above examples is omitted here. (If you are interested, you can still find it below.)

Hint

If the above procedure is similar to the one in your project, you may want to consider also using the paramspace package to manage the parameters of your data generation procedure.

To handle hyper-parameters, dantro makes use of ParamSpace objects in other places as well, e.g. for creating plots from simulations that were generated in the above manner. Using paramspace for the generation routine can thus simplify the automation of data loading and visualization later on.
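For illustration, such a parameter space can also be constructed programmatically; the following is a minimal sketch (the constructor arguments are assumed to mirror the !pspace and !pdim YAML tags used in the simulation parameters shown at the end of this page):

# A sketch (not part of the example above): constructing a small parameter
# space programmatically; the keyword arguments are assumed to mirror the
# !pspace / !pdim YAML tags shown in the "Simulation parameters" section below
import paramspace as psp

small_pspace = psp.ParamSpace(dict(
    seed=psp.ParamDim(default=42, range=[10]),               # values 0 ... 9
    max_step_size=psp.ParamDim(default=1., values=[0.1, 0.3, 1.0]),
))
print(small_pspace.volume)  # -> 30 points in this 2D parameter space

for params, state_str in small_pspace.iterator(with_info="state_no_str"):
    ...  # same iteration pattern as in Step 3 above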

So far, so good: We now have some simulation output that we can use to feed the data processing pipeline.

Summary#

The above serves as an example of how we can:

  • Use a configuration file to define the parameters for a set of simulations

  • Store the simulation output in a specific output directory for each set of parameters

  • Store the configuration and parameter space alongside the output, such that the simulations can be reproduced later on

Of course, all this might be quite different to what is needed to generate or collect actual data in your specific scenario. This example merely illustrates one way to generate that data, in the hope that you can adapt it to your needs.

For now, the important point is: You are writing data to some output directory and storing the metadata (the configuration) alongside. This data will be the input to the processing pipeline.

Note

The data can be generated in any conceivable fashion; it need not be generated by a Python project. Only the processing pipeline itself will be implemented as a Python project.


Stage 1: Data Loading and Handling#

Loading the generated data into a uniform data structure, the data tree, is the first stage of the data processing pipeline.

The loading will be carried out by a custom DataManager that we will call MyDataManager. This specialization can be configured such that it adapts to the structure of the data that is being worked with, e.g. by using specialized container or group types for certain kinds of data.

Following dantro’s configurability philosophy, all relevant parameters for loading will be consolidated into a configuration file.

Defining a custom DataManager#

To specialize the DataManager for the pipeline, we can simply derive from it:

# -- Step 4: Define a custom DataManager
import dantro.data_loaders
import dantro.groups

class MyDataManager(dtr.data_loaders.AllAvailableLoadersMixin,
                    dtr.DataManager):
    """MyDataManager can load HDF5, YAML, Text, Pickle, ..."""
    # Register known group types
    _DATA_GROUP_CLASSES = dict(ParamSpaceGroup=dtr.groups.ParamSpaceGroup,
                               TimeSeriesGroup=dtr.groups.TimeSeriesGroup)

    # Specify a content mapping for loading HDF5 data
    _HDF5_GROUP_MAP = dict(time_series=dtr.groups.TimeSeriesGroup)

In this case, MyDataManager supports all available loaders. If desired, the set of loaders can be controlled in a more granular fashion; see Specializing the DataManager.

Furthermore, it was supplied with information about available group types. We will use those below to build the initial tree structure.

The _HDF5_GROUP_MAP class variable is an example of a customization of one of the loaders. In this case, the given mapping is used by the Hdf5LoaderMixin to load appropriately labelled HDF5 groups not as the default dantro group type, but as the specified type, which can be a specialized version.
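If only a subset of loaders is needed, individual loader mixins can be combined instead of using AllAvailableLoadersMixin. The following sketch assumes individual mixin names such as YamlLoaderMixin and TextLoaderMixin; apart from Hdf5LoaderMixin, which is mentioned above, treat these names as assumptions and consult Specializing the DataManager for what is actually available:

# A sketch of a more granular loader selection; except for Hdf5LoaderMixin,
# the mixin names below are assumptions -- see "Specializing the DataManager"
from dantro.data_loaders import (Hdf5LoaderMixin, TextLoaderMixin,
                                 YamlLoaderMixin)

class MyLeanDataManager(Hdf5LoaderMixin, YamlLoaderMixin, TextLoaderMixin,
                        dtr.DataManager):
    """A DataManager that only provides the loaders it actually needs"""
    _DATA_GROUP_CLASSES = dict(ParamSpaceGroup=dtr.groups.ParamSpaceGroup,
                               TimeSeriesGroup=dtr.groups.TimeSeriesGroup)
    _HDF5_GROUP_MAP = dict(time_series=dtr.groups.TimeSeriesGroup)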

Initializing MyDataManager#

To instantiate MyDataManager, we read the corresponding configuration entry from the project configuration and pass those parameters to it:

# -- Step 5: Load the project configuration
project_cfg = load_yml(project_cfg_path)

# ... and extract the initialization parameters for MyDataManager
dm_cfg = project_cfg["data_manager"]


# -- Step 6: Set up the DataManager & associate it with the data directory.
dm = MyDataManager(sim_out_dir, name=sim_name, **dm_cfg)

# The data tree is still empty (except for the `simulations` group).
# Let's check:
print(dm.tree)
# Will print:
#   Tree of MyDataManager 'my_simulation', 1 member, 0 attributes
#    └─ simulations           <ParamSpaceGroup, 0 members, 0 attributes>

As initialization parameters, we pass the following arguments:

data_manager:
  # Where to store output that is associated with the data from this directory
  out_dir: "eval/{timestamp:}"  # can be a format string

  # Define the structure of the data tree beforehand; this allows to specify
  # the types of groups before content is loaded into them.
  create_groups:
    - path: simulations
      Cls: ParamSpaceGroup

  # The load configuration: specifies which files to load how and from where
  load_cfg:
    # ... load configuration goes here; omitted for brevity

These already include the so-called load_cfg, i.e. a set of parameters that specifies which data should be loaded from where and how it should be stored in the data tree.

Furthermore, these parameters can be used to already generate a part of the data tree; this can make loading data easier in some scenarios. Here, the create_groups argument creates the simulations group, a ParamSpaceGroup, where each member is assumed to be the output of a single simulation. (See here for more information.)

The out_dir of the DataManager is a directory that is used to store output that is associated with the to-be-loaded data. For example, the visualization output will end up in that directory.

Loading data#

Let’s recap which data was written during data generation:

  • pspace.yml stored the simulation parameters of all simulations.

  • For each simulation, the following files were created:

    • data.h5 is an HDF5 file with hierarchically structured numerical data

    • params.yml is the set of parameters for this particular simulation

    • sim.log is the plain text simulation log output

Basically, we want to represent the same structure in the data tree. Thus, loading should carry out the following operations:

  • Load the global pspace.yml and associate it with the already existing simulations group, such that it is aware of the parameter space.

  • For each simulation output directory:

    • Load data.h5 into a new group inside the simulations group.

    • Load simulation metadata (sim.log and params.yml) and store them alongside.

As mentioned above, all these load operations can be specified in the load_cfg. For the data.h5 files, an entry of the load_cfg would look something like this:

sim_data:
  loader: hdf5
  glob_str: sim_*/data.h5
  required: true
  path_regex: sim_(\d+)/data.h5
  target_path: simulations/{match:}/data

This selects the relevant data.h5 files inside the output directory using the glob_str argument and then uses path_regex to determine the target_path inside the simulations group. The full load configuration is omitted here (you can inspect it below). For general information on the load configuration, see here.

With the load configuration already specified during initialization, loading the data into the data tree is a simple matter of invoking the load_from_cfg() method:

# -- Step 7: Load data using the load configuration given at initialisation
dm.load_from_cfg(print_tree="condensed")
# Will load the data and then print a condensed tree overview:
# Tree of MyDataManager 'my_simulation', 1 member, 0 attributes
#  └─ simulations             <ParamSpaceGroup, 30 members, 1 attribute>
#     └┬ 12                   <ParamSpaceStateGroup, 3 members, 0 attributes>
#        └┬ params            <MutableMappingContainer, 1 attribute>
#         ├ data              <OrderedDataGroup, 2 members, 0 attributes>
#           └┬ abm            <OrderedDataGroup, 2 members, 0 attributes>
#              └┬ energy      <TimeSeriesGroup, 31 members, 1 attribute>
#                 └┬ 0        <XrDataContainer, float64, (dim_0: 42), 0 attributes>
#                  ├ 1        <XrDataContainer, float64, (dim_0: 42), 0 attributes>
#                  ├ ...          ... (27 more) ...
#                  ├ 29       <XrDataContainer, float64, (dim_0: 4), 0 attributes>
#                  └ 30       <XrDataContainer, float64, (dim_0: 0), 0 attributes>
#               ├ mean_energy <NumpyDataContainer, float64, shape (101,), 0 attributes>
#               └ num_agents  <NumpyDataContainer, uint32, shape (101,), 0 attributes>
#            └ random_walk    <NumpyDataContainer, float64, shape (1024,), 0 attributes>
#         └ log               <StringContainer, str stored, 1 attribute>
#      ├ 13                   <ParamSpaceStateGroup, 3 members, 0 attributes>
#        └┬ params            <MutableMappingContainer, 1 attribute>
#         ├ data              <OrderedDataGroup, 2 members, 0 attributes>
#           └┬ abm            <OrderedDataGroup, 2 members, 0 attributes>
#              └┬ energy      <TimeSeriesGroup, 35 members, 1 attribute>
#                 └┬ 0        <XrDataContainer, float64, (dim_0: 42), 0 attributes>
#                  ├ ...          ... (33 more) ...
#                  └ 34       <XrDataContainer, float64, (dim_0: 0), 0 attributes>
#               ├ mean_energy <NumpyDataContainer, float64, shape (101,), 0 attributes>
#               └ num_agents  <NumpyDataContainer, uint32, shape (101,), 0 attributes>
#            └ random_walk    <NumpyDataContainer, float64, shape (1024,), 0 attributes>
#         └ log               <StringContainer, str stored, 1 attribute>
#      ├ 14                   <ParamSpaceStateGroup, 3 members, 0 attributes>
#      ...

The (condensed) tree view shows which data was loaded into which part of the tree and provides some further information on the structure of the data. As you can see, the initial simulations group was populated with the output from the individual simulations, the HDF5 tree was unpacked, and the parameter and log output were stored alongside. In short: we preserved the hierarchical representation of the data, both from within the HDF5 file and from the directory structure.

Furthermore, the loader already applied a type mapping during loading: the data/abm/energy group is a TimeSeriesGroup, which assumes that the underlying datasets represent a time series.

Hint

load_from_cfg() also allows supplying new parameters or updating those given at initialization.
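For instance, an individual entry of the load configuration could be supplied (or re-supplied) at call time. This is only a sketch, assuming that load_from_cfg accepts a load_cfg argument whose entries take precedence over those given at initialization:

# A sketch only: supplying (part of) the load configuration at call time
dm.load_from_cfg(
    load_cfg=dict(
        sim_log=dict(loader="text", glob_str="sim_*/sim.log",
                     path_regex=r"sim_(\d+)/sim.log",
                     target_path="simulations/{match:}/log"),
    ),
    print_tree="condensed",
)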

Once loaded, the tree can be navigated in a dict-like fashion:

# To access data, we can use the dict interface and paths
import numpy as np  # needed for np.argmin below

for sim in dm["simulations"].values():
    num_steps = sim["params"]["abm"]["num_steps"]
    extinct_after = np.argmin(sim["data/abm/num_agents"])

    print(f"In simulation '{sim.name}', agents went extinct after "
          f"{extinct_after} / {num_steps} iterations.")

Summary#

To recap, the following steps were carried out:

  • We specialized a DataManager

  • We then initialized it with arguments from the project_cfg

  • We loaded data as it was specified in a load configuration (also defined in project_cfg)

With this, the first stage of the data processing pipeline is set up: We have automated the loading of data into the data tree. If further data needs to be loaded or the shape of the data tree needs to be adjusted, the load_cfg can be changed accordingly.


Stage 2: Data Transformation#

To couple to the data transformation framework, the second stage of the processing pipeline, no special steps need to be taken. As part of the data visualization stage, the plot creators take care of setting up everything and passing the relevant configuration options directly to the data transformation framework.

However, to be able to conveniently register additional data operations, we suggest adding a dedicated module (e.g. data_ops.py) to your project in which data operations can be defined and registered using the register_operation() function. It can look as simple as the following:

# -- Step 8: Add a module where additional data operations can be defined
"""This module can be used to register project-specific data operations"""
from dantro.data_ops import register_operation

def do_something(data):
    """Given some data, does something."""
    # ... do something here ...
    return data

register_operation(name="do_something", func=do_something)

Even if you do not need custom operations at the time of building the integration, it is useful to set up this module right away, such that further operations can easily be added once you need them.
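While developing, it can be handy to check that an operation was registered as expected by applying it directly. The following sketch assumes that an apply_operation function is available from dantro.data_ops alongside register_operation:

# A sketch: applying a registered operation via the operations database
# (assumes that `apply_operation` lives next to `register_operation`)
from dantro.data_ops import apply_operation

result = apply_operation("do_something", [1, 2, 3])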

Note

Make sure that this additional module is imported when the rest of your project is loaded. If the register_operation() calls are never executed, the operations will not be available.

Summary#

  • No additional steps required

  • Adding a custom module is useful for having a place to define and register further data operations


Stage 3: Visualization#

With the data tree loaded and the transformation framework ready, we merely need to set up the plotting framework, which is orchestrated by the PlotManager.

The process is similar to that with the DataManager: We will create a specialized version of it, instantiate it, and provide a configuration that defines some common parameters.

Defining a custom PlotManager#

Akin to the customization achieved via MyDataManager, we will define MyPlotManager as a specialization of PlotManager. Customizations there mostly pertain to registering further plot creators.

In this example, we will use dantro’s existing plot creators. Consequently, MyPlotManager is simply a child class of PlotManager and does not require any further changes:

# -- Step 9: Specialize a PlotManager
class MyPlotManager(dtr.PlotManager):
    """My custom PlotManager"""

    # If plot creators are customized, specify them here
    # CREATORS = dict(custom=MyCustomPlotCreator)

Initializing MyPlotManager#

In order to have access to the data tree, MyPlotManager is associated with the MyDataManager instance from above. Furthermore, we will use the configuration specified in the project_cfg during initialization, such that we can adjust MyPlotManager’s behaviour directly from the project configuration file:

# -- Step 10: Initialize MyPlotManager from the project configuration
pm_cfg = project_cfg["plot_manager"]

pm = MyPlotManager(dm=dm, **pm_cfg)

The pm_cfg is used to specify some default behaviour of the manager, e.g. that it should raise exceptions instead of merely logging them:

plot_manager:
  # Set the default creator type
  default_creator: pyplot

  # Raise exceptions when a plot fails
  raise_exc: true

  # Specify some default kwargs for the creators
  creator_init_kwargs:
    external:
      default_ext: pdf  # plots should by default be saved as PDFs

    universe:
      default_ext: pdf
      psgrp_path: simulations

    multiverse:
      default_ext: pdf
      psgrp_path: simulations

As part of this initialization process, default arguments for the plot creators are also supplied via creator_init_kwargs. In this case, we configure these creators to use pdf as the default file extension. For the ParamSpace-supporting plot creators (see Plots from Multidimensional Data), we specify the path to the ParamSpaceGroup inside the data tree.

Creating plots#

Creating plots is now as easy as invoking plot_from_cfg() with a path to a configuration file (or with a dict containing the corresponding configuration).
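As a minimal sketch, a single plot could thus also be specified inline as a dict; the plot name random_walk_inline and the abbreviated select entry are illustrative only:

# A minimal sketch: passing a plot configuration as a dict instead of a file
pm.plot_from_cfg(plots_cfg=dict(
    random_walk_inline=dict(      # hypothetical plot name
        creator="universe",
        universes="all",
        module=".generic",
        plot_func="facet_grid",
        select=dict(data="data/random_walk"),
    ),
))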

Let’s have a look at an example plot configuration and how it is invoked:

# For each simulation, called "universe" here, plot the random walk
random_walk:
  # Choose a creator
  # Here: the UniversePlotCreator, a specialization of the PyPlotCreator
  creator: universe
  universes: all

  # Use dantro's generic facet grid function, useful for representing
  # high-dimensional data
  module: .generic
  plot_func: facet_grid

  # Select the data to plot
  select:
    data:
      # Access the data/random_walk container for each simulation ...
      path: data/random_walk
      # ... and transform it into an xr.DataArray
      transform:
        - xr.DataArray: !dag_prev
          kwargs:
            dims: [iteration]
Handing the path to this plot configuration file to plot_from_cfg() then triggers the plotting:

# -- Step 11: Invoke the plots specified in a configuration file
pm.plot_from_cfg(plots_cfg=plots_cfg_path)

Once invoked, the logger output will show the progress of the plotting procedure: a plot named random_walk is created for each of the simulations, as specified in the plot configuration. This uses the UniversePlotCreator, which detects the parameter space and uses the capabilities of the PlotManager to generate multiple plots.

Hint

To plot only a subset of the plots configured in plots_cfg, use the plot_only argument of plot_from_cfg(). This is a useful parameter to make available via a CLI.
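For example, the following call would create only the random_walk plot defined above:

# Create only a subset of the configured plots
pm.plot_from_cfg(plots_cfg=plots_cfg_path, plot_only=["random_walk"])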

The plotting output will be saved to the output directory, which is the eval/{timestamp:} directory that MyDataManager created inside the data directory exactly for this purpose.

Extended example#

Let’s look at a more involved example that plots mean random walk data from the parameter sweep (mean_random_walk) and the ABM’s mean energy time series (abm_mean_energy):

# --- Define so-called "multiverse" plots, using data from all simulations

# Using data from all simulations, compute the mean over the seed dimension and
# then show different lines for different step sizes
mean_random_walk:
  creator: multiverse
  module: .generic
  plot_func: facet_grid

  # Select the data from the individual simulations and combine them into a
  # higher-dimensional xarray.DataArray
  select_and_combine:
    fields:
      random_walk: data/random_walk

  transform:
    # Perform the mean operation over the seed dimension
    - .mean: [!dag_tag random_walk, seed]
    # Rename the dim_0 dimension
    - .rename: !dag_prev
      kwargs:
        dim_0: iterations
      tag: data

  # Configure the facet grid to show max_step_size as line hues
  kind: line
  x: iterations
  hue: max_step_size

  # And make the plot prettier using the PlotHelper
  helpers:
    set_title:
      title: Averaged Random Walk Trajectories
    set_labels:
      y: Value


# Using data of the sweep over the `seed` dimension, plot individual time
# series of the mean agent energy
abm_mean_energy:
  creator: multiverse

  module: .generic
  plot_func: facet_grid

  select_and_combine:
    fields:
      mean_energy:
        path: data/abm/mean_energy
        # Transform it into a DataArray and rename dimensions accordingly
        transform:
          - xr.DataArray
          - .rename: !dag_prev
            kwargs:
              dim_0: iterations

    # Only use a single max_step_size, because ABM results are unaffected
    subspace:
      max_step_size: [1.]

  transform:
    - .squeeze: !dag_tag mean_energy
      kwargs:
        drop: true
    - print: !dag_prev
      tag: data

  # Configure the facet grid to show different seeds as line hue
  kind: line
  x: iterations
  hue: seed

  # Finally, invoke some helpers
  helpers:
    set_title:
      title: Mean Agent Energy for Different Simulations
    set_limits:
      x: [0, ~]
      y: [0, ~]

These plot configurations go well beyond the first example and are meant to illustrate the capabilities of the plotting framework. Without going into detail, let’s highlight some of the operations specified above:

  • With the MultiversePlotCreator, data from several simulations can be combined into a higher-dimensional array.

  • The select_and_combine key controls which data to select from each simulation and how it should be combined into the higher-dimensional object.

  • The transform key is used to control the Data Transformation Framework, e.g. to calculate the mean over some dimension of the data or label the dimensions accordingly.

  • The facet_grid plot is a very versatile plotting function for high-dimensional data, which is why it is used here. See here for more information.

  • With the plot helpers, the aesthetics of the plot can be changed, e.g. to set limits or labels right from the plot configuration.

The above example gives a glimpse of the possibilities of the plotting framework. All of these features are already available as part of dantro.

Importantly, though, the plotting framework becomes much more capable once you specialize it to your needs. For example, with the PyPlotCreator and its built-in access to the Data Transformation Framework, you can easily define further plotting functions that form a bridge between selected and transformed data and its visualization.
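As an illustration, such a custom plot function could look roughly like the following sketch. The import location of is_plot_func and the exact decorator arguments depend on the dantro version, so treat them as assumptions:

# A rough sketch of a custom, DAG-enabled plot function; the import path and
# decorator arguments are assumptions that may differ between dantro versions
from dantro.plot import is_plot_func

@is_plot_func(use_dag=True, required_dag_tags=("data",))
def my_custom_plot(*, data: dict, hlpr, **plot_kwargs):
    """Plots the result of the data transformation tagged as ``data``"""
    hlpr.ax.plot(data["data"], **plot_kwargs)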

Hint

To re-use plot configurations, there is the Plot Configuration Inheritance feature, which makes plot specifications much more modular and concise. It allows you to outsource common parts of the plot configurations into a so-called “base configuration” and to compose them back together using the based_on argument.

This feature requires specifying a set of “base plot configurations”, e.g. as defined in a base_plots_cfg.yml file. The path to this file, or its content, needs to be communicated to the PlotManager at some point, e.g. via its __init__() call.

Summary#

To couple the data loading and transformation stages to the plotting framework, the following steps were necessary:

  • Specialize a PlotManager

  • Instantiate it using arguments from a configuration file

  • Configure plots in a configuration file

  • Tell the MyPlotManager instance to generate plots from that configuration

With this, the data processing pipeline is complete: it automates the loading of data, its processing, and its visualization. 🎉🎉

Note

Before repeating these steps for your project, make sure to study the Suggested module structure section below.


Closing Remarks#

Of course, integration doesn’t end here. While this guide describes how the basic infrastructure of the pipeline can be implemented, you have many more possibilities to specialize the pipeline to your project’s needs.

We hope that this guide helps in integrating dantro into your project!

Note

If you encounter any difficulties with this process, have a question or suggestion, or need support of any other kind, feel free to open an issue on the project page. We are looking forward to your feedback!


Further Information#

Suggested module structure#

In the linearly presented code examples above, no particular module structure is apparent. For the Python project in which your processing pipeline will be implemented, we suggest the following structure:

├ run_my_pipeline.py        # performs pipeline invocation
└ my_project/               # contains the pipeline definition
  ├ __init__.py
  ├ data_io.py              # defines MyDataManager and custom containers
  ├ data_ops.py             # defines custom data operations
  ├ ...
  ├ pipeline_cfg.yml        # stores default pipeline configuration
  ├ base_plots_cfg.yml      # defines base plot configurations
  ├ ...
  ├ plot_funcs.py           # defines functions for PyPlotCreator
  └ plotting.py             # defines MyPlotManager

Here, run_my_pipeline.py is a script that determines which configuration files should be used and passes them to MyDataManager and MyPlotManager. It can, for example, use the argparse module to provide a command line interface through which the data directory and the configuration file paths can be specified.
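A bare-bones version of such a script could look like the following sketch; the CLI argument names and the structure of pipeline_cfg.yml are assumptions to be adapted to your project:

# run_my_pipeline.py -- a bare-bones sketch of the pipeline invocation script.
# Argument names and configuration keys are assumptions, to be adapted.
import argparse

from dantro.tools import load_yml

from my_project.data_io import MyDataManager
from my_project.plotting import MyPlotManager

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run the processing pipeline")
    parser.add_argument("data_dir",
                        help="Directory containing the generated data")
    parser.add_argument("--pipeline-cfg", default="my_project/pipeline_cfg.yml",
                        help="Path to the pipeline configuration file")
    parser.add_argument("--plots-cfg", default=None,
                        help="Path to the plots configuration file")
    args = parser.parse_args()

    cfg = load_yml(args.pipeline_cfg)

    # Stage 1: set up the data tree and load the data
    dm = MyDataManager(args.data_dir, **cfg["data_manager"])
    dm.load_from_cfg(print_tree="condensed")

    # Stages 2 & 3: transformation happens as part of the plotting
    pm = MyPlotManager(dm=dm, **cfg["plot_manager"])
    pm.plot_from_cfg(plots_cfg=args.plots_cfg)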

What goes where?#

So… which code examples from above should be implemented in which module?

  • Class definitions and specializations should all go into the modules inside my_project

  • Variable definitions (e.g. via CLI), instantiations of managers, and method calls should go into run_my_pipeline.py (for good measure: inside an if __name__ == "__main__" block)

  • … the only exception being calls to register_operation(), which should be made in the data_ops module directly

Regarding configuration files, we suggest the following:

  • Put the pipeline default values into pipeline_cfg.yml and use the entries from there to set up MyDataManager and MyPlotManager, similar to the project_cfg.yml in the example above.

  • Any updates to those defaults can then be done at runtime, e.g. via run_my_pipeline.py

  • Plot configurations of plots you frequently use should go into base_plots_cfg.yml

Adapting to a growing project#

Your project will certainly grow over time. The above structure allows your pipeline implementation to grow alongside it: you can extend the structure with subpackages to achieve a more granular module layout:

├ run_my_pipeline.py
└ my_project/
  ├ data_io/
    ├ __init__.py
    ├ some_custom_container.py
    ├ some_custom_group.py
    ├ ...
    └ some_custom_proxy.py
  ├ data_ops/
    ├ __init__.py
    ├ operations.py
    └ ...
  ├ plotting/
    ├ plot_funcs/
      ├ __init__.py
      ├ generic.py
      ├ ...
      └ multi_dim.py
    ├ __init__.py
    ├ some_plot_creator.py
    ├ ...
    └ some_custom_proxy.py
  ├ __init__.py
  ├ data_io.py
  ├ data_ops.py
  ├ ...
  ├ pipeline_cfg.yml
  ├ base_plots_cfg.yml
  ├ ...
  ├ plot_funcs.py
  └ plotting.py

Hint

By adding additional imports from the new submodules to the top-level modules, you can avoid breaking imports.
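For example, when the flat data_io.py module becomes the data_io subpackage, its __init__.py can re-export the previously available names (the class names below are placeholders):

# my_project/data_io/__init__.py -- re-export public names so that existing
# `from my_project.data_io import ...` statements keep working.
# (Class and module names are placeholders.)
from .some_custom_container import MyDataContainer
from .some_custom_group import MyDataGroup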

Remarks#

  • For robustly determining configuration file paths from within the Python package, use pkg_resources.resource_filename (see their docs); a short sketch follows after this list

  • The dantro manager structures usually allow passing strings instead of nested dicts for defining configurations, e.g. for the plots_cfg. Such a string is interpreted as the path to a YAML configuration file. This can save you from having to load the YAML files in the outer scope, e.g. in run_my_pipeline.py.

  • We are considering adding a CLI directly to dantro to remove the need for defining a run_my_pipeline.py file manually.
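A short sketch of the pkg_resources approach mentioned in the first remark:

# A sketch: robustly resolving configuration files shipped with my_project
from pkg_resources import resource_filename

pipeline_cfg_path = resource_filename("my_project", "pipeline_cfg.yml")
base_plots_cfg_path = resource_filename("my_project", "base_plots_cfg.yml")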

Example of a full pipeline#

For an example of a fully integrated data processing pipeline that makes use of most dantro features, have a look at the Utopia project. The specializations described above are implemented in its datacontainer, datagroup, plotting, and datamngr modules. User-defined plotting functions for the customized plot creators can be found in a separate plotting module.

Data generation and storage function#

The following example shows the data generation routine, consisting of a random walk and a (naive) implementation of a simple agent-based model (ABM).

import os
from functools import reduce

import h5py as h5
import numpy as np

from dantro.tools import write_yml


class Agent:
    """A simple agent class, emulating an ABM"""
    def __init__(self, *, energy: float):
        """Sets up an agent with some initial energy value"""
        self.energy = energy

    def iterate(self, *, p_death: float, p_eat: float,
                dE_eat: float, dE_live: float) -> "Agent":
        """Iterates the agent state: deduces life costs, evaluates probability
        of eating and random death.

        Note that negative energy will lead to the agent not being regarded
        as alive any longer.
        """
        self.energy += dE_live

        if np.random.random() < p_eat:
            self.energy += dE_eat

        if p_death > 0. and np.random.random() < p_death:
            self.energy = 0
        return self

    def is_alive(self) -> bool:
        """Whether the agent is still alive, i.e. has positive energy"""
        return self.energy > 0.


def generate_and_store_data(out_dir: str, *, seed: int, **params) -> None:
    """Generate the simulation data using the given parameters and store the
    results in a file inside ``out_dir``.

    .. note::

        In practice, this will be your own data-generating module or project.
        This example function aims to show different aspects of what's possible
        to do with dantro.

    Args:
        out_dir (str): Path to the directory to store data files in
        seed (int): Seed for the pseudo-random number generator
        **params: The data generation parameters
    """
    def perform_random_walk(*, num_steps: int, initial_state: float,
                            max_step_size: float) -> np.ndarray:
        """Performs a 1D random walk, returns an array of size (num_steps+1)"""
        rand_nums = np.random.uniform(-max_step_size, max_step_size,
                                      size=(num_steps + 1,))
        rand_nums[0] = initial_state
        return np.cumsum(rand_nums)

    def iterate_abm(agents, **iter_kwargs) -> list:
        """Iterates the ABM and returns an updated list of agents"""
        agents = [a.iterate(**iter_kwargs) for a in agents]
        return [a for a in agents if a.is_alive()]

    def write_agent_data(agents, *, step: int, base_group: h5.Group,
                         mean_energy: h5.Dataset, num_agents: h5.Dataset):
        """Stores agent data in the given base group"""
        energy = [a.energy for a in agents]
        base_group.create_dataset(f"energy/{step}", data=energy)

        mean_energy[step] = np.mean(energy if energy else [np.nan])
        num_agents[step] = len(agents)

        # Label the group accordingly
        base_group["energy"].attrs["content"] = "time_series"

    # -- Preparations
    # Emulate a logger for this example. In a real example, this would be a
    # proper logger, configured to write directly to a file ...
    log = []

    # Set up output directory
    log.append(f"Creating output directory {out_dir} ...")
    os.makedirs(out_dir, exist_ok=True)

    # Seed the RNG
    log.append(f"Setting PRNG seed to {seed} ...")
    np.random.seed(seed)

    # Set up HDF5 file to write most of the output to
    log.append("Opening HDF5 output file ...")
    f = h5.File(os.path.join(out_dir, "data.h5"), mode='w')


    # -- Generate data and store it into the HDF5 file
    log.append("Generating and storing data now ...")

    f.create_dataset("random_walk",
                     data=perform_random_walk(**params["random_walk"]))
    log.append("Stored random walk data.")

    log.append("Setting up simple ABM ...")
    num_steps = params["abm"]["num_steps"]
    num_agents = params["abm"]["num_agents"]

    g = f.create_group("abm")
    mean_energy_ds = g.create_dataset("mean_energy", shape=(num_steps+1,),
                                      dtype='float64', fillvalue=np.nan)
    num_agents_ds = g.create_dataset("num_agents", shape=(num_steps+1,),
                                     dtype='uint32', fillvalue=0)

    agents = [Agent(**params["abm"]["init"]) for _ in range(num_agents)]
    write_agent_data(agents, step=0, base_group=g,
                     mean_energy=mean_energy_ds, num_agents=num_agents_ds)

    for i in range(num_steps):
        agents = iterate_abm(agents, **params["abm"]["iterate"])
        write_agent_data(agents, step=i+1, base_group=g,
                         mean_energy=mean_energy_ds, num_agents=num_agents_ds)
        if not agents:
            break

    # -- Finish up data writing
    # The hierarchically structured data
    log.append("Closing HDF5 file ...")
    f.close()

    # The parameters as a YAML file
    log.append("Storing simulation parameters ...")
    write_yml(dict(seed=seed, **params),
              path=os.path.join(out_dir, "params.yml"))

    # Lastly, the log output as a text file
    log.append("Storing log output ... good bye!")
    with open(os.path.join(out_dir, "sim.log"), 'w') as f:
        f.write("\n".join(log))
        f.write("\n")

Simulation parameters#

The corresponding simulation parameters are the following. They actually represent a two-dimensional parameter space, spanned by the seed of the internal random number generator and the max_step_size of the random walk.

# A parameter space of simulation parameters
parameter_space: !pspace
  # The PRNG seed, here: a sweep variable
  seed: !pdim
    default: 42
    range: [10]  # sweeps over values 0, 1, 2, ..., 9

  # The random walk parameters
  random_walk:
    initial_state: 0.
    num_steps: 1023
    max_step_size: !pdim
      default: 1.
      values: [0.1, 0.3, 1.0]

  abm:
    num_steps: 100
    num_agents: 42
    init:
      energy: 1.
    iterate:
      dE_live: -.05
      dE_eat: +.1
      p_death: .01
      p_eat: .1

Full load configuration#

The following is the load_cfg used in the initialization of MyDataManager:

# Load the parameter space object into the ParamSpaceGroup attributes
pspace:
  loader: yaml_to_object    # ... to load it as an ObjectContainer
  glob_str: pspace.yml
  required: true
  load_as_attr: true
  unpack_data: true         # ... and store as ParamSpace obj.
  target_path: simulations

# Load the configuration files that are generated for _each_ simulation
sim_params:
  loader: yaml
  glob_str: sim_*/params.yml
  required: true
  path_regex: sim_(\d+)/params.yml
  target_path: simulations/{match:}/params

# Load the binary output data from each simulation.
sim_data:
  loader: hdf5
  glob_str: sim_*/data.h5
  required: true
  path_regex: sim_(\d+)/data.h5
  target_path: simulations/{match:}/data
  enable_mapping: true
  map_from_attr: content  # which attribute to use for key of type mapping
  # See MyDataManager for the available key -> type mappings

# Load the logging output for each simulation
sim_log:
  loader: text
  glob_str: sim_*/sim.log
  required: false
  path_regex: sim_(\d+)/sim.log
  target_path: simulations/{match:}/log