Compose a workflow#
A workflow is composed by linking methods together using the output files of one method as input for another method. To compose a workflow, the following steps are taken:
initialize a workflow with optional wildcards and configuration.

Initialize a workflow#
To initialize the Workflow
class, a user can supply following arguments (all optional):
The root directory (recommended!) is the directory where the workflow is executed. All input and output files of the rule are relative to this directory.
The workflow configuration is used to store all workflow parameters and input files which are not output of a rule. Users can add their own parameters to the configuration and use these to initialize the methods. The configuration will also be stored as a separate file in the root directory when parsing the workflow to a workflow engine, see Export to and execute with Workflow Engine.
The wildcards are used to evaluate the method wildcards, see wildcards.
In [1]: from hydroflows.workflow import Workflow
In [2]: import logging
# setup logging
In [3]: logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s')
# initialize a workflow
In [4]: wf = Workflow(
...: root="./my_workflow",
...: config={"model_exe": "bin/model/model.exe"},
...: wildcards={"region": ["region1", "region2"]},
...: )
...:
# we now have a workflow with wildcards, but without rules
In [5]: print(wf)
Workflow(
wildcards={'region': ['region1', 'region2']}
rules=[]
)
Create workflow rules (basic)#
To create a rule, the method it is based on should be initialized first.
A rule is then created by added it to the workflow using the create_rule()
.
When calling create_rule
with a method the following steps are executed in the background:
The wildcards are detected and validated. Based on the wildcard values different method instances are created. This makes it possible to execute the method for multiple input files or parameters in parallel. A ReduceMethod expects a wildcard on the input files which is not present on the output files. An ExpandMethod expects a wildcard on the output files which is not present on the input files. And a Method expects either no wildcards or the same repeat wildcard on the input and output files.
The method input files are either linked to output files of previous rules or set in the workflow configuration.
A check is performed to ensure the output files are unique (not already used in the workflow).
The dependencies of the rule are evaluated.
The rule is added to the workflow.
Note
You cannot overwrite rules in the workflow or create rules with the same outputs. If you want to modify a rule when creating the workflow in an interactive environment, you should rerun the workflow creation from scratch. Most IPython Notebooks IDEs have A “Run all above” button to rerun the notebook from the beginning.
The following example shows how to create a rule based on a method. Here, the model_exe parameter is set using a reference to the workflow configuration.
In [6]: from hydroflows.methods.dummy import RunDummyEvent
# initialize a dummy method which performs a simulation for an event
In [7]: simulate_event = RunDummyEvent(
...: event_csv="events/event_x.csv",
...: settings_toml="settings.toml",
...: model_exe=wf.get_ref("$config.model_exe"), # use a reference to the workflow configuration
...: output_dir="model",
...: event_name="event_x",
...: )
...:
In [8]: print(simulate_event) # inspect the method
Method(
name=run_dummy_event
input=(event_csv=PosixPath('events/event_x.csv') settings_toml=PosixPath('settings.toml') model_exe=PosixPath('bin/model/model.exe'))
output=(model_out_nc=PosixPath('model/event_event_x_result.nc'))
params=(run_method='exe' output_dir=PosixPath('model') event_name='event_x')
)
# create a rule and add it to the workflow
In [9]: wf.create_rule(simulate_event, rule_id="simulate_event")
Out[9]: Rule(id=simulate_event, method=run_dummy_event, runs=1)
# we now have a workflow with one rule
In [10]: print(wf)
Workflow(
wildcards={'region': ['region1', 'region2']}
rules=[Rule(id=simulate_event, method=run_dummy_event, runs=1)]
)
The output files of the method can be used as input for subsequent methods, see example below. Note that the rules need to be created and added to the workflow in the right order to ensure that the output files of one method are available as input for the next method.
The following example shows how to create a rule that uses the output of the previous rule. The output of the previous rule is accessed using the output attribute of the method. A reference to the output file can automatically be created because output files have to be unique in the workflow.
In [11]: from hydroflows.methods.dummy import PostprocessDummyEvent
# initialize a method that postprocesses the output of the simulation
In [12]: postprocess = PostprocessDummyEvent(
....: model_nc=simulate_event.output.model_out_nc,
....: output_dir="results",
....: event_name="event_x",
....: )
....:
In [13]: print(postprocess) # inspect the method
Method(
name=postprocess_dummy_event
input=(model_nc=PosixPath('model/event_event_x_result.nc'))
output=(postprocessed_nc=PosixPath('results/event_event_x_postprocessed.nc'))
params=(output_dir=PosixPath('results') event_name='event_x')
)
# add the method to the workflow
In [14]: wf.create_rule(postprocess, rule_id="postprocess_event")
Out[14]: Rule(id=postprocess_event, method=postprocess_dummy_event, runs=1)
# we now have a workflow with two rules
In [15]: print(wf)
Workflow(
wildcards={'region': ['region1', 'region2']}
rules=[Rule(id=simulate_event, method=run_dummy_event, runs=1)
Rule(id=postprocess_event, method=postprocess_dummy_event, runs=1)]
)
Create workflow rules (repeat wildcards)#
The same workflow can be repeated using wildcards in the method input files or parameters.
We use wildcards instead of python loops to ensure that the workflow can be parallelized and executed on a workflow engine.
This is done using the wildcard name between {}
in the input files or parameters of the method.
Note that the wildcard should be on the input and output to repeat the method for each wildcard value.
The following example shows how to create a workflow with repeat wildcards. First the wildcards are set at the workflow level, then the wildcards are used in the input files and parameters of the method which should be repeated. Multiple wildcards can be combined in which case the method is repeated for all combinations of the wildcard values. If the wildcard is accidentally only used in the input files or output files, an error will be raised.
The following example shows how create a rules with repeat wildcards.
The same methods as before are used, but now with the {event}
and {region}
wildcards.
The new rules will have six instances, one for each combination of the wildcards.
# set the wildcards for the workflow
In [16]: wf.wildcards.set("event", ["event01", "event02", "event03"])
# initialize a method with `region` and `event` wildcards
In [17]: simulate_event_repeat = RunDummyEvent(
....: event_csv="events/{region}/{event}.csv",
....: settings_toml="{region}/settings.toml",
....: model_exe=wf.get_ref("$config.model_exe"), # use a reference to the workflow configuration
....: output_dir="model/{region}",
....: event_name="{event}",
....: )
....:
In [18]: print(simulate_event_repeat) # inspect the method
Method(
name=run_dummy_event
input=(event_csv=PosixPath('events/{region}/{event}.csv') settings_toml=PosixPath('{region}/settings.toml') model_exe=PosixPath('bin/model/model.exe'))
output=(model_out_nc=PosixPath('model/{region}/event_{event}_result.nc'))
params=(run_method='exe' output_dir=PosixPath('model/{region}') event_name='{event}')
)
# add the method to the workflow. Note that we give it a new unique rule_id
In [19]: wf.create_rule(simulate_event_repeat, rule_id="simulate_event_repeat")
Out[19]: Rule(id=simulate_event_repeat, method=run_dummy_event, runs=6, repeat=['event', 'region'])
# initialize a method that postprocesses the output of the simulation
In [20]: postprocess_repeat = PostprocessDummyEvent(
....: model_nc=simulate_event_repeat.output.model_out_nc,
....: output_dir="results/{region}",
....: event_name="{event}",
....: )
....:
In [21]: print(postprocess_repeat) # inspect the method
Method(
name=postprocess_dummy_event
input=(model_nc=PosixPath('model/{region}/event_{event}_result.nc'))
output=(postprocessed_nc=PosixPath('results/{region}/event_{event}_postprocessed.nc'))
params=(output_dir=PosixPath('results/{region}') event_name='{event}')
)
# add the method to the workflow
In [22]: wf.create_rule(postprocess_repeat, rule_id="postprocess_repeat")
Out[22]: Rule(id=postprocess_repeat, method=postprocess_dummy_event, runs=6, repeat=['event', 'region'])
# we now have a workflow with four rules, the two new rules are repeated for each event
In [23]: print(wf)
Workflow(
wildcards={'event': ['event01', 'event02', 'event03'], 'region': ['region1', 'region2']}
rules=[Rule(id=simulate_event, method=run_dummy_event, runs=1)
Rule(id=simulate_event_repeat, method=run_dummy_event, runs=6, repeat=['event', 'region'])
Rule(id=postprocess_event, method=postprocess_dummy_event, runs=1)
Rule(id=postprocess_repeat, method=postprocess_dummy_event, runs=6, repeat=['event', 'region'])]
)
Create workflow rules (expand and reduce wildcards)#
In order to create multiple output files from a single set of input files (expand) or to create a single output file from multiple input files (reduce),
special methods called ExpandMethod
and ReduceMethod
can be used, see Expand and reduce methods.
For example, the PrepareDummyEvents
method can be used to create multiple events for different return periods from a single time series.
The method has a wildcard
parameter to define the wildcard name, while its values will be based on the rps
parameter.
Which input parameter is used for expanding or reducing depends on the method logic and is described in the method documentation.
At initialization, an ExpandMethod
stores the name and values as expand wildcard which are used to create multiple output files.
In [24]: from hydroflows.methods.dummy import PrepareDummyEvents
# initialize new workflow
In [25]: wf = Workflow(
....: root="./my_workflow",
....: config={"model_exe": "bin/model/model.exe"},
....: )
....:
# initialize a method
In [26]: prepare_events = PrepareDummyEvents(
....: timeseries_csv="data/timeseries.csv",
....: output_dir="events",
....: rps=[1,5,10,50,100],
....: wildcard="event",
....: )
....:
In [27]: print(prepare_events) # inspect the method
ExpandMethod(
name=prepare_dummy_events
input=(timeseries_csv=PosixPath('data/timeseries.csv'))
output=(event_csv=PosixPath('events/event_rp{event}.csv') event_set_yaml=PosixPath('events/event_set.yml'))
params=(output_dir=PosixPath('events') index_col=0 wildcard='event' rps=[1, 5, 10, 50, 100])
expand_wildcards=(event=['0001', '0005', '0010', '0050', '0100'])
)
# add the method to the workflow
In [28]: wf.create_rule(prepare_events, rule_id="prepare_events")
Out[28]: Rule(id=prepare_events, method=prepare_dummy_events, runs=1, expand=['event'])
# inspect the workflow to see if the expand wildcard is set
In [29]: print(wf)
Workflow(
wildcards={'event': ['0001', '0005', '0010', '0050', '0100']}
rules=[Rule(id=prepare_events, method=prepare_dummy_events, runs=1, expand=['event'])]
)
After an ExpandMethod
is added to the workflow, the wildcard can be used in subsequent rules to repeat the
method for each value of the wildcard value and/or to reduce over multiple input files.
In the following example, the RunDummyEvent
method is repeated for each event,
created by the PrepareDummyEvents
method,
followed by the ReduceMethod
CombineDummyEvents
that combines the results.
The latter takes the output of all event simulations as input.
In [30]: from hydroflows.methods.dummy import CombineDummyEvents
# initialize a method that simulates the events
In [31]: simulate_events = RunDummyEvent(
....: event_csv=prepare_events.output.event_csv,
....: settings_toml="settings.toml",
....: model_exe=wf.get_ref("$config.model_exe"), # use a reference to the workflow configuration
....: output_dir="model",
....: event_name="{event}",
....: )
....:
In [32]: print(simulate_events) # inspect the method
Method(
name=run_dummy_event
input=(event_csv=PosixPath('events/event_rp{event}.csv') settings_toml=PosixPath('settings.toml') model_exe=PosixPath('bin/model/model.exe'))
output=(model_out_nc=PosixPath('model/event_{event}_result.nc'))
params=(run_method='exe' output_dir=PosixPath('model') event_name='{event}')
)
# add the method to the workflow
In [33]: wf.create_rule(simulate_events, rule_id="simulate_events")
Out[33]: Rule(id=simulate_events, method=run_dummy_event, runs=5, repeat=['event'])
# initialize a method that combines the results of the events
In [34]: combine_events = CombineDummyEvents(
....: model_out_ncs=simulate_events.output.model_out_nc,
....: output_dir="results",
....: )
....:
In [35]: print(combine_events) # inspect the method
ReduceMethod(
name=combine_dummy_events
input=(model_out_ncs=PosixPath('model/event_{event}_result.nc'))
output=(combined_out_nc=PosixPath('results/events_combined.nc'))
params=(output_dir=PosixPath('results'))
)
# add the method to the workflow
In [36]: wf.create_rule(combine_events, rule_id="combine_events")
Out[36]: Rule(id=combine_events, method=combine_dummy_events, runs=1, reduce=['event'])
# we now have a workflow with thee rules, only the second rule is repeated for each event
In [37]: print(wf)
Workflow(
wildcards={'event': ['0001', '0005', '0010', '0050', '0100']}
rules=[Rule(id=prepare_events, method=prepare_dummy_events, runs=1, expand=['event'])
Rule(id=simulate_events, method=run_dummy_event, runs=5, repeat=['event'])
Rule(id=combine_events, method=combine_dummy_events, runs=1, reduce=['event'])]
)
More workflow examples#
More (complex) examples with full flood risk workflows are available in the HydroFlows Example workflows.