Multi-scenario workflows (workflow)

Concept & design

Research with MESSAGEix models often involves multiple scenarios that are related to one another or derived from one another by certain modifications. Together, the solutions/reported information from these scenarios provide the output data used in research products, e.g. a plot comparing total emissions in a policy scenario to a reference scenario.

model.build provides tools to build models or scenarios based on (possibly empty) base scenarios; and tools provides tools for manipulating scenarios or model input data (parameters). The Workflow API provided in this module allows researchers to use these pieces and atomic, reusable functions to define arbitrarily complex workflows involving many, related scenarios; and then to solve, report, or otherwise operate on those scenarios.

The generic pattern for workflows is:

  • Each scenario has zero or 1 (or more?) base/precursor/antecedent scenarios. These must exist before the target scenario can be created.

  • A workflow ‘step’ includes:

    1. A precursor scenario is obtained.

      It may be returned by a prior workflow step, or loaded from a Platform.

    2. (Optional) The precursor scenario is cloned to a target model name and scenario name.

    3. A function is called to operate on the scenario. This function may do zero or more of:

      • Apply structure or data modifications, for example:

        • Set up a model variant, e.g. adding the MESSAGEix-Materials structure to a base MESSAGEix-GLOBIOM model.

        • Change policy variables via constraint parameters.

        • Any other possible modification.

      • Solve the target scenario.

      • Invoke reporting.

    4. The resulting function is passed to the next workflow step.

  • A workflow can consist of any number of scenarios and steps.

  • The same precursor scenario can be used as the basis for multiple target scenarios.

  • A workflow is Workflow.run() starting with the earliest precursor scenario, ending with 1 or many target scenarios.

The implementation is based on the observation that these form a graph (specifically, a directed, acyclic graph, or DAG) of nodes (= scenarios) and edges (= steps), in the same way that message_ix.report calculations do; and so the dask DAG features (via genno) can be used to organize the workflow.

Usage

General

Define a workflow using ordinary Python functions, each handling the modifications/manipulations in an atomic workflow step. These functions must:

  • Accept at least 2 arguments:

    1. A Context instance.

    2. The precursor scenario.

    3. Optionally additional, keyword-only arguments.

  • Return either:

    • a Scenario object, that can be the same object provided as an argument, or a different scenario, e.g. a clone or a different scenario, even from a different platform.

    • None. In this case, any modifications implemented by the step should be reflected in the Scenario given as an argument.

The functions may:

  • call any other code, and

  • be as short (one line) or long (many lines) as desired;

and they should:

  • respond in documented, simple ways to settings on the Context argument and/or their keyword argument(s), if any.

def changes_a(s: Scenario) -> None:
    """Change a scenario by modifying structure data, but not data."""
    with s.transact():
        s.add_set("technology", "test_tech")

    # Here, invoke other code to further modify `s`

def changes_b(s: Scenario, value=100.0) -> None:
    """Change a scenario by modifying parameter data, but not structure.

    This function takes an extra argument, `values`, so functools.partial()
    can be used to supply different values when it is used in different
    workflow steps. See below.
    """
    with s.transact():
        s.add_par(
            "technical_lifetime",
            make_df(
                "technical_lifetime",
                node_loc=s.set("node")[0],
                year_vtg=s.set("year")[0],
                technology="test_tech",
                value=100.0,
                unit="y",
            ),
        )

    # Here, invoke other code to further modify `s`

With the steps defined, the workflow is composed using a Workflow instance. Call Workflow.add_step() to define each target model with its precursor and the function that will create the former from the latter:

from message_ix_models import Context, Workflow

# Create the workflow
ctx = Context.get_instance()
wf = Workflow(ctx)

# "Model name/base" is loaded from an existing platform
wf.add_step(
    "base",
    None,
    target="ixmp://example-platform/Model name/base#123",
)

# "Model/A" is created from "Model/base" by calling changes_a()
wf.add_step("A", "base", changes_a, target="Model/A")

# "Model/B1" is created from "Model/A" by calling changes_b() with the
# default value
wf.add_step("B1", "A", changes_b, target="Model/B1")

# "Model/B2" is similar, but uses a different value
wf.add_step("B2", "A", partial(changes_b, value=200.0), target="model/B2")

