Source code for message_ix_models.workflow

"""Tools for modeling workflows."""

import logging
import re
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Literal, Optional, Union

from genno import Computer
from message_ix import Scenario

from message_ix_models.util.context import Context
from message_ix_models.util.ixmp import parse_url

if TYPE_CHECKING:
    from click import Command

log = logging.getLogger(__name__)

# commented: this conflicts with option keyword arguments to workflow step functions
# CallbackType = Callable[[Context, Scenario], Scenario]
CallbackType = Callable


[docs]class WorkflowStep: """Single step in a multi-scenario workflow. Nothing occurs when the WorkflowStep is instantiated. Parameters ---------- name : str ``"model name/scenario name"`` for the :class:`.Scenario` produced by the step. action : CallbackType, optional Function to be executed to modify the base into the target Scenario. clone : bool, optional :obj:`True` to clone the base scenario the target. target : str, optional URL for the scenario produced by the workflow step. Parsed to :attr:`scenario_info` and :attr:`platform_info`. kwargs Keyword arguments for `action`. """ #: Function to be executed on the subject scenario. If :obj:`None`, the target #: scenario is loaded via :meth:`Context.get_scenario`. action: Optional[CallbackType] = None #: :obj:`True` or a :class:`dict` with keyword arguments to clone before #: :attr:`action` is executed. Default: :obj:`False`, do not clone. clone: Union[bool, dict] = False #: Keyword arguments passed to :attr:`action`. kwargs: dict #: Target platform name and additional options. platform_info: dict #: Target model name, scenario name, and optional version. scenario_info: dict def __init__( self, action: Optional[CallbackType], target=None, clone=False, **kwargs ): try: # Store platform and scenario info by parsing the `target` URL self.platform_info, self.scenario_info = parse_url(target) except (AttributeError, ValueError): if clone is not False: raise TypeError("target= must be supplied for clone=True") self.platform_info = dict() self.scenario_info = dict() # Store the callback and options self.action = action self.clone = clone self.kwargs = kwargs
[docs] def __call__( self, context: Context, scenario: Optional[Scenario] = None ) -> Scenario: """Execute the workflow step.""" if scenario is None: # No base scenario if self.action: raise RuntimeError( f"Step with action {self.action!r} requires a base scenario" ) # Use Context to retrieve the identified scenario context.platform_info.update(self.platform_info) context.scenario_info.update(self.scenario_info) s = context.get_scenario() log.info(f"Loaded ixmp://{s.platform.name}/{s.url}") else: # Modify the context to identify destination scenario; possibly nothing context.dest_scenario.update(self.scenario_info) s = scenario log.info(f"Step runs on ixmp://{s.platform.name}/{s.url}") if context.dest_scenario: log.info(f" with context.dest_scenario={context.dest_scenario}") if self.clone is not False: # Clone to target model/scenario name log.info("Clone to {model}/{scenario}".format(**self.scenario_info)) kw = self.scenario_info.copy() # If clone contains keyword arguments, e.g. shift_first_model_year, use them # NB user code should give clone = dict(keep_solution=True) if desired kw.update( self.clone if isinstance(self.clone, dict) else dict(keep_solution=False) ) s = s.clone(**kw) if not self.action: return s log.info(f"Execute {self.action!r}") # Modify context to identify the target scenario context.set_scenario(s) try: # Invoke the callback result = self.action(context, s, **self.kwargs) except Exception: # pragma: no cover s.platform.close_db() # Avoid locking the scenario raise if result is None: log.info(f"…nothing returned, workflow will continue with {s.url}") result = s return result
def __repr__(self): action = f"{self.action.__name__}()" if self.action else "load" dest = "" if self.scenario_info: dest = " -> {model}/{scenario}".format(**self.scenario_info) return f"<Step {action}{dest}>"
[docs]class Workflow(Computer): """Workflow for operations on multiple :class:`Scenarios <message_ix.Scenario>`. Parameters ---------- context : Context Context object with settings common to the entire workflow. """ def __init__(self, context: Context): super().__init__() self.add_single("context", context)
[docs] def add_step( self, name: str, base: Optional[str] = None, action: Optional[CallbackType] = None, replace=False, **kwargs, ) -> str: """Add a :class:`WorkflowStep` to the workflow. Parameters ---------- name : str Name for the new step. base : str or None Previous step that produces the a pre-requisite scenario for this step. action : CallbackType Function to be executed to modify the base into the target Scenario. replace : bool :data:`True` to replace an existing step. kwargs Keyword arguments for `action`; passed to and stored on the :class:`WorkflowStep` until used. Returns ------- str The same as `name`. Raises ------ genno.KeyExistsError if the step `name` already exists. Use `replace` to force overwriting an existing step. """ # Create the workflow step step = WorkflowStep(action, **kwargs) if replace: # Remove any existing step self.graph.pop(name, None) # Add to the Computer; return the name of the added step return str(self.add_single(name, step, "context", base, strict=True))
[docs] def run(self, name_or_names: Union[str, list[str]]): """Run all workflow steps necessary to produce `name_or_names`. Parameters ---------- name_or_names: str or list of str Identifier(s) of steps to run. """ return self.get(name_or_names)
[docs] def truncate(self, name: str): """Truncate the workflow at the step `name`. The step `name` is replaced with a new :class:`WorkflowStep` that simply loads the target :class:`.Scenario` that would be produced by the original step. Raises ------ KeyError if step `name` does not exist. """ # Generate a new step that merely loads the scenario identified by `name` or its # base step = WorkflowStep(None) step.scenario_info.update(self.guess_target(name, "scenario")[0]) try: step.platform_info.update(self.guess_target(name, "platform")[0]) except KeyError as e: if e.args[0] is None: raise RuntimeError( f"Unable to locate platform info for {step.scenario_info}" ) else: # pragma: no cover raise # Something else # Replace the existing step self.add_single(name, step, "context", None)
[docs] def guess_target( self, step_name: str, kind: Literal["platform", "scenario"] = "scenario" ) -> tuple[Mapping, str]: """Traverse the graph looking for non-empty platform_info/scenario_info. Returns the info, and the step name containing it. Usually, this will identify the name of the platform, model, and/or scenario that is received and acted upon by `step_name`. This may not be the case if preceding workflow steps perform clone steps that are not recorded in the `target` parameter to :class:`WorkflowStep`. Parameters ---------- step_name : str Initial step from which to work backwards. kind : str, "platform" or "scenario" Whether to look up :attr:`~WorkflowStep.platform_info` or :attr:`~WorkflowStep.scenario_info`. """ task = self.graph[step_name] i = getattr(task[0], f"{kind}_info") return (i.copy(), step_name) if len(i) else self.guess_target(task[2], kind)
[docs]def make_click_command(wf_callback: str, name: str, slug: str, **kwargs) -> "Command": """Generate a click CLI command to run a :class:`.Workflow`. This command: - when invoked, imports the module containing the `wf_callback`, retrieve and calls the function. This function receives the values for any :mod:`click` parameters (arguments and/or options) passed in `kwargs`. The module is not imported until/unless the command is run. - …is automatically given the parameters: - :program:`--go`: Actually run the workflow; otherwise the workflow is only displayed. - :program:`--from`: Truncate the workflow at any step(s) whose names are a full match for this regular expression. - uses the :attr:`~.Computer.default_key` (if any) of the :class:`.Workflow` returned by `wf_callback`, if the user does not provide :program:`TARGET` on the command-line. Parameters ---------- wf_callback : str Fully-resolved name (module and object name) for a function that generates the workflow; for instance "message_ix_models.project.foo.workflow.generate". name : str Descriptive workflow name used in the :program:`--help` text. slug : str File name fragment for writing the workflow diagram; the path :file:`{slug}-workflow.svg` is used. kwargs : optional Passed to :func:`click.command`, for instance to define additional parameters for the command. """ import click help_arg = f"""Run the {name} workflow up to step TARGET. Unless --go is given, the workflow is only displayed. --from is interpreted as a regular expression. """ @click.command(name="run", help=help_arg, **kwargs) @click.option("--go", is_flag=True, help="Actually run the workflow.") @click.option( "--from", "truncate_step", help="Truncate workflow at matching step(s)." ) @click.argument("target_step", metavar="TARGET", required=False) @click.pass_obj def _func(context, go, truncate_step, target_step, **kwargs): from importlib import import_module from message_ix_models.util import show_versions # Import the module and retrieve the callback function module_name, callback_name = wf_callback.rsplit(".", maxsplit=1) module = import_module(module_name) callback = getattr(module, callback_name) # Generate the workflow wf = callback(context, **kwargs) # Truncate the workflow try: expr = re.compile(truncate_step.replace("\\", "")) except AttributeError: pass # truncate_step is None else: for step in filter(expr.fullmatch, wf.keys()): log.info(f"Truncate workflow at {step!r}") wf.truncate(step) # Identify the target step if target_step: # Compile the string into a regular expression target_expr = re.compile(target_step) # Select 1 or more targets based on a regular expression in `target_step` target_steps = sorted(filter(lambda k: target_expr.fullmatch(k), wf.keys())) if len(target_steps): # Create a new target that collects the selected ones target_step = "cli-targets" wf.add(target_step, target_steps) else: raise click.ClickException( f"No step(s) matched {target_expr!r} among:\n{sorted(wf.keys())}" ) else: # Workflow default if not wf.default_key: raise click.ClickException( f"No target step provided and no default for {wf}" ) target_step = wf.default_key log.info(f"Execute workflow\n{wf.describe(target_step)}") log.debug(f"…with package versions:\n{show_versions()}") if not go: path = context.get_local_path(f"{slug}-workflow.svg") wf.visualize(str(path)) log.info(f"Workflow diagram written to {path}") return wf.run(target_step) return _func
def solve(context, scenario, **kwargs): scenario.solve(**kwargs) return scenario