Finally, the workflow is triggered using Workflow.run(), giving either one step name or a list of names. The indicated scenarios are created (and solved, if the workflow steps involve solving); if this requires any precursor scenarios, those are first created and solved, etc. as required. Other, unrelated scenarios/steps are not created.

s1, s2 = wf.run(["B1", "B2"])

Usage examples

  • message_data.projects.navigate.workflow

Todo

Expand with discussion of workflow patterns common in research projects using MESSAGEix, e.g.:

  • Run the same scenario with multiple emissions budgets.

API reference

Tools for modeling workflows.

class message_ix_models.workflow.Workflow(context: Context)[source]

Workflow for operations on multiple Scenarios.

Parameters:

context (Context) – Context object with settings common to the entire workflow.

add_step(name: str, base: str | None = None, action: Callable | None = None, replace=False, **kwargs) str[source]

Add a WorkflowStep to the workflow.

Parameters:
  • name (str) – Name for the new step.

  • base (str or None) – Previous step that produces the a pre-requisite scenario for this step.

  • action (CallbackType) – Function to be executed to modify the base into the target Scenario.

  • replace (bool) – True to replace an existing step.

  • kwargs – Keyword arguments for action; passed to and stored on the WorkflowStep until used.

Returns:

The same as name.

Return type:

str

Raises:

genno.KeyExistsError – if the step name already exists. Use replace to force overwriting an existing step.

guess_target(step_name: str, kind: Literal['platform', 'scenario'] = 'scenario') tuple[Mapping, str][source]

Traverse the graph looking for non-empty platform_info/scenario_info.

Returns the info, and the step name containing it. Usually, this will identify the name of the platform, model, and/or scenario that is received and acted upon by step_name. This may not be the case if preceding workflow steps perform clone steps that are not recorded in the target parameter to WorkflowStep.

Parameters:
run(name_or_names: str | list[str])[source]

Run all workflow steps necessary to produce name_or_names.

Parameters:

name_or_names (str or list of str) – Identifier(s) of steps to run.

truncate(name: str)[source]

Truncate the workflow at the step name.

The step name is replaced with a new WorkflowStep that simply loads the target Scenario that would be produced by the original step.

Raises:

KeyError – if step name does not exist.

message_ix_models.workflow.make_click_command(wf_callback: str, name: str, slug: str, **kwargs) Command[source]

Generate a click CLI command to run a Workflow.

This command:

  • when invoked, imports the module containing the wf_callback, retrieve and calls the function. This function receives the values for any click parameters (arguments and/or options) passed in kwargs. The module is not imported until/unless the command is run.

  • …is automatically given the parameters:

    • --go: Actually run the workflow; otherwise the workflow is only displayed.

    • --from: Truncate the workflow at any step(s) whose names are a full match for this regular expression.

  • uses the default_key (if any) of the Workflow returned by wf_callback, if the user does not provide TARGET on the command-line.

Parameters:
  • wf_callback (str) – Fully-resolved name (module and object name) for a function that generates the workflow; for instance “message_ix_models.project.foo.workflow.generate”.

  • name (str) – Descriptive workflow name used in the --help text.

  • slug (str) – File name fragment for writing the workflow diagram; the path slug-workflow.svg is used.

  • kwargs (optional) – Passed to click.command(), for instance to define additional parameters for the command.

class message_ix_models.workflow.WorkflowStep(action: Callable | None, target=None, clone=False, **kwargs)[source]

Single step in a multi-scenario workflow.

Nothing occurs when the WorkflowStep is instantiated.

Parameters:
  • name (str) – "model name/scenario name" for the Scenario produced by the step.

  • action (CallbackType, optional) – Function to be executed to modify the base into the target Scenario.

  • clone (bool, optional) – True to clone the base scenario the target.

  • target (str, optional) – URL for the scenario produced by the workflow step. Parsed to scenario_info and platform_info.

  • kwargs (dict) – Keyword arguments for action.

__call__(context: Context, scenario: Scenario | None = None) Scenario[source]

Execute the workflow step.

action: Callable | None = None

Function to be executed on the subject scenario. If None, the target scenario is loaded via Context.get_scenario().

clone: bool | dict = False

True or a dict with keyword arguments to clone before action is executed. Default: False, do not clone.

kwargs: dict

Keyword arguments passed to action.

platform_info: dict

Target platform name and additional options.

scenario_info: dict

Target model name, scenario name, and optional version